Fix bug 916: NS instance operational status not updated after successful instantiation
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
25
26 from osm_lcm import ROclient
27 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get
28 from n2vc.k8s_helm_conn import K8sHelmConnector
29 from n2vc.k8s_juju_conn import K8sJujuConnector
30
31 from osm_common.dbbase import DbException
32 from osm_common.fsbase import FsException
33
34 from n2vc.n2vc_juju_conn import N2VCJujuConnector
35 from n2vc.exceptions import N2VCException
36
37 from copy import copy, deepcopy
38 from http import HTTPStatus
39 from time import time
40 from uuid import uuid4
41
42 __author__ = "Alfonso Tierno"
43
44
45 def get_iterable(in_dict, in_key):
46 """
47 Similar to <dict>.get(), but if value is None, False, ..., An empty tuple is returned instead
48 :param in_dict: a dictionary
49 :param in_key: the key to look for at in_dict
50 :return: in_dict[in_var] or () if it is None or not present
51 """
52 if not in_dict.get(in_key):
53 return ()
54 return in_dict[in_key]
55
56
57 def populate_dict(target_dict, key_list, value):
58 """
59 Update target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value.
60 Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
61 :param target_dict: dictionary to be changed
62 :param key_list: list of keys to insert at target_dict
63 :param value:
64 :return: None
65 """
66 for key in key_list[0:-1]:
67 if key not in target_dict:
68 target_dict[key] = {}
69 target_dict = target_dict[key]
70 target_dict[key_list[-1]] = value
71
72
73 class NsLcm(LcmBase):
74 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
75 total_deploy_timeout = 2 * 3600 # global timeout for deployment
76 timeout_charm_delete = 10 * 60
77 timeout_primitive = 10 * 60 # timeout for primitive execution
78
79 SUBOPERATION_STATUS_NOT_FOUND = -1
80 SUBOPERATION_STATUS_NEW = -2
81 SUBOPERATION_STATUS_SKIP = -3
82
83 def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
84 """
85 Init, Connect to database, filesystem storage, and messaging
86 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
87 :return: None
88 """
89 super().__init__(
90 db=db,
91 msg=msg,
92 fs=fs,
93 logger=logging.getLogger('lcm.ns')
94 )
95
96 self.loop = loop
97 self.lcm_tasks = lcm_tasks
98 self.ro_config = ro_config
99 self.vca_config = vca_config
100 if 'pubkey' in self.vca_config:
101 self.vca_config['public_key'] = self.vca_config['pubkey']
102 if 'cacert' in self.vca_config:
103 self.vca_config['ca_cert'] = self.vca_config['cacert']
104 if 'apiproxy' in self.vca_config:
105 self.vca_config['api_proxy'] = self.vca_config['apiproxy']
106
107 # create N2VC connector
108 self.n2vc = N2VCJujuConnector(
109 db=self.db,
110 fs=self.fs,
111 log=self.logger,
112 loop=self.loop,
113 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
114 username=self.vca_config.get('user', None),
115 vca_config=self.vca_config,
116 on_update_db=self._on_update_n2vc_db,
117 # ca_cert=self.vca_config.get('cacert'),
118 # api_proxy=self.vca_config.get('apiproxy'),
119 )
120
121 self.k8sclusterhelm = K8sHelmConnector(
122 kubectl_command=self.vca_config.get("kubectlpath"),
123 helm_command=self.vca_config.get("helmpath"),
124 fs=self.fs,
125 log=self.logger,
126 db=self.db,
127 on_update_db=None,
128 )
129
130 self.k8sclusterjuju = K8sJujuConnector(
131 kubectl_command=self.vca_config.get("kubectlpath"),
132 juju_command=self.vca_config.get("jujupath"),
133 fs=self.fs,
134 log=self.logger,
135 db=self.db,
136 on_update_db=None,
137 )
138
139 # create RO client
140 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
141
142 def _on_update_n2vc_db(self, table, filter, path, updated_data):
143
144 self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
145 .format(table, filter, path, updated_data))
146
147 return
148 # write NS status to database
149 # try:
150 # # nsrs_id = filter.get('_id')
151 # # print(nsrs_id)
152 # # get ns record
153 # nsr = self.db.get_one(table=table, q_filter=filter)
154 # # get VCA deployed list
155 # vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
156 # # get RO deployed
157 # # ro_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'RO'))
158 # for vca in vca_list:
159 # # status = vca.get('status')
160 # # print(status)
161 # # detailed_status = vca.get('detailed-status')
162 # # print(detailed_status)
163 # # for ro in ro_list:
164 # # print(ro)
165 #
166 # except Exception as e:
167 # self.logger.error('Error writing NS status to db: {}'.format(e))
168
169 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
170 """
171 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
172 :param vnfd: input vnfd
173 :param new_id: overrides vnf id if provided
174 :param additionalParams: Instantiation params for VNFs provided
175 :param nsrId: Id of the NSR
176 :return: copy of vnfd
177 """
178 try:
179 vnfd_RO = deepcopy(vnfd)
180 # remove unused by RO configuration, monitoring, scaling and internal keys
181 vnfd_RO.pop("_id", None)
182 vnfd_RO.pop("_admin", None)
183 vnfd_RO.pop("vnf-configuration", None)
184 vnfd_RO.pop("monitoring-param", None)
185 vnfd_RO.pop("scaling-group-descriptor", None)
186 vnfd_RO.pop("kdu", None)
187 vnfd_RO.pop("k8s-cluster", None)
188 if new_id:
189 vnfd_RO["id"] = new_id
190
191 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
192 for vdu in get_iterable(vnfd_RO, "vdu"):
193 cloud_init_file = None
194 if vdu.get("cloud-init-file"):
195 base_folder = vnfd["_admin"]["storage"]
196 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
197 vdu["cloud-init-file"])
198 with self.fs.file_open(cloud_init_file, "r") as ci_file:
199 cloud_init_content = ci_file.read()
200 vdu.pop("cloud-init-file", None)
201 elif vdu.get("cloud-init"):
202 cloud_init_content = vdu["cloud-init"]
203 else:
204 continue
205
206 env = Environment()
207 ast = env.parse(cloud_init_content)
208 mandatory_vars = meta.find_undeclared_variables(ast)
209 if mandatory_vars:
210 for var in mandatory_vars:
211 if not additionalParams or var not in additionalParams.keys():
212 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
213 "file, must be provided in the instantiation parameters inside the "
214 "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
215 template = Template(cloud_init_content)
216 cloud_init_content = template.render(additionalParams or {})
217 vdu["cloud-init"] = cloud_init_content
218
219 return vnfd_RO
220 except FsException as e:
221 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
222 format(vnfd["id"], vdu["id"], cloud_init_file, e))
223 except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
224 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
225 format(vnfd["id"], vdu["id"], e))
226
227 def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
228 """
229 Creates a RO ns descriptor from OSM ns_instantiate params
230 :param ns_params: OSM instantiate params
231 :return: The RO ns descriptor
232 """
233 vim_2_RO = {}
234 wim_2_RO = {}
235 # TODO feature 1417: Check that no instantiation is set over PDU
236 # check if PDU forces a concrete vim-network-id and add it
237 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
238
239 def vim_account_2_RO(vim_account):
240 if vim_account in vim_2_RO:
241 return vim_2_RO[vim_account]
242
243 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
244 if db_vim["_admin"]["operationalState"] != "ENABLED":
245 raise LcmException("VIM={} is not available. operationalState={}".format(
246 vim_account, db_vim["_admin"]["operationalState"]))
247 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
248 vim_2_RO[vim_account] = RO_vim_id
249 return RO_vim_id
250
251 def wim_account_2_RO(wim_account):
252 if isinstance(wim_account, str):
253 if wim_account in wim_2_RO:
254 return wim_2_RO[wim_account]
255
256 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
257 if db_wim["_admin"]["operationalState"] != "ENABLED":
258 raise LcmException("WIM={} is not available. operationalState={}".format(
259 wim_account, db_wim["_admin"]["operationalState"]))
260 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
261 wim_2_RO[wim_account] = RO_wim_id
262 return RO_wim_id
263 else:
264 return wim_account
265
266 def ip_profile_2_RO(ip_profile):
267 RO_ip_profile = deepcopy((ip_profile))
268 if "dns-server" in RO_ip_profile:
269 if isinstance(RO_ip_profile["dns-server"], list):
270 RO_ip_profile["dns-address"] = []
271 for ds in RO_ip_profile.pop("dns-server"):
272 RO_ip_profile["dns-address"].append(ds['address'])
273 else:
274 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
275 if RO_ip_profile.get("ip-version") == "ipv4":
276 RO_ip_profile["ip-version"] = "IPv4"
277 if RO_ip_profile.get("ip-version") == "ipv6":
278 RO_ip_profile["ip-version"] = "IPv6"
279 if "dhcp-params" in RO_ip_profile:
280 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
281 return RO_ip_profile
282
283 if not ns_params:
284 return None
285 RO_ns_params = {
286 # "name": ns_params["nsName"],
287 # "description": ns_params.get("nsDescription"),
288 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
289 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
290 # "scenario": ns_params["nsdId"],
291 }
292
293 n2vc_key_list = n2vc_key_list or []
294 for vnfd_ref, vnfd in vnfd_dict.items():
295 vdu_needed_access = []
296 mgmt_cp = None
297 if vnfd.get("vnf-configuration"):
298 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
299 if ssh_required and vnfd.get("mgmt-interface"):
300 if vnfd["mgmt-interface"].get("vdu-id"):
301 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
302 elif vnfd["mgmt-interface"].get("cp"):
303 mgmt_cp = vnfd["mgmt-interface"]["cp"]
304
305 for vdu in vnfd.get("vdu", ()):
306 if vdu.get("vdu-configuration"):
307 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
308 if ssh_required:
309 vdu_needed_access.append(vdu["id"])
310 elif mgmt_cp:
311 for vdu_interface in vdu.get("interface"):
312 if vdu_interface.get("external-connection-point-ref") and \
313 vdu_interface["external-connection-point-ref"] == mgmt_cp:
314 vdu_needed_access.append(vdu["id"])
315 mgmt_cp = None
316 break
317
318 if vdu_needed_access:
319 for vnf_member in nsd.get("constituent-vnfd"):
320 if vnf_member["vnfd-id-ref"] != vnfd_ref:
321 continue
322 for vdu in vdu_needed_access:
323 populate_dict(RO_ns_params,
324 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
325 n2vc_key_list)
326
327 if ns_params.get("vduImage"):
328 RO_ns_params["vduImage"] = ns_params["vduImage"]
329
330 if ns_params.get("ssh_keys"):
331 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
332 for vnf_params in get_iterable(ns_params, "vnf"):
333 for constituent_vnfd in nsd["constituent-vnfd"]:
334 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
335 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
336 break
337 else:
338 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
339 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
340 if vnf_params.get("vimAccountId"):
341 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
342 vim_account_2_RO(vnf_params["vimAccountId"]))
343
344 for vdu_params in get_iterable(vnf_params, "vdu"):
345 # TODO feature 1417: check that this VDU exist and it is not a PDU
346 if vdu_params.get("volume"):
347 for volume_params in vdu_params["volume"]:
348 if volume_params.get("vim-volume-id"):
349 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
350 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
351 volume_params["vim-volume-id"])
352 if vdu_params.get("interface"):
353 for interface_params in vdu_params["interface"]:
354 if interface_params.get("ip-address"):
355 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
356 vdu_params["id"], "interfaces", interface_params["name"],
357 "ip_address"),
358 interface_params["ip-address"])
359 if interface_params.get("mac-address"):
360 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
361 vdu_params["id"], "interfaces", interface_params["name"],
362 "mac_address"),
363 interface_params["mac-address"])
364 if interface_params.get("floating-ip-required"):
365 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
366 vdu_params["id"], "interfaces", interface_params["name"],
367 "floating-ip"),
368 interface_params["floating-ip-required"])
369
370 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
371 if internal_vld_params.get("vim-network-name"):
372 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
373 internal_vld_params["name"], "vim-network-name"),
374 internal_vld_params["vim-network-name"])
375 if internal_vld_params.get("vim-network-id"):
376 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
377 internal_vld_params["name"], "vim-network-id"),
378 internal_vld_params["vim-network-id"])
379 if internal_vld_params.get("ip-profile"):
380 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
381 internal_vld_params["name"], "ip-profile"),
382 ip_profile_2_RO(internal_vld_params["ip-profile"]))
383 if internal_vld_params.get("provider-network"):
384
385 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
386 internal_vld_params["name"], "provider-network"),
387 internal_vld_params["provider-network"].copy())
388
389 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
390 # look for interface
391 iface_found = False
392 for vdu_descriptor in vnf_descriptor["vdu"]:
393 for vdu_interface in vdu_descriptor["interface"]:
394 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
395 if icp_params.get("ip-address"):
396 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
397 vdu_descriptor["id"], "interfaces",
398 vdu_interface["name"], "ip_address"),
399 icp_params["ip-address"])
400
401 if icp_params.get("mac-address"):
402 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
403 vdu_descriptor["id"], "interfaces",
404 vdu_interface["name"], "mac_address"),
405 icp_params["mac-address"])
406 iface_found = True
407 break
408 if iface_found:
409 break
410 else:
411 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
412 "internal-vld:id-ref={} is not present at vnfd:internal-"
413 "connection-point".format(vnf_params["member-vnf-index"],
414 icp_params["id-ref"]))
415
416 for vld_params in get_iterable(ns_params, "vld"):
417 if "ip-profile" in vld_params:
418 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
419 ip_profile_2_RO(vld_params["ip-profile"]))
420
421 if vld_params.get("provider-network"):
422
423 populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
424 vld_params["provider-network"].copy())
425
426 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
427 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
428 wim_account_2_RO(vld_params["wimAccountId"])),
429 if vld_params.get("vim-network-name"):
430 RO_vld_sites = []
431 if isinstance(vld_params["vim-network-name"], dict):
432 for vim_account, vim_net in vld_params["vim-network-name"].items():
433 RO_vld_sites.append({
434 "netmap-use": vim_net,
435 "datacenter": vim_account_2_RO(vim_account)
436 })
437 else: # isinstance str
438 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
439 if RO_vld_sites:
440 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
441
442 if vld_params.get("vim-network-id"):
443 RO_vld_sites = []
444 if isinstance(vld_params["vim-network-id"], dict):
445 for vim_account, vim_net in vld_params["vim-network-id"].items():
446 RO_vld_sites.append({
447 "netmap-use": vim_net,
448 "datacenter": vim_account_2_RO(vim_account)
449 })
450 else: # isinstance str
451 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
452 if RO_vld_sites:
453 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
454 if vld_params.get("ns-net"):
455 if isinstance(vld_params["ns-net"], dict):
456 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
457 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
458 if RO_vld_ns_net:
459 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
460 if "vnfd-connection-point-ref" in vld_params:
461 for cp_params in vld_params["vnfd-connection-point-ref"]:
462 # look for interface
463 for constituent_vnfd in nsd["constituent-vnfd"]:
464 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
465 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
466 break
467 else:
468 raise LcmException(
469 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
470 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
471 match_cp = False
472 for vdu_descriptor in vnf_descriptor["vdu"]:
473 for interface_descriptor in vdu_descriptor["interface"]:
474 if interface_descriptor.get("external-connection-point-ref") == \
475 cp_params["vnfd-connection-point-ref"]:
476 match_cp = True
477 break
478 if match_cp:
479 break
480 else:
481 raise LcmException(
482 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
483 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
484 cp_params["member-vnf-index-ref"],
485 cp_params["vnfd-connection-point-ref"],
486 vnf_descriptor["id"]))
487 if cp_params.get("ip-address"):
488 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
489 vdu_descriptor["id"], "interfaces",
490 interface_descriptor["name"], "ip_address"),
491 cp_params["ip-address"])
492 if cp_params.get("mac-address"):
493 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
494 vdu_descriptor["id"], "interfaces",
495 interface_descriptor["name"], "mac_address"),
496 cp_params["mac-address"])
497 return RO_ns_params
498
499 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
500 # make a copy to do not change
501 vdu_create = copy(vdu_create)
502 vdu_delete = copy(vdu_delete)
503
504 vdurs = db_vnfr.get("vdur")
505 if vdurs is None:
506 vdurs = []
507 vdu_index = len(vdurs)
508 while vdu_index:
509 vdu_index -= 1
510 vdur = vdurs[vdu_index]
511 if vdur.get("pdu-type"):
512 continue
513 vdu_id_ref = vdur["vdu-id-ref"]
514 if vdu_create and vdu_create.get(vdu_id_ref):
515 for index in range(0, vdu_create[vdu_id_ref]):
516 vdur = deepcopy(vdur)
517 vdur["_id"] = str(uuid4())
518 vdur["count-index"] += 1
519 vdurs.insert(vdu_index+1+index, vdur)
520 del vdu_create[vdu_id_ref]
521 if vdu_delete and vdu_delete.get(vdu_id_ref):
522 del vdurs[vdu_index]
523 vdu_delete[vdu_id_ref] -= 1
524 if not vdu_delete[vdu_id_ref]:
525 del vdu_delete[vdu_id_ref]
526 # check all operations are done
527 if vdu_create or vdu_delete:
528 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
529 vdu_create))
530 if vdu_delete:
531 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
532 vdu_delete))
533
534 vnfr_update = {"vdur": vdurs}
535 db_vnfr["vdur"] = vdurs
536 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
537
538 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
539 """
540 Updates database nsr with the RO info for the created vld
541 :param ns_update_nsr: dictionary to be filled with the updated info
542 :param db_nsr: content of db_nsr. This is also modified
543 :param nsr_desc_RO: nsr descriptor from RO
544 :return: Nothing, LcmException is raised on errors
545 """
546
547 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
548 for net_RO in get_iterable(nsr_desc_RO, "nets"):
549 if vld["id"] != net_RO.get("ns_net_osm_id"):
550 continue
551 vld["vim-id"] = net_RO.get("vim_net_id")
552 vld["name"] = net_RO.get("vim_name")
553 vld["status"] = net_RO.get("status")
554 vld["status-detailed"] = net_RO.get("error_msg")
555 ns_update_nsr["vld.{}".format(vld_index)] = vld
556 break
557 else:
558 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
559
560 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
561 """
562 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
563 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
564 :param nsr_desc_RO: nsr descriptor from RO
565 :return: Nothing, LcmException is raised on errors
566 """
567 for vnf_index, db_vnfr in db_vnfrs.items():
568 for vnf_RO in nsr_desc_RO["vnfs"]:
569 if vnf_RO["member_vnf_index"] != vnf_index:
570 continue
571 vnfr_update = {}
572 if vnf_RO.get("ip_address"):
573 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
574 elif not db_vnfr.get("ip-address"):
575 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
576
577 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
578 vdur_RO_count_index = 0
579 if vdur.get("pdu-type"):
580 continue
581 for vdur_RO in get_iterable(vnf_RO, "vms"):
582 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
583 continue
584 if vdur["count-index"] != vdur_RO_count_index:
585 vdur_RO_count_index += 1
586 continue
587 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
588 if vdur_RO.get("ip_address"):
589 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
590 else:
591 vdur["ip-address"] = None
592 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
593 vdur["name"] = vdur_RO.get("vim_name")
594 vdur["status"] = vdur_RO.get("status")
595 vdur["status-detailed"] = vdur_RO.get("error_msg")
596 for ifacer in get_iterable(vdur, "interfaces"):
597 for interface_RO in get_iterable(vdur_RO, "interfaces"):
598 if ifacer["name"] == interface_RO.get("internal_name"):
599 ifacer["ip-address"] = interface_RO.get("ip_address")
600 ifacer["mac-address"] = interface_RO.get("mac_address")
601 break
602 else:
603 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
604 "from VIM info"
605 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
606 vnfr_update["vdur.{}".format(vdu_index)] = vdur
607 break
608 else:
609 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
610 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
611
612 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
613 for net_RO in get_iterable(nsr_desc_RO, "nets"):
614 if vld["id"] != net_RO.get("vnf_net_osm_id"):
615 continue
616 vld["vim-id"] = net_RO.get("vim_net_id")
617 vld["name"] = net_RO.get("vim_name")
618 vld["status"] = net_RO.get("status")
619 vld["status-detailed"] = net_RO.get("error_msg")
620 vnfr_update["vld.{}".format(vld_index)] = vld
621 break
622 else:
623 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
624 vnf_index, vld["id"]))
625
626 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
627 break
628
629 else:
630 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
631
632 @staticmethod
633 def _get_ns_config_info(vca_deployed_list):
634 """
635 Generates a mapping between vnf,vdu elements and the N2VC id
636 :param vca_deployed_list: List of database _admin.deploy.VCA that contains this list
637 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
638 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
639 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
640 """
641 mapping = {}
642 ns_config_info = {"osm-config-mapping": mapping}
643 for vca in vca_deployed_list:
644 if not vca["member-vnf-index"]:
645 continue
646 if not vca["vdu_id"]:
647 mapping[vca["member-vnf-index"]] = vca["application"]
648 else:
649 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
650 vca["application"]
651 return ns_config_info
652
653 @staticmethod
654 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
655 """
656 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
657 primitives as verify-ssh-credentials, or config when needed
658 :param desc_primitive_list: information of the descriptor
659 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
660 this element contains a ssh public key
661 :return: The modified list. Can ba an empty list, but always a list
662 """
663 if desc_primitive_list:
664 primitive_list = desc_primitive_list.copy()
665 else:
666 primitive_list = []
667 # look for primitive config, and get the position. None if not present
668 config_position = None
669 for index, primitive in enumerate(primitive_list):
670 if primitive["name"] == "config":
671 config_position = index
672 break
673
674 # for NS, add always a config primitive if not present (bug 874)
675 if not vca_deployed["member-vnf-index"] and config_position is None:
676 primitive_list.insert(0, {"name": "config", "parameter": []})
677 config_position = 0
678 # for VNF/VDU add verify-ssh-credentials after config
679 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
680 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
681 return primitive_list
682
683 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr,
684 db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list):
685
686 db_nsr_update = {}
687 RO_descriptor_number = 0 # number of descriptors created at RO
688 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
689 start_deploy = time()
690 vdu_flag = False # If any of the VNFDs has VDUs
691 ns_params = db_nslcmop.get("operationParams")
692
693 # deploy RO
694
695 # get vnfds, instantiate at RO
696
697 for c_vnf in nsd.get("constituent-vnfd", ()):
698 member_vnf_index = c_vnf["member-vnf-index"]
699 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
700 if vnfd.get("vdu"):
701 vdu_flag = True
702 vnfd_ref = vnfd["id"]
703 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \
704 " RO".format(vnfd_ref, member_vnf_index)
705 # self.logger.debug(logging_text + step)
706 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
707 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
708 RO_descriptor_number += 1
709
710 # look position at deployed.RO.vnfd if not present it will be appended at the end
711 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
712 if vnf_deployed["member-vnf-index"] == member_vnf_index:
713 break
714 else:
715 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
716 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
717
718 # look if present
719 RO_update = {"member-vnf-index": member_vnf_index}
720 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
721 if vnfd_list:
722 RO_update["id"] = vnfd_list[0]["uuid"]
723 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
724 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
725 else:
726 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
727 get("additionalParamsForVnf"), nsr_id)
728 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
729 RO_update["id"] = desc["uuid"]
730 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
731 vnfd_ref, member_vnf_index, desc["uuid"]))
732 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
733 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
734 self.update_db_2("nsrs", nsr_id, db_nsr_update)
735 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
736
737 # create nsd at RO
738 nsd_ref = nsd["id"]
739 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
740 # self.logger.debug(logging_text + step)
741
742 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
743 RO_descriptor_number += 1
744 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
745 if nsd_list:
746 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
747 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
748 nsd_ref, RO_nsd_uuid))
749 else:
750 nsd_RO = deepcopy(nsd)
751 nsd_RO["id"] = RO_osm_nsd_id
752 nsd_RO.pop("_id", None)
753 nsd_RO.pop("_admin", None)
754 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
755 member_vnf_index = c_vnf["member-vnf-index"]
756 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
757 for c_vld in nsd_RO.get("vld", ()):
758 for cp in c_vld.get("vnfd-connection-point-ref", ()):
759 member_vnf_index = cp["member-vnf-index-ref"]
760 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
761
762 desc = await self.RO.create("nsd", descriptor=nsd_RO)
763 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
764 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
765 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
766 self.update_db_2("nsrs", nsr_id, db_nsr_update)
767 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
768
769 # Crate ns at RO
770 # if present use it unless in error status
771 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
772 if RO_nsr_id:
773 try:
774 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
775 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
776 desc = await self.RO.show("ns", RO_nsr_id)
777 except ROclient.ROClientException as e:
778 if e.http_code != HTTPStatus.NOT_FOUND:
779 raise
780 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
781 if RO_nsr_id:
782 ns_status, ns_status_info = self.RO.check_ns_status(desc)
783 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
784 if ns_status == "ERROR":
785 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
786 .format(RO_nsr_id)
787 self.logger.debug(logging_text + step)
788 await self.RO.delete("ns", RO_nsr_id)
789 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
790 if not RO_nsr_id:
791 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
792 # self.logger.debug(logging_text + step)
793
794 # check if VIM is creating and wait look if previous tasks in process
795 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
796 if task_dependency:
797 step = "Waiting for related tasks to be completed: {}".format(task_name)
798 self.logger.debug(logging_text + step)
799 await asyncio.wait(task_dependency, timeout=3600)
800 if ns_params.get("vnf"):
801 for vnf in ns_params["vnf"]:
802 if "vimAccountId" in vnf:
803 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
804 vnf["vimAccountId"])
805 if task_dependency:
806 step = "Waiting for related tasks to be completed: {}".format(task_name)
807 self.logger.debug(logging_text + step)
808 await asyncio.wait(task_dependency, timeout=3600)
809
810 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"
811
812 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
813
814 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
815 # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
816 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
817 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
818 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
819 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
820 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
821 self.update_db_2("nsrs", nsr_id, db_nsr_update)
822 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
823
824 # wait until NS is ready
825 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
826 detailed_status_old = None
827 self.logger.debug(logging_text + step)
828
829 while time() <= start_deploy + self.total_deploy_timeout:
830 desc = await self.RO.show("ns", RO_nsr_id)
831 ns_status, ns_status_info = self.RO.check_ns_status(desc)
832 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
833 if ns_status == "ERROR":
834 raise ROclient.ROClientException(ns_status_info)
835 elif ns_status == "BUILD":
836 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
837 elif ns_status == "ACTIVE":
838 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
839 try:
840 if vdu_flag:
841 self.ns_update_vnfr(db_vnfrs, desc)
842 break
843 except LcmExceptionNoMgmtIP:
844 pass
845 else:
846 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
847 if detailed_status != detailed_status_old:
848 detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
849 self.update_db_2("nsrs", nsr_id, db_nsr_update)
850 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
851 await asyncio.sleep(5, loop=self.loop)
852 else: # total_deploy_timeout
853 raise ROclient.ROClientException("Timeout waiting ns to be ready")
854
855 step = "Updating NSR"
856 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
857
858 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
859 db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
860 db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
861 self.update_db_2("nsrs", nsr_id, db_nsr_update)
862 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
863
864 step = "Deployed at VIM"
865 self.logger.debug(logging_text + step)
866
867 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
868 """
869 Wait for ip addres at RO, and optionally, insert public key in virtual machine
870 :param logging_text: prefix use for logging
871 :param nsr_id:
872 :param vnfr_id:
873 :param vdu_id:
874 :param vdu_index:
875 :param pub_key: public ssh key to inject, None to skip
876 :param user: user to apply the public ssh key
877 :return: IP address
878 """
879
880 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
881 ro_nsr_id = None
882 ip_address = None
883 nb_tries = 0
884 target_vdu_id = None
885 ro_retries = 0
886
887 while True:
888
889 ro_retries += 1
890 if ro_retries >= 360: # 1 hour
891 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
892
893 await asyncio.sleep(10, loop=self.loop)
894 # wait until NS is deployed at RO
895 if not ro_nsr_id:
896 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
897 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
898 if not ro_nsr_id:
899 continue
900
901 # get ip address
902 if not target_vdu_id:
903 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
904
905 if not vdu_id: # for the VNF case
906 ip_address = db_vnfr.get("ip-address")
907 if not ip_address:
908 continue
909 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
910 else: # VDU case
911 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
912 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
913
914 if not vdur:
915 raise LcmException("Not found vnfr_id={}, vdu_index={}, vdu_index={}".format(
916 vnfr_id, vdu_id, vdu_index
917 ))
918
919 if vdur.get("status") == "ACTIVE":
920 ip_address = vdur.get("ip-address")
921 if not ip_address:
922 continue
923 target_vdu_id = vdur["vdu-id-ref"]
924 elif vdur.get("status") == "ERROR":
925 raise LcmException("Cannot inject ssh-key because target VM is in error state")
926
927 if not target_vdu_id:
928 continue
929
930 # self.logger.debug(logging_text + "IP address={}".format(ip_address))
931
932 # inject public key into machine
933 if pub_key and user:
934 # self.logger.debug(logging_text + "Inserting RO key")
935 try:
936 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
937 result_dict = await self.RO.create_action(
938 item="ns",
939 item_id_name=ro_nsr_id,
940 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
941 )
942 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
943 if not result_dict or not isinstance(result_dict, dict):
944 raise LcmException("Unknown response from RO when injecting key")
945 for result in result_dict.values():
946 if result.get("vim_result") == 200:
947 break
948 else:
949 raise ROclient.ROClientException("error injecting key: {}".format(
950 result.get("description")))
951 break
952 except ROclient.ROClientException as e:
953 if not nb_tries:
954 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
955 format(e, 20*10))
956 nb_tries += 1
957 if nb_tries >= 20:
958 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
959 else:
960 break
961
962 return ip_address
963
964 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id,
965 kdu_name, vdu_index, config_descriptor, deploy_params, base_folder):
966 nsr_id = db_nsr["_id"]
967 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
968 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
969 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
970 db_dict = {
971 'collection': 'nsrs',
972 'filter': {'_id': nsr_id},
973 'path': db_update_entry
974 }
975 logging_text += "member_vnf_index={} vdu_id={}, vdu_index={} ".format(db_vnfr["member-vnf-index-ref"],
976 vdu_id, vdu_index)
977
978 step = ""
979 try:
980 vnfr_id = None
981 if db_vnfr:
982 vnfr_id = db_vnfr["_id"]
983
984 namespace = "{nsi}.{ns}".format(
985 nsi=nsi_id if nsi_id else "",
986 ns=nsr_id)
987 if vnfr_id:
988 namespace += "." + vnfr_id
989 if vdu_id:
990 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
991
992 # Get artifact path
993 artifact_path = "{}/{}/charms/{}".format(
994 base_folder["folder"],
995 base_folder["pkg-dir"],
996 config_descriptor["juju"]["charm"]
997 )
998
999 is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
1000 if deep_get(config_descriptor, ('juju', 'proxy')) is False:
1001 is_proxy_charm = False
1002
1003 # n2vc_redesign STEP 3.1
1004
1005 # find old ee_id if exists
1006 ee_id = vca_deployed.get("ee_id")
1007
1008 # create or register execution environment in VCA
1009 if is_proxy_charm:
1010 step = "create execution environment"
1011 self.logger.debug(logging_text + step)
1012 ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
1013 reuse_ee_id=ee_id,
1014 db_dict=db_dict)
1015 else:
1016 step = "Waiting to VM being up and getting IP address"
1017 self.logger.debug(logging_text + step)
1018 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1019 user=None, pub_key=None)
1020 credentials = {"hostname": rw_mgmt_ip}
1021 # get username
1022 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1023 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1024 # merged. Meanwhile let's get username from initial-config-primitive
1025 if not username and config_descriptor.get("initial-config-primitive"):
1026 for config_primitive in config_descriptor["initial-config-primitive"]:
1027 for param in config_primitive.get("parameter", ()):
1028 if param["name"] == "ssh-username":
1029 username = param["value"]
1030 break
1031 if not username:
1032 raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
1033 "'config-access.ssh-access.default-user'")
1034 credentials["username"] = username
1035 # n2vc_redesign STEP 3.2
1036
1037 step = "register execution environment {}".format(credentials)
1038 self.logger.debug(logging_text + step)
1039 ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
1040 db_dict=db_dict)
1041
1042 # for compatibility with MON/POL modules, the need model and application name at database
1043 # TODO ask to N2VC instead of assuming the format "model_name.application_name"
1044 ee_id_parts = ee_id.split('.')
1045 model_name = ee_id_parts[0]
1046 application_name = ee_id_parts[1]
1047 self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
1048 db_update_entry + "application": application_name,
1049 db_update_entry + "ee_id": ee_id})
1050
1051 # n2vc_redesign STEP 3.3
1052
1053 step = "Install configuration Software"
1054 # TODO check if already done
1055 self.logger.debug(logging_text + step)
1056 await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)
1057
1058 # if SSH access is required, then get execution environment SSH public
1059 if is_proxy_charm: # if native charm we have waited already to VM be UP
1060 pub_key = None
1061 user = None
1062 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1063 # Needed to inject a ssh key
1064 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1065 step = "Install configuration Software, getting public ssh key"
1066 pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1067
1068 step = "Insert public key into VM"
1069 else:
1070 step = "Waiting to VM being up and getting IP address"
1071 self.logger.debug(logging_text + step)
1072
1073 # n2vc_redesign STEP 5.1
1074 # wait for RO (ip-address) Insert pub_key into VM
1075 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1076 user=user, pub_key=pub_key)
1077
1078 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1079
1080 # store rw_mgmt_ip in deploy params for later replacement
1081 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1082
1083 # n2vc_redesign STEP 6 Execute initial config primitive
1084 step = 'execute initial config primitive'
1085 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1086
1087 # sort initial config primitives by 'seq'
1088 try:
1089 initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
1090 except Exception as e:
1091 self.logger.error(logging_text + step + ": " + str(e))
1092
1093 # add config if not present for NS charm
1094 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1095 vca_deployed)
1096
1097 for initial_config_primitive in initial_config_primitive_list:
1098 # adding information on the vca_deployed if it is a NS execution environment
1099 if not vca_deployed["member-vnf-index"]:
1100 deploy_params["ns_config_info"] = self._get_ns_config_info(vca_deployed_list)
1101 # TODO check if already done
1102 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1103
1104 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1105 self.logger.debug(logging_text + step)
1106 await self.n2vc.exec_primitive(
1107 ee_id=ee_id,
1108 primitive_name=initial_config_primitive["name"],
1109 params_dict=primitive_params_,
1110 db_dict=db_dict
1111 )
1112 # TODO register in database that primitive is done
1113
1114 step = "instantiated at VCA"
1115 self.logger.debug(logging_text + step)
1116
1117 except Exception as e: # TODO not use Exception but N2VC exception
1118 raise Exception("{} {}".format(step, e)) from e
1119 # TODO raise N2VC exception with 'step' extra information
1120
1121 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1122 error_description: str = None, error_detail: str = None):
1123 try:
1124 db_dict = dict()
1125 if ns_state:
1126 db_dict["nsState"] = ns_state
1127 db_dict["currentOperation"] = current_operation
1128 db_dict["currentOperationID"] = current_operation_id
1129 db_dict["errorDescription"] = error_description
1130 db_dict["errorDetail"] = error_detail
1131 self.update_db_2("nsrs", nsr_id, db_dict)
1132 except Exception as e:
1133 self.logger.warn('Error writing NS status: {}'.format(e))
1134
1135 async def instantiate(self, nsr_id, nslcmop_id):
1136 """
1137
1138 :param nsr_id: ns instance to deploy
1139 :param nslcmop_id: operation to run
1140 :return:
1141 """
1142
1143 # Try to lock HA task here
1144 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1145 if not task_is_locked_by_me:
1146 self.logger.debug('instantiate() task is not locked by me')
1147 return
1148
1149 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1150 self.logger.debug(logging_text + "Enter")
1151
1152 # get all needed from database
1153
1154 # database nsrs record
1155 db_nsr = None
1156
1157 # database nslcmops record
1158 db_nslcmop = None
1159
1160 # update operation on nsrs
1161 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
1162 "_admin.current-operation": nslcmop_id,
1163 "_admin.operation-type": "instantiate"}
1164 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1165
1166 # update operation on nslcmops
1167 db_nslcmop_update = {}
1168
1169 nslcmop_operation_state = None
1170 db_vnfrs = {} # vnf's info indexed by member-index
1171 # n2vc_info = {}
1172 task_instantiation_list = []
1173 task_instantiation_info = {} # from task to info text
1174 exc = None
1175 try:
1176 # wait for any previous tasks in process
1177 step = "Waiting for previous operations to terminate"
1178 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1179
1180 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1181
1182 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1183 self._write_ns_status(
1184 nsr_id=nsr_id,
1185 ns_state="BUILDING",
1186 current_operation="INSTANTIATING",
1187 current_operation_id=nslcmop_id
1188 )
1189
1190 # read from db: operation
1191 step = "Getting nslcmop={} from db".format(nslcmop_id)
1192 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1193
1194 # read from db: ns
1195 step = "Getting nsr={} from db".format(nsr_id)
1196 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1197 # nsd is replicated into ns (no db read)
1198 nsd = db_nsr["nsd"]
1199 # nsr_name = db_nsr["name"] # TODO short-name??
1200
1201 # read from db: vnf's of this ns
1202 step = "Getting vnfrs from db"
1203 self.logger.debug(logging_text + step)
1204 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1205
1206 # read from db: vnfd's for every vnf
1207 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1208 db_vnfds = {} # every vnfd data indexed by vnf id
1209 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1210
1211 # for each vnf in ns, read vnfd
1212 for vnfr in db_vnfrs_list:
1213 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1214 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1215 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1216 # if we haven't this vnfd, read it from db
1217 if vnfd_id not in db_vnfds:
1218 # read from cb
1219 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1220 self.logger.debug(logging_text + step)
1221 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1222
1223 # store vnfd
1224 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1225 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1226 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1227
1228 # Get or generates the _admin.deployed.VCA list
1229 vca_deployed_list = None
1230 if db_nsr["_admin"].get("deployed"):
1231 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1232 if vca_deployed_list is None:
1233 vca_deployed_list = []
1234 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1235 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1236 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1237 elif isinstance(vca_deployed_list, dict):
1238 # maintain backward compatibility. Change a dict to list at database
1239 vca_deployed_list = list(vca_deployed_list.values())
1240 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1241 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1242
1243 db_nsr_update["detailed-status"] = "creating"
1244 db_nsr_update["operational-status"] = "init"
1245
1246 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1247 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1248 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1249
1250 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1251 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1252 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1253 self.logger.debug(logging_text + "Before deploy_kdus")
1254 # Call to deploy_kdus in case exists the "vdu:kdu" param
1255 task_kdu = asyncio.ensure_future(
1256 self.deploy_kdus(
1257 logging_text=logging_text,
1258 nsr_id=nsr_id,
1259 db_nsr=db_nsr,
1260 db_vnfrs=db_vnfrs,
1261 )
1262 )
1263 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
1264 task_instantiation_info[task_kdu] = "Deploy KDUs"
1265 task_instantiation_list.append(task_kdu)
1266 # n2vc_redesign STEP 1 Get VCA public ssh-key
1267 # feature 1429. Add n2vc public key to needed VMs
1268 n2vc_key = self.n2vc.get_public_key()
1269 n2vc_key_list = [n2vc_key]
1270 if self.vca_config.get("public_key"):
1271 n2vc_key_list.append(self.vca_config["public_key"])
1272
1273 # n2vc_redesign STEP 2 Deploy Network Scenario
1274 task_ro = asyncio.ensure_future(
1275 self.instantiate_RO(
1276 logging_text=logging_text,
1277 nsr_id=nsr_id,
1278 nsd=nsd,
1279 db_nsr=db_nsr,
1280 db_nslcmop=db_nslcmop,
1281 db_vnfrs=db_vnfrs,
1282 db_vnfds_ref=db_vnfds_ref,
1283 n2vc_key_list=n2vc_key_list
1284 )
1285 )
1286 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1287 task_instantiation_info[task_ro] = "Deploy at VIM"
1288 task_instantiation_list.append(task_ro)
1289
1290 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1291 step = "Looking for needed vnfd to configure with proxy charm"
1292 self.logger.debug(logging_text + step)
1293
1294 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1295 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1296 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1297 vnfd_id = c_vnf["vnfd-id-ref"]
1298 vnfd = db_vnfds_ref[vnfd_id]
1299 member_vnf_index = str(c_vnf["member-vnf-index"])
1300 db_vnfr = db_vnfrs[member_vnf_index]
1301 base_folder = vnfd["_admin"]["storage"]
1302 vdu_id = None
1303 vdu_index = 0
1304 vdu_name = None
1305 kdu_name = None
1306
1307 # Get additional parameters
1308 deploy_params = {}
1309 if db_vnfr.get("additionalParamsForVnf"):
1310 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1311
1312 descriptor_config = vnfd.get("vnf-configuration")
1313 if descriptor_config and descriptor_config.get("juju"):
1314 self._deploy_n2vc(
1315 logging_text=logging_text,
1316 db_nsr=db_nsr,
1317 db_vnfr=db_vnfr,
1318 nslcmop_id=nslcmop_id,
1319 nsr_id=nsr_id,
1320 nsi_id=nsi_id,
1321 vnfd_id=vnfd_id,
1322 vdu_id=vdu_id,
1323 kdu_name=kdu_name,
1324 member_vnf_index=member_vnf_index,
1325 vdu_index=vdu_index,
1326 vdu_name=vdu_name,
1327 deploy_params=deploy_params,
1328 descriptor_config=descriptor_config,
1329 base_folder=base_folder,
1330 task_instantiation_list=task_instantiation_list,
1331 task_instantiation_info=task_instantiation_info
1332 )
1333
1334 # Deploy charms for each VDU that supports one.
1335 for vdud in get_iterable(vnfd, 'vdu'):
1336 vdu_id = vdud["id"]
1337 descriptor_config = vdud.get('vdu-configuration')
1338 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1339 if vdur.get("additionalParams"):
1340 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1341 else:
1342 deploy_params_vdu = deploy_params
1343 if descriptor_config and descriptor_config.get("juju"):
1344 # look for vdu index in the db_vnfr["vdu"] section
1345 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1346 # if vdur["vdu-id-ref"] == vdu_id:
1347 # break
1348 # else:
1349 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1350 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1351 # vdu_name = vdur.get("name")
1352 vdu_name = None
1353 kdu_name = None
1354 for vdu_index in range(int(vdud.get("count", 1))):
1355 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1356 self._deploy_n2vc(
1357 logging_text=logging_text,
1358 db_nsr=db_nsr,
1359 db_vnfr=db_vnfr,
1360 nslcmop_id=nslcmop_id,
1361 nsr_id=nsr_id,
1362 nsi_id=nsi_id,
1363 vnfd_id=vnfd_id,
1364 vdu_id=vdu_id,
1365 kdu_name=kdu_name,
1366 member_vnf_index=member_vnf_index,
1367 vdu_index=vdu_index,
1368 vdu_name=vdu_name,
1369 deploy_params=deploy_params_vdu,
1370 descriptor_config=descriptor_config,
1371 base_folder=base_folder,
1372 task_instantiation_list=task_instantiation_list,
1373 task_instantiation_info=task_instantiation_info
1374 )
1375 for kdud in get_iterable(vnfd, 'kdu'):
1376 kdu_name = kdud["name"]
1377 descriptor_config = kdud.get('kdu-configuration')
1378 if descriptor_config and descriptor_config.get("juju"):
1379 vdu_id = None
1380 vdu_index = 0
1381 vdu_name = None
1382 # look for vdu index in the db_vnfr["vdu"] section
1383 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1384 # if vdur["vdu-id-ref"] == vdu_id:
1385 # break
1386 # else:
1387 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1388 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1389 # vdu_name = vdur.get("name")
1390 # vdu_name = None
1391
1392 self._deploy_n2vc(
1393 logging_text=logging_text,
1394 db_nsr=db_nsr,
1395 db_vnfr=db_vnfr,
1396 nslcmop_id=nslcmop_id,
1397 nsr_id=nsr_id,
1398 nsi_id=nsi_id,
1399 vnfd_id=vnfd_id,
1400 vdu_id=vdu_id,
1401 kdu_name=kdu_name,
1402 member_vnf_index=member_vnf_index,
1403 vdu_index=vdu_index,
1404 vdu_name=vdu_name,
1405 deploy_params=deploy_params,
1406 descriptor_config=descriptor_config,
1407 base_folder=base_folder,
1408 task_instantiation_list=task_instantiation_list,
1409 task_instantiation_info=task_instantiation_info
1410 )
1411
1412 # Check if this NS has a charm configuration
1413 descriptor_config = nsd.get("ns-configuration")
1414 if descriptor_config and descriptor_config.get("juju"):
1415 vnfd_id = None
1416 db_vnfr = None
1417 member_vnf_index = None
1418 vdu_id = None
1419 kdu_name = None
1420 vdu_index = 0
1421 vdu_name = None
1422
1423 # Get additional parameters
1424 deploy_params = {}
1425 if db_nsr.get("additionalParamsForNs"):
1426 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1427 base_folder = nsd["_admin"]["storage"]
1428 self._deploy_n2vc(
1429 logging_text=logging_text,
1430 db_nsr=db_nsr,
1431 db_vnfr=db_vnfr,
1432 nslcmop_id=nslcmop_id,
1433 nsr_id=nsr_id,
1434 nsi_id=nsi_id,
1435 vnfd_id=vnfd_id,
1436 vdu_id=vdu_id,
1437 kdu_name=kdu_name,
1438 member_vnf_index=member_vnf_index,
1439 vdu_index=vdu_index,
1440 vdu_name=vdu_name,
1441 deploy_params=deploy_params,
1442 descriptor_config=descriptor_config,
1443 base_folder=base_folder,
1444 task_instantiation_list=task_instantiation_list,
1445 task_instantiation_info=task_instantiation_info
1446 )
1447
1448 # Wait until all tasks of "task_instantiation_list" have been finished
1449
1450 # while time() <= start_deploy + self.total_deploy_timeout:
1451 error_text_list = []
1452 timeout = 3600
1453
1454 # let's begin with all OK
1455 instantiated_ok = True
1456 # let's begin with RO 'running' status (later we can change it)
1457 db_nsr_update["operational-status"] = "running"
1458 # let's begin with VCA 'configured' status (later we can change it)
1459 db_nsr_update["config-status"] = "configured"
1460
1461 if task_instantiation_list:
1462 # wait for all tasks completion
1463 done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout)
1464
1465 for task in pending:
1466 instantiated_ok = False
1467 if task == task_ro:
1468 db_nsr_update["operational-status"] = "failed"
1469 else:
1470 db_nsr_update["config-status"] = "failed"
1471 self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
1472 error_text_list.append(task_instantiation_info[task] + ": Timeout")
1473 for task in done:
1474 if task.cancelled():
1475 instantiated_ok = False
1476 if task == task_ro:
1477 db_nsr_update["operational-status"] = "failed"
1478 else:
1479 db_nsr_update["config-status"] = "failed"
1480 self.logger.warn(logging_text + task_instantiation_info[task] + ": Cancelled")
1481 error_text_list.append(task_instantiation_info[task] + ": Cancelled")
1482 else:
1483 exc = task.exception()
1484 if exc:
1485 instantiated_ok = False
1486 if task == task_ro:
1487 db_nsr_update["operational-status"] = "failed"
1488 else:
1489 db_nsr_update["config-status"] = "failed"
1490 self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")
1491 if isinstance(exc, (N2VCException, ROclient.ROClientException)):
1492 error_text_list.append(task_instantiation_info[task] + ": {}".format(exc))
1493 else:
1494 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
1495 self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
1496 error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)
1497 else:
1498 self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")
1499
1500 if error_text_list:
1501 error_text = "\n".join(error_text_list)
1502 db_nsr_update["detailed-status"] = error_text
1503 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1504 db_nslcmop_update["detailed-status"] = error_text
1505 db_nslcmop_update["statusEnteredTime"] = time()
1506 else:
1507 # all is done
1508 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1509 db_nslcmop_update["statusEnteredTime"] = time()
1510 db_nslcmop_update["detailed-status"] = "done"
1511 db_nsr_update["detailed-status"] = "done"
1512
1513 except (ROclient.ROClientException, DbException, LcmException) as e:
1514 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1515 exc = e
1516 except asyncio.CancelledError:
1517 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1518 exc = "Operation was cancelled"
1519 except Exception as e:
1520 exc = traceback.format_exc()
1521 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1522 exc_info=True)
1523 finally:
1524 if exc:
1525 if db_nsr:
1526 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1527 db_nsr_update["operational-status"] = "failed"
1528 db_nsr_update["config-status"] = "failed"
1529 if db_nslcmop:
1530 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1531 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1532 db_nslcmop_update["statusEnteredTime"] = time()
1533 try:
1534 if db_nsr:
1535 db_nsr_update["_admin.nslcmop"] = None
1536 db_nsr_update["_admin.current-operation"] = None
1537 db_nsr_update["_admin.operation-type"] = None
1538 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1539
1540 # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
1541 ns_state = None
1542 error_description = None
1543 error_detail = None
1544 if instantiated_ok:
1545 ns_state = "READY"
1546 else:
1547 ns_state = "BROKEN"
1548 error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)
1549 error_detail = error_text
1550 self._write_ns_status(
1551 nsr_id=nsr_id,
1552 ns_state=ns_state,
1553 current_operation="IDLE",
1554 current_operation_id=None,
1555 error_description=error_description,
1556 error_detail=error_detail
1557 )
1558
1559 if db_nslcmop_update:
1560 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1561 except DbException as e:
1562 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1563 if nslcmop_operation_state:
1564 try:
1565 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1566 "operationState": nslcmop_operation_state},
1567 loop=self.loop)
1568 except Exception as e:
1569 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1570
1571 self.logger.debug(logging_text + "Exit")
1572 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1573
1574 async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs):
1575 # Launch kdus if present in the descriptor
1576
1577 k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
1578
1579 def _get_cluster_id(cluster_id, cluster_type):
1580 nonlocal k8scluster_id_2_uuic
1581 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
1582 return k8scluster_id_2_uuic[cluster_type][cluster_id]
1583
1584 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
1585 if not db_k8scluster:
1586 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
1587 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
1588 if not k8s_id:
1589 raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
1590 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
1591 return k8s_id
1592
1593 logging_text += "Deploy kdus: "
1594 try:
1595 db_nsr_update = {"_admin.deployed.K8s": []}
1596 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1597
1598 # Look for all vnfds
1599 pending_tasks = {}
1600 index = 0
1601 for vnfr_data in db_vnfrs.values():
1602 for kdur in get_iterable(vnfr_data, "kdur"):
1603 desc_params = self._format_additional_params(kdur.get("additionalParams"))
1604 kdumodel = None
1605 k8sclustertype = None
1606 error_text = None
1607 cluster_uuid = None
1608 if kdur.get("helm-chart"):
1609 kdumodel = kdur["helm-chart"]
1610 k8sclustertype = "chart"
1611 k8sclustertype_full = "helm-chart"
1612 elif kdur.get("juju-bundle"):
1613 kdumodel = kdur["juju-bundle"]
1614 k8sclustertype = "juju"
1615 k8sclustertype_full = "juju-bundle"
1616 else:
1617 error_text = "kdu type is neither helm-chart not juju-bundle. Maybe an old NBI version is" \
1618 " running"
1619 try:
1620 if not error_text:
1621 cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
1622 except LcmException as e:
1623 error_text = str(e)
1624 step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)
1625
1626 k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
1627 "k8scluster-type": k8sclustertype,
1628 "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
1629 if error_text:
1630 k8s_instace_info["detailed-status"] = error_text
1631 db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
1632 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1633 if error_text:
1634 continue
1635
1636 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "_admin.deployed.K8s."
1637 "{}".format(index)}
1638 if k8sclustertype == "chart":
1639 task = asyncio.ensure_future(
1640 self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
1641 params=desc_params, db_dict=db_dict, timeout=3600)
1642 )
1643 else:
1644 task = self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
1645 atomic=True, params=desc_params,
1646 db_dict=db_dict, timeout=600)
1647
1648 pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index)
1649 index += 1
1650 if not pending_tasks:
1651 return
1652 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
1653 pending_list = list(pending_tasks.keys())
1654 while pending_list:
1655 done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
1656 return_when=asyncio.FIRST_COMPLETED)
1657 if not done_list: # timeout
1658 for task in pending_list:
1659 db_nsr_update[pending_tasks(task) + "detailed-status"] = "Timeout"
1660 break
1661 for task in done_list:
1662 exc = task.exception()
1663 if exc:
1664 db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
1665 else:
1666 db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()
1667
1668 except Exception as e:
1669 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e))
1670 raise LcmException("{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e))
1671 finally:
1672 # TODO Write in data base
1673 if db_nsr_update:
1674 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1675
1676 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
1677 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
1678 base_folder, task_instantiation_list, task_instantiation_info):
1679 # launch instantiate_N2VC in a asyncio task and register task object
1680 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
1681 # if not found, create one entry and update database
1682
1683 # fill db_nsr._admin.deployed.VCA.<index>
1684 vca_index = -1
1685 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
1686 if not vca_deployed:
1687 continue
1688 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
1689 vca_deployed.get("vdu_id") == vdu_id and \
1690 vca_deployed.get("kdu_name") == kdu_name and \
1691 vca_deployed.get("vdu_count_index", 0) == vdu_index:
1692 break
1693 else:
1694 # not found, create one.
1695 vca_deployed = {
1696 "member-vnf-index": member_vnf_index,
1697 "vdu_id": vdu_id,
1698 "kdu_name": kdu_name,
1699 "vdu_count_index": vdu_index,
1700 "operational-status": "init", # TODO revise
1701 "detailed-status": "", # TODO revise
1702 "step": "initial-deploy", # TODO revise
1703 "vnfd_id": vnfd_id,
1704 "vdu_name": vdu_name,
1705 }
1706 vca_index += 1
1707 self.update_db_2("nsrs", nsr_id, {"_admin.deployed.VCA.{}".format(vca_index): vca_deployed})
1708 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
1709
1710 # Launch task
1711 task_n2vc = asyncio.ensure_future(
1712 self.instantiate_N2VC(
1713 logging_text=logging_text,
1714 vca_index=vca_index,
1715 nsi_id=nsi_id,
1716 db_nsr=db_nsr,
1717 db_vnfr=db_vnfr,
1718 vdu_id=vdu_id,
1719 kdu_name=kdu_name,
1720 vdu_index=vdu_index,
1721 deploy_params=deploy_params,
1722 config_descriptor=descriptor_config,
1723 base_folder=base_folder,
1724 )
1725 )
1726 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
1727 task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
1728 task_instantiation_list.append(task_n2vc)
1729
1730 # Check if this VNFD has a configured terminate action
1731 def _has_terminate_config_primitive(self, vnfd):
1732 vnf_config = vnfd.get("vnf-configuration")
1733 if vnf_config and vnf_config.get("terminate-config-primitive"):
1734 return True
1735 else:
1736 return False
1737
1738 @staticmethod
1739 def _get_terminate_config_primitive_seq_list(vnfd):
1740 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
1741 # No need to check for existing primitive twice, already done before
1742 vnf_config = vnfd.get("vnf-configuration")
1743 seq_list = vnf_config.get("terminate-config-primitive")
1744 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
1745 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
1746 return seq_list_sorted
1747
1748 @staticmethod
1749 def _create_nslcmop(nsr_id, operation, params):
1750 """
1751 Creates a ns-lcm-opp content to be stored at database.
1752 :param nsr_id: internal id of the instance
1753 :param operation: instantiate, terminate, scale, action, ...
1754 :param params: user parameters for the operation
1755 :return: dictionary following SOL005 format
1756 """
1757 # Raise exception if invalid arguments
1758 if not (nsr_id and operation and params):
1759 raise LcmException(
1760 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
1761 now = time()
1762 _id = str(uuid4())
1763 nslcmop = {
1764 "id": _id,
1765 "_id": _id,
1766 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
1767 "operationState": "PROCESSING",
1768 "statusEnteredTime": now,
1769 "nsInstanceId": nsr_id,
1770 "lcmOperationType": operation,
1771 "startTime": now,
1772 "isAutomaticInvocation": False,
1773 "operationParams": params,
1774 "isCancelPending": False,
1775 "links": {
1776 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
1777 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
1778 }
1779 }
1780 return nslcmop
1781
1782 def _format_additional_params(self, params):
1783 params = params or {}
1784 for key, value in params.items():
1785 if str(value).startswith("!!yaml "):
1786 params[key] = yaml.safe_load(value[7:])
1787 return params
1788
1789 def _get_terminate_primitive_params(self, seq, vnf_index):
1790 primitive = seq.get('name')
1791 primitive_params = {}
1792 params = {
1793 "member_vnf_index": vnf_index,
1794 "primitive": primitive,
1795 "primitive_params": primitive_params,
1796 }
1797 desc_params = {}
1798 return self._map_primitive_params(seq, params, desc_params)
1799
1800 # sub-operations
1801
1802 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
1803 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
1804 if (op.get('operationState') == 'COMPLETED'):
1805 # b. Skip sub-operation
1806 # _ns_execute_primitive() or RO.create_action() will NOT be executed
1807 return self.SUBOPERATION_STATUS_SKIP
1808 else:
1809 # c. Reintent executing sub-operation
1810 # The sub-operation exists, and operationState != 'COMPLETED'
1811 # Update operationState = 'PROCESSING' to indicate a reintent.
1812 operationState = 'PROCESSING'
1813 detailed_status = 'In progress'
1814 self._update_suboperation_status(
1815 db_nslcmop, op_index, operationState, detailed_status)
1816 # Return the sub-operation index
1817 # _ns_execute_primitive() or RO.create_action() will be called from scale()
1818 # with arguments extracted from the sub-operation
1819 return op_index
1820
1821 # Find a sub-operation where all keys in a matching dictionary must match
1822 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
1823 def _find_suboperation(self, db_nslcmop, match):
1824 if (db_nslcmop and match):
1825 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
1826 for i, op in enumerate(op_list):
1827 if all(op.get(k) == match[k] for k in match):
1828 return i
1829 return self.SUBOPERATION_STATUS_NOT_FOUND
1830
1831 # Update status for a sub-operation given its index
1832 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
1833 # Update DB for HA tasks
1834 q_filter = {'_id': db_nslcmop['_id']}
1835 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
1836 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
1837 self.db.set_one("nslcmops",
1838 q_filter=q_filter,
1839 update_dict=update_dict,
1840 fail_on_empty=False)
1841
1842 # Add sub-operation, return the index of the added sub-operation
1843 # Optionally, set operationState, detailed-status, and operationType
1844 # Status and type are currently set for 'scale' sub-operations:
1845 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
1846 # 'detailed-status' : status message
1847 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
1848 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
1849 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
1850 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
1851 RO_nsr_id=None, RO_scaling_info=None):
1852 if not (db_nslcmop):
1853 return self.SUBOPERATION_STATUS_NOT_FOUND
1854 # Get the "_admin.operations" list, if it exists
1855 db_nslcmop_admin = db_nslcmop.get('_admin', {})
1856 op_list = db_nslcmop_admin.get('operations')
1857 # Create or append to the "_admin.operations" list
1858 new_op = {'member_vnf_index': vnf_index,
1859 'vdu_id': vdu_id,
1860 'vdu_count_index': vdu_count_index,
1861 'primitive': primitive,
1862 'primitive_params': mapped_primitive_params}
1863 if operationState:
1864 new_op['operationState'] = operationState
1865 if detailed_status:
1866 new_op['detailed-status'] = detailed_status
1867 if operationType:
1868 new_op['lcmOperationType'] = operationType
1869 if RO_nsr_id:
1870 new_op['RO_nsr_id'] = RO_nsr_id
1871 if RO_scaling_info:
1872 new_op['RO_scaling_info'] = RO_scaling_info
1873 if not op_list:
1874 # No existing operations, create key 'operations' with current operation as first list element
1875 db_nslcmop_admin.update({'operations': [new_op]})
1876 op_list = db_nslcmop_admin.get('operations')
1877 else:
1878 # Existing operations, append operation to list
1879 op_list.append(new_op)
1880
1881 db_nslcmop_update = {'_admin.operations': op_list}
1882 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
1883 op_index = len(op_list) - 1
1884 return op_index
1885
1886 # Helper methods for scale() sub-operations
1887
1888 # pre-scale/post-scale:
1889 # Check for 3 different cases:
1890 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
1891 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
1892 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
1893 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
1894 operationType, RO_nsr_id=None, RO_scaling_info=None):
1895 # Find this sub-operation
1896 if (RO_nsr_id and RO_scaling_info):
1897 operationType = 'SCALE-RO'
1898 match = {
1899 'member_vnf_index': vnf_index,
1900 'RO_nsr_id': RO_nsr_id,
1901 'RO_scaling_info': RO_scaling_info,
1902 }
1903 else:
1904 match = {
1905 'member_vnf_index': vnf_index,
1906 'primitive': vnf_config_primitive,
1907 'primitive_params': primitive_params,
1908 'lcmOperationType': operationType
1909 }
1910 op_index = self._find_suboperation(db_nslcmop, match)
1911 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
1912 # a. New sub-operation
1913 # The sub-operation does not exist, add it.
1914 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
1915 # The following parameters are set to None for all kind of scaling:
1916 vdu_id = None
1917 vdu_count_index = None
1918 vdu_name = None
1919 if (RO_nsr_id and RO_scaling_info):
1920 vnf_config_primitive = None
1921 primitive_params = None
1922 else:
1923 RO_nsr_id = None
1924 RO_scaling_info = None
1925 # Initial status for sub-operation
1926 operationState = 'PROCESSING'
1927 detailed_status = 'In progress'
1928 # Add sub-operation for pre/post-scaling (zero or more operations)
1929 self._add_suboperation(db_nslcmop,
1930 vnf_index,
1931 vdu_id,
1932 vdu_count_index,
1933 vdu_name,
1934 vnf_config_primitive,
1935 primitive_params,
1936 operationState,
1937 detailed_status,
1938 operationType,
1939 RO_nsr_id,
1940 RO_scaling_info)
1941 return self.SUBOPERATION_STATUS_NEW
1942 else:
1943 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
1944 # or op_index (operationState != 'COMPLETED')
1945 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
1946
1947 # Helper methods for terminate()
1948
1949 async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
1950 """ Create a primitive with params from VNFD
1951 Called from terminate() before deleting instance
1952 Calls action() to execute the primitive """
1953 logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
1954 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1955 db_vnfds = {}
1956 # Loop over VNFRs
1957 for vnfr in db_vnfrs_list:
1958 vnfd_id = vnfr["vnfd-id"]
1959 vnf_index = vnfr["member-vnf-index-ref"]
1960 if vnfd_id not in db_vnfds:
1961 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
1962 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1963 db_vnfds[vnfd_id] = vnfd
1964 vnfd = db_vnfds[vnfd_id]
1965 if not self._has_terminate_config_primitive(vnfd):
1966 continue
1967 # Get the primitive's sorted sequence list
1968 seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
1969 for seq in seq_list:
1970 # For each sequence in list, get primitive and call _ns_execute_primitive()
1971 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
1972 vnf_index, seq.get("name"))
1973 self.logger.debug(logging_text + step)
1974 # Create the primitive for each sequence, i.e. "primitive": "touch"
1975 primitive = seq.get('name')
1976 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
1977 # The following 3 parameters are currently set to None for 'terminate':
1978 # vdu_id, vdu_count_index, vdu_name
1979 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
1980 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
1981 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
1982 # Add sub-operation
1983 self._add_suboperation(db_nslcmop,
1984 nslcmop_id,
1985 vnf_index,
1986 vdu_id,
1987 vdu_count_index,
1988 vdu_name,
1989 primitive,
1990 mapped_primitive_params)
1991 # Sub-operations: Call _ns_execute_primitive() instead of action()
1992 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1993 # nsr_deployed = db_nsr["_admin"]["deployed"]
1994
1995 # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
1996 # nsr_id, nslcmop_terminate_action_id)
1997 # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
1998 # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
1999 # if result not in result_ok:
2000 # raise LcmException(
2001 # "terminate_primitive_action for vnf_member_index={}",
2002 # " primitive={} fails with error {}".format(
2003 # vnf_index, seq.get("name"), result_detail))
2004
2005 # TODO: find ee_id
2006 ee_id = None
2007 try:
2008 await self.n2vc.exec_primitive(
2009 ee_id=ee_id,
2010 primitive_name=primitive,
2011 params_dict=mapped_primitive_params
2012 )
2013 except Exception as e:
2014 self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
2015 raise LcmException(
2016 "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
2017 .format(vnf_index, seq.get("name"), e),
2018 )
2019
2020 async def terminate(self, nsr_id, nslcmop_id):
2021
2022 # Try to lock HA task here
2023 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2024 if not task_is_locked_by_me:
2025 return
2026
2027 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
2028 self.logger.debug(logging_text + "Enter")
2029 db_nsr = None
2030 db_nslcmop = None
2031 exc = None
2032 failed_detail = [] # annotates all failed error messages
2033 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2034 "_admin.current-operation": nslcmop_id,
2035 "_admin.operation-type": "terminate"}
2036 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2037 db_nslcmop_update = {}
2038 nslcmop_operation_state = None
2039 autoremove = False # autoremove after terminated
2040 pending_tasks = []
2041 try:
2042 # wait for any previous tasks in process
2043 step = "Waiting for previous operations to terminate"
2044 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
2045
2046 self._write_ns_status(
2047 nsr_id=nsr_id,
2048 ns_state="TERMINATING",
2049 current_operation="TERMINATING",
2050 current_operation_id=nslcmop_id
2051 )
2052
2053 step = "Getting nslcmop={} from db".format(nslcmop_id)
2054 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2055 step = "Getting nsr={} from db".format(nsr_id)
2056 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2057 # nsd = db_nsr["nsd"]
2058 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
2059 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
2060 return
2061 # #TODO check if VIM is creating and wait
2062 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
2063 # Call internal terminate action
2064 await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)
2065
2066 pending_tasks = []
2067
2068 db_nsr_update["operational-status"] = "terminating"
2069 db_nsr_update["config-status"] = "terminating"
2070
2071 # remove NS
2072 try:
2073 step = "delete execution environment"
2074 self.logger.debug(logging_text + step)
2075
2076 task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
2077 pending_tasks.append(task_delete_ee)
2078 except Exception as e:
2079 msg = "Failed while deleting NS in VCA: {}".format(e)
2080 self.logger.error(msg)
2081 failed_detail.append(msg)
2082
2083 try:
2084 # Delete from k8scluster
2085 step = "delete kdus"
2086 self.logger.debug(logging_text + step)
2087 # print(nsr_deployed)
2088 if nsr_deployed:
2089 for kdu in nsr_deployed.get("K8s", ()):
2090 kdu_instance = kdu.get("kdu-instance")
2091 if not kdu_instance:
2092 continue
2093 if kdu.get("k8scluster-type") == "chart":
2094 task_delete_kdu_instance = asyncio.ensure_future(
2095 self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2096 kdu_instance=kdu_instance))
2097 elif kdu.get("k8scluster-type") == "juju":
2098 task_delete_kdu_instance = asyncio.ensure_future(
2099 self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2100 kdu_instance=kdu_instance))
2101 else:
2102 self.error(logging_text + "Unknown k8s deployment type {}".
2103 format(kdu.get("k8scluster-type")))
2104 continue
2105 pending_tasks.append(task_delete_kdu_instance)
2106 except LcmException as e:
2107 msg = "Failed while deleting KDUs from NS: {}".format(e)
2108 self.logger.error(msg)
2109 failed_detail.append(msg)
2110
2111 # remove from RO
2112 RO_fail = False
2113
2114 # Delete ns
2115 RO_nsr_id = RO_delete_action = None
2116 if nsr_deployed and nsr_deployed.get("RO"):
2117 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
2118 RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2119 try:
2120 if RO_nsr_id:
2121 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
2122 "Deleting ns from VIM"
2123 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2124 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2125 self.logger.debug(logging_text + step)
2126 desc = await self.RO.delete("ns", RO_nsr_id)
2127 RO_delete_action = desc["action_id"]
2128 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
2129 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2130 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2131 if RO_delete_action:
2132 # wait until NS is deleted from VIM
2133 step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
2134 format(RO_nsr_id, RO_delete_action)
2135 detailed_status_old = None
2136 self.logger.debug(logging_text + step)
2137
2138 delete_timeout = 20 * 60 # 20 minutes
2139 while delete_timeout > 0:
2140 desc = await self.RO.show(
2141 "ns",
2142 item_id_name=RO_nsr_id,
2143 extra_item="action",
2144 extra_item_id=RO_delete_action)
2145 ns_status, ns_status_info = self.RO.check_action_status(desc)
2146 if ns_status == "ERROR":
2147 raise ROclient.ROClientException(ns_status_info)
2148 elif ns_status == "BUILD":
2149 detailed_status = step + "; {}".format(ns_status_info)
2150 elif ns_status == "ACTIVE":
2151 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2152 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2153 break
2154 else:
2155 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2156 if detailed_status != detailed_status_old:
2157 detailed_status_old = db_nslcmop_update["detailed-status"] = \
2158 db_nsr_update["detailed-status"] = detailed_status
2159 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2160 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2161 await asyncio.sleep(5, loop=self.loop)
2162 delete_timeout -= 5
2163 else: # delete_timeout <= 0:
2164 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
2165
2166 except ROclient.ROClientException as e:
2167 if e.http_code == 404: # not found
2168 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2169 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2170 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2171 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
2172 elif e.http_code == 409: # conflict
2173 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
2174 self.logger.debug(logging_text + failed_detail[-1])
2175 RO_fail = True
2176 else:
2177 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
2178 self.logger.error(logging_text + failed_detail[-1])
2179 RO_fail = True
2180
2181 # Delete nsd
2182 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
2183 RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
2184 try:
2185 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2186 "Deleting nsd from RO"
2187 await self.RO.delete("nsd", RO_nsd_id)
2188 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
2189 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2190 except ROclient.ROClientException as e:
2191 if e.http_code == 404: # not found
2192 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2193 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
2194 elif e.http_code == 409: # conflict
2195 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
2196 self.logger.debug(logging_text + failed_detail[-1])
2197 RO_fail = True
2198 else:
2199 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
2200 self.logger.error(logging_text + failed_detail[-1])
2201 RO_fail = True
2202
2203 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
2204 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2205 if not vnf_deployed or not vnf_deployed["id"]:
2206 continue
2207 try:
2208 RO_vnfd_id = vnf_deployed["id"]
2209 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2210 "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
2211 vnf_deployed["member-vnf-index"], RO_vnfd_id)
2212 await self.RO.delete("vnfd", RO_vnfd_id)
2213 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
2214 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2215 except ROclient.ROClientException as e:
2216 if e.http_code == 404: # not found
2217 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2218 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
2219 elif e.http_code == 409: # conflict
2220 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
2221 self.logger.debug(logging_text + failed_detail[-1])
2222 else:
2223 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
2224 self.logger.error(logging_text + failed_detail[-1])
2225
2226 if failed_detail:
2227 terminate_ok = False
2228 self.logger.error(logging_text + " ;".join(failed_detail))
2229 db_nsr_update["operational-status"] = "failed"
2230 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
2231 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
2232 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2233 db_nslcmop_update["statusEnteredTime"] = time()
2234 else:
2235 terminate_ok = True
2236 db_nsr_update["operational-status"] = "terminated"
2237 db_nsr_update["detailed-status"] = "Done"
2238 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
2239 db_nslcmop_update["detailed-status"] = "Done"
2240 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
2241 db_nslcmop_update["statusEnteredTime"] = time()
2242 if db_nslcmop["operationParams"].get("autoremove"):
2243 autoremove = True
2244
2245 except (ROclient.ROClientException, DbException, LcmException) as e:
2246 self.logger.error(logging_text + "Exit Exception {}".format(e))
2247 exc = e
2248 except asyncio.CancelledError:
2249 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2250 exc = "Operation was cancelled"
2251 except Exception as e:
2252 exc = traceback.format_exc()
2253 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
2254 finally:
2255 if exc and db_nslcmop:
2256 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
2257 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2258 db_nslcmop_update["statusEnteredTime"] = time()
2259 try:
2260 if db_nslcmop and db_nslcmop_update:
2261 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2262 if db_nsr:
2263 db_nsr_update["_admin.nslcmop"] = None
2264 db_nsr_update["_admin.current-operation"] = None
2265 db_nsr_update["_admin.operation-type"] = None
2266 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2267
2268 if terminate_ok:
2269 ns_state = "IDLE"
2270 error_description = None
2271 error_detail = None
2272 else:
2273 ns_state = "BROKEN"
2274 error_description = 'Operation: TERMINATING.{}, step: {}'.format(nslcmop_id, step)
2275 error_detail = "; ".join(failed_detail)
2276
2277 self._write_ns_status(
2278 nsr_id=nsr_id,
2279 ns_state=ns_state,
2280 current_operation="IDLE",
2281 current_operation_id=None,
2282 error_description=error_description,
2283 error_detail=error_detail
2284 )
2285
2286 except DbException as e:
2287 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2288 if nslcmop_operation_state:
2289 try:
2290 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2291 "operationState": nslcmop_operation_state,
2292 "autoremove": autoremove},
2293 loop=self.loop)
2294 except Exception as e:
2295 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2296
2297 # wait for pending tasks
2298 done = None
2299 pending = None
2300 if pending_tasks:
2301 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
2302 done, pending = await asyncio.wait(pending_tasks, timeout=3600)
2303 if not pending:
2304 self.logger.debug(logging_text + 'All tasks finished...')
2305 else:
2306 self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending))
2307
2308 self.logger.debug(logging_text + "Exit")
2309 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2310
2311 @staticmethod
2312 def _map_primitive_params(primitive_desc, params, instantiation_params):
2313 """
2314 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2315 The default-value is used. If it is between < > it look for a value at instantiation_params
2316 :param primitive_desc: portion of VNFD/NSD that describes primitive
2317 :param params: Params provided by user
2318 :param instantiation_params: Instantiation params provided by user
2319 :return: a dictionary with the calculated params
2320 """
2321 calculated_params = {}
2322 for parameter in primitive_desc.get("parameter", ()):
2323 param_name = parameter["name"]
2324 if param_name in params:
2325 calculated_params[param_name] = params[param_name]
2326 elif "default-value" in parameter or "value" in parameter:
2327 if "value" in parameter:
2328 calculated_params[param_name] = parameter["value"]
2329 else:
2330 calculated_params[param_name] = parameter["default-value"]
2331 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2332 and calculated_params[param_name].endswith(">"):
2333 if calculated_params[param_name][1:-1] in instantiation_params:
2334 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2335 else:
2336 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2337 format(calculated_params[param_name], primitive_desc["name"]))
2338 else:
2339 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2340 format(param_name, primitive_desc["name"]))
2341
2342 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2343 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2344 width=256)
2345 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2346 calculated_params[param_name] = calculated_params[param_name][7:]
2347
2348 # add always ns_config_info if primitive name is config
2349 if primitive_desc["name"] == "config":
2350 if "ns_config_info" in instantiation_params:
2351 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2352 return calculated_params
2353
2354 async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
2355 primitive, primitive_params, retries=0, retries_interval=30) -> (str, str):
2356
2357 # find vca_deployed record for this action
2358 try:
2359 for vca_deployed in db_deployed["VCA"]:
2360 if not vca_deployed:
2361 continue
2362 if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
2363 continue
2364 if vdu_name and vdu_name != vca_deployed["vdu_name"]:
2365 continue
2366 if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
2367 continue
2368 break
2369 else:
2370 # vca_deployed not found
2371 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
2372 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2373
2374 # get ee_id
2375 ee_id = vca_deployed.get("ee_id")
2376 if not ee_id:
2377 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
2378 "execution environment"
2379 .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2380
2381 if primitive == "config":
2382 primitive_params = {"params": primitive_params}
2383
2384 while retries >= 0:
2385 try:
2386 output = await self.n2vc.exec_primitive(
2387 ee_id=ee_id,
2388 primitive_name=primitive,
2389 params_dict=primitive_params
2390 )
2391 # execution was OK
2392 break
2393 except Exception as e:
2394 retries -= 1
2395 if retries >= 0:
2396 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
2397 # wait and retry
2398 await asyncio.sleep(retries_interval, loop=self.loop)
2399 else:
2400 return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL'
2401
2402 return output, 'OK'
2403
2404 except Exception as e:
2405 return 'Error executing action {}: {}'.format(primitive, e), 'FAIL'
2406
2407 async def action(self, nsr_id, nslcmop_id):
2408
2409 # Try to lock HA task here
2410 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2411 if not task_is_locked_by_me:
2412 return
2413
2414 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
2415 self.logger.debug(logging_text + "Enter")
2416 # get all needed from database
2417 db_nsr = None
2418 db_nslcmop = None
2419 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2420 "_admin.current-operation": nslcmop_id,
2421 "_admin.operation-type": "action"}
2422 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2423 db_nslcmop_update = {}
2424 nslcmop_operation_state = None
2425 nslcmop_operation_state_detail = None
2426 exc = None
2427 try:
2428 # wait for any previous tasks in process
2429 step = "Waiting for previous operations to terminate"
2430 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2431
2432 self._write_ns_status(
2433 nsr_id=nsr_id,
2434 ns_state=None,
2435 current_operation="RUNNING ACTION",
2436 current_operation_id=nslcmop_id
2437 )
2438
2439 step = "Getting information from database"
2440 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2441 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2442
2443 nsr_deployed = db_nsr["_admin"].get("deployed")
2444 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
2445 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2446 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
2447 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2448 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2449
2450 if vnf_index:
2451 step = "Getting vnfr from database"
2452 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2453 step = "Getting vnfd from database"
2454 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2455 else:
2456 if db_nsr.get("nsd"):
2457 db_nsd = db_nsr.get("nsd") # TODO this will be removed
2458 else:
2459 step = "Getting nsd from database"
2460 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2461
2462 # for backward compatibility
2463 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2464 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2465 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2466 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2467
2468 primitive = db_nslcmop["operationParams"]["primitive"]
2469 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
2470
2471 # look for primitive
2472 config_primitive_desc = None
2473 if vdu_id:
2474 for vdu in get_iterable(db_vnfd, "vdu"):
2475 if vdu_id == vdu["id"]:
2476 for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
2477 if config_primitive["name"] == primitive:
2478 config_primitive_desc = config_primitive
2479 break
2480 elif kdu_name:
2481 self.logger.debug(logging_text + "Checking actions in KDUs")
2482 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
2483 desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
2484 if primitive_params:
2485 desc_params.update(primitive_params)
2486 # TODO Check if we will need something at vnf level
2487 index = 0
2488 for kdu in get_iterable(nsr_deployed, "K8s"):
2489 if kdu_name == kdu["kdu-name"]:
2490 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
2491 "path": "_admin.deployed.K8s.{}".format(index)}
2492 if primitive == "upgrade":
2493 if desc_params.get("kdu_model"):
2494 kdu_model = desc_params.get("kdu_model")
2495 del desc_params["kdu_model"]
2496 else:
2497 kdu_model = kdu.get("kdu-model")
2498 parts = kdu_model.split(sep=":")
2499 if len(parts) == 2:
2500 kdu_model = parts[0]
2501
2502 if kdu.get("k8scluster-type") == "chart":
2503 output = await self.k8sclusterhelm.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
2504 kdu_instance=kdu.get("kdu-instance"),
2505 atomic=True, kdu_model=kdu_model,
2506 params=desc_params, db_dict=db_dict,
2507 timeout=300)
2508 elif kdu.get("k8scluster-type") == "juju":
2509 output = await self.k8sclusterjuju.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
2510 kdu_instance=kdu.get("kdu-instance"),
2511 atomic=True, kdu_model=kdu_model,
2512 params=desc_params, db_dict=db_dict,
2513 timeout=300)
2514
2515 else:
2516 msg = "k8scluster-type not defined"
2517 raise LcmException(msg)
2518
2519 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
2520 break
2521 elif primitive == "rollback":
2522 if kdu.get("k8scluster-type") == "chart":
2523 output = await self.k8sclusterhelm.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
2524 kdu_instance=kdu.get("kdu-instance"),
2525 db_dict=db_dict)
2526 elif kdu.get("k8scluster-type") == "juju":
2527 output = await self.k8sclusterjuju.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
2528 kdu_instance=kdu.get("kdu-instance"),
2529 db_dict=db_dict)
2530 else:
2531 msg = "k8scluster-type not defined"
2532 raise LcmException(msg)
2533 break
2534 elif primitive == "status":
2535 if kdu.get("k8scluster-type") == "chart":
2536 output = await self.k8sclusterhelm.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
2537 kdu_instance=kdu.get("kdu-instance"))
2538 elif kdu.get("k8scluster-type") == "juju":
2539 output = await self.k8sclusterjuju.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
2540 kdu_instance=kdu.get("kdu-instance"))
2541 else:
2542 msg = "k8scluster-type not defined"
2543 raise LcmException(msg)
2544 break
2545 index += 1
2546
2547 else:
2548 raise LcmException("KDU '{}' not found".format(kdu_name))
2549 if output:
2550 db_nslcmop_update["detailed-status"] = output
2551 db_nslcmop_update["operationState"] = 'COMPLETED'
2552 db_nslcmop_update["statusEnteredTime"] = time()
2553 else:
2554 db_nslcmop_update["detailed-status"] = ''
2555 db_nslcmop_update["operationState"] = 'FAILED'
2556 db_nslcmop_update["statusEnteredTime"] = time()
2557 return
2558 elif vnf_index:
2559 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2560 if config_primitive["name"] == primitive:
2561 config_primitive_desc = config_primitive
2562 break
2563 else:
2564 for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
2565 if config_primitive["name"] == primitive:
2566 config_primitive_desc = config_primitive
2567 break
2568
2569 if not config_primitive_desc:
2570 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
2571 format(primitive))
2572
2573 desc_params = {}
2574 if vnf_index:
2575 if db_vnfr.get("additionalParamsForVnf"):
2576 desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
2577 if vdu_id:
2578 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
2579 if vdur.get("additionalParams"):
2580 desc_params = self._format_additional_params(vdur["additionalParams"])
2581 else:
2582 if db_nsr.get("additionalParamsForNs"):
2583 desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))
2584
2585 # TODO check if ns is in a proper status
2586 output, detail = await self._ns_execute_primitive(
2587 db_deployed=nsr_deployed,
2588 member_vnf_index=vnf_index,
2589 vdu_id=vdu_id,
2590 vdu_name=vdu_name,
2591 vdu_count_index=vdu_count_index,
2592 primitive=primitive,
2593 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))
2594
2595 detailed_status = output
2596 if detail == 'OK':
2597 result = 'COMPLETED'
2598 else:
2599 result = 'FAILED'
2600
2601 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
2602 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
2603 db_nslcmop_update["statusEnteredTime"] = time()
2604 self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
2605 return # database update is called inside finally
2606
2607 except (DbException, LcmException) as e:
2608 self.logger.error(logging_text + "Exit Exception {}".format(e))
2609 exc = e
2610 except asyncio.CancelledError:
2611 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2612 exc = "Operation was cancelled"
2613 except Exception as e:
2614 exc = traceback.format_exc()
2615 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
2616 finally:
2617 if exc and db_nslcmop:
2618 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
2619 "FAILED {}: {}".format(step, exc)
2620 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2621 db_nslcmop_update["statusEnteredTime"] = time()
2622 try:
2623 if db_nslcmop_update:
2624 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2625 if db_nsr:
2626 db_nsr_update["_admin.nslcmop"] = None
2627 db_nsr_update["_admin.operation-type"] = None
2628 db_nsr_update["_admin.nslcmop"] = None
2629 db_nsr_update["_admin.current-operation"] = None
2630 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2631 self._write_ns_status(
2632 nsr_id=nsr_id,
2633 ns_state=None,
2634 current_operation="IDLE",
2635 current_operation_id=None
2636 )
2637 except DbException as e:
2638 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2639 self.logger.debug(logging_text + "Exit")
2640 if nslcmop_operation_state:
2641 try:
2642 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2643 "operationState": nslcmop_operation_state},
2644 loop=self.loop)
2645 except Exception as e:
2646 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2647 self.logger.debug(logging_text + "Exit")
2648 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
2649 return nslcmop_operation_state, nslcmop_operation_state_detail
2650
2651 async def scale(self, nsr_id, nslcmop_id):
2652
2653 # Try to lock HA task here
2654 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2655 if not task_is_locked_by_me:
2656 return
2657
2658 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
2659 self.logger.debug(logging_text + "Enter")
2660 # get all needed from database
2661 db_nsr = None
2662 db_nslcmop = None
2663 db_nslcmop_update = {}
2664 nslcmop_operation_state = None
2665 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2666 "_admin.current-operation": nslcmop_id,
2667 "_admin.operation-type": "scale"}
2668 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2669 exc = None
2670 # in case of error, indicates what part of scale was failed to put nsr at error status
2671 scale_process = None
2672 old_operational_status = ""
2673 old_config_status = ""
2674 vnfr_scaled = False
2675 try:
2676 # wait for any previous tasks in process
2677 step = "Waiting for previous operations to terminate"
2678 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2679
2680 self._write_ns_status(
2681 nsr_id=nsr_id,
2682 ns_state=None,
2683 current_operation="SCALING",
2684 current_operation_id=nslcmop_id
2685 )
2686
2687 step = "Getting nslcmop from database"
2688 self.logger.debug(step + " after having waited for previous tasks to be completed")
2689 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2690 step = "Getting nsr from database"
2691 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2692
2693 old_operational_status = db_nsr["operational-status"]
2694 old_config_status = db_nsr["config-status"]
2695 step = "Parsing scaling parameters"
2696 # self.logger.debug(step)
2697 db_nsr_update["operational-status"] = "scaling"
2698 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2699 nsr_deployed = db_nsr["_admin"].get("deployed")
2700
2701 #######
2702 nsr_deployed = db_nsr["_admin"].get("deployed")
2703 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
2704 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2705 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2706 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2707 #######
2708
2709 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
2710 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
2711 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
2712 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
2713 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
2714
2715 # for backward compatibility
2716 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2717 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2718 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2719 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2720
2721 step = "Getting vnfr from database"
2722 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2723 step = "Getting vnfd from database"
2724 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2725
2726 step = "Getting scaling-group-descriptor"
2727 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
2728 if scaling_descriptor["name"] == scaling_group:
2729 break
2730 else:
2731 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
2732 "at vnfd:scaling-group-descriptor".format(scaling_group))
2733
2734 # cooldown_time = 0
2735 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
2736 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
2737 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
2738 # break
2739
2740 # TODO check if ns is in a proper status
2741 step = "Sending scale order to VIM"
2742 nb_scale_op = 0
2743 if not db_nsr["_admin"].get("scaling-group"):
2744 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
2745 admin_scale_index = 0
2746 else:
2747 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
2748 if admin_scale_info["name"] == scaling_group:
2749 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
2750 break
2751 else: # not found, set index one plus last element and add new entry with the name
2752 admin_scale_index += 1
2753 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
2754 RO_scaling_info = []
2755 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
2756 if scaling_type == "SCALE_OUT":
2757 # count if max-instance-count is reached
2758 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
2759 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
2760 if nb_scale_op >= max_instance_count:
2761 raise LcmException("reached the limit of {} (max-instance-count) "
2762 "scaling-out operations for the "
2763 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2764
2765 nb_scale_op += 1
2766 vdu_scaling_info["scaling_direction"] = "OUT"
2767 vdu_scaling_info["vdu-create"] = {}
2768 for vdu_scale_info in scaling_descriptor["vdu"]:
2769 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2770 "type": "create", "count": vdu_scale_info.get("count", 1)})
2771 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2772
2773 elif scaling_type == "SCALE_IN":
2774 # count if min-instance-count is reached
2775 min_instance_count = 0
2776 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
2777 min_instance_count = int(scaling_descriptor["min-instance-count"])
2778 if nb_scale_op <= min_instance_count:
2779 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
2780 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2781 nb_scale_op -= 1
2782 vdu_scaling_info["scaling_direction"] = "IN"
2783 vdu_scaling_info["vdu-delete"] = {}
2784 for vdu_scale_info in scaling_descriptor["vdu"]:
2785 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2786 "type": "delete", "count": vdu_scale_info.get("count", 1)})
2787 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2788
2789 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
2790 vdu_create = vdu_scaling_info.get("vdu-create")
2791 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
2792 if vdu_scaling_info["scaling_direction"] == "IN":
2793 for vdur in reversed(db_vnfr["vdur"]):
2794 if vdu_delete.get(vdur["vdu-id-ref"]):
2795 vdu_delete[vdur["vdu-id-ref"]] -= 1
2796 vdu_scaling_info["vdu"].append({
2797 "name": vdur["name"],
2798 "vdu_id": vdur["vdu-id-ref"],
2799 "interface": []
2800 })
2801 for interface in vdur["interfaces"]:
2802 vdu_scaling_info["vdu"][-1]["interface"].append({
2803 "name": interface["name"],
2804 "ip_address": interface["ip-address"],
2805 "mac_address": interface.get("mac-address"),
2806 })
2807 vdu_delete = vdu_scaling_info.pop("vdu-delete")
2808
2809 # PRE-SCALE BEGIN
2810 step = "Executing pre-scale vnf-config-primitive"
2811 if scaling_descriptor.get("scaling-config-action"):
2812 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
2813 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
2814 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
2815 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
2816 step = db_nslcmop_update["detailed-status"] = \
2817 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
2818
2819 # look for primitive
2820 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2821 if config_primitive["name"] == vnf_config_primitive:
2822 break
2823 else:
2824 raise LcmException(
2825 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
2826 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
2827 "primitive".format(scaling_group, config_primitive))
2828
2829 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
2830 if db_vnfr.get("additionalParamsForVnf"):
2831 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
2832
2833 scale_process = "VCA"
2834 db_nsr_update["config-status"] = "configuring pre-scaling"
2835 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
2836
2837 # Pre-scale reintent check: Check if this sub-operation has been executed before
2838 op_index = self._check_or_add_scale_suboperation(
2839 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
2840 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2841 # Skip sub-operation
2842 result = 'COMPLETED'
2843 result_detail = 'Done'
2844 self.logger.debug(logging_text +
2845 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
2846 vnf_config_primitive, result, result_detail))
2847 else:
2848 if (op_index == self.SUBOPERATION_STATUS_NEW):
2849 # New sub-operation: Get index of this sub-operation
2850 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2851 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
2852 format(vnf_config_primitive))
2853 else:
2854 # Reintent: Get registered params for this existing sub-operation
2855 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2856 vnf_index = op.get('member_vnf_index')
2857 vnf_config_primitive = op.get('primitive')
2858 primitive_params = op.get('primitive_params')
2859 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
2860 format(vnf_config_primitive))
2861 # Execute the primitive, either with new (first-time) or registered (reintent) args
2862 result, result_detail = await self._ns_execute_primitive(
2863 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
2864 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
2865 vnf_config_primitive, result, result_detail))
2866 # Update operationState = COMPLETED | FAILED
2867 self._update_suboperation_status(
2868 db_nslcmop, op_index, result, result_detail)
2869
2870 if result == "FAILED":
2871 raise LcmException(result_detail)
2872 db_nsr_update["config-status"] = old_config_status
2873 scale_process = None
2874 # PRE-SCALE END
2875
2876 # SCALE RO - BEGIN
2877 # Should this block be skipped if 'RO_nsr_id' == None ?
2878 # if (RO_nsr_id and RO_scaling_info):
2879 if RO_scaling_info:
2880 scale_process = "RO"
2881 # Scale RO reintent check: Check if this sub-operation has been executed before
2882 op_index = self._check_or_add_scale_suboperation(
2883 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
2884 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2885 # Skip sub-operation
2886 result = 'COMPLETED'
2887 result_detail = 'Done'
2888 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
2889 result, result_detail))
2890 else:
2891 if (op_index == self.SUBOPERATION_STATUS_NEW):
2892 # New sub-operation: Get index of this sub-operation
2893 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2894 self.logger.debug(logging_text + "New sub-operation RO")
2895 else:
2896 # Reintent: Get registered params for this existing sub-operation
2897 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2898 RO_nsr_id = op.get('RO_nsr_id')
2899 RO_scaling_info = op.get('RO_scaling_info')
2900 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
2901 vnf_config_primitive))
2902
2903 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
2904 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
2905 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
2906 # wait until ready
2907 RO_nslcmop_id = RO_desc["instance_action_id"]
2908 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
2909
2910 RO_task_done = False
2911 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
2912 detailed_status_old = None
2913 self.logger.debug(logging_text + step)
2914
2915 deployment_timeout = 1 * 3600 # One hour
2916 while deployment_timeout > 0:
2917 if not RO_task_done:
2918 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
2919 extra_item_id=RO_nslcmop_id)
2920 ns_status, ns_status_info = self.RO.check_action_status(desc)
2921 if ns_status == "ERROR":
2922 raise ROclient.ROClientException(ns_status_info)
2923 elif ns_status == "BUILD":
2924 detailed_status = step + "; {}".format(ns_status_info)
2925 elif ns_status == "ACTIVE":
2926 RO_task_done = True
2927 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
2928 self.logger.debug(logging_text + step)
2929 else:
2930 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2931 else:
2932
2933 if ns_status == "ERROR":
2934 raise ROclient.ROClientException(ns_status_info)
2935 elif ns_status == "BUILD":
2936 detailed_status = step + "; {}".format(ns_status_info)
2937 elif ns_status == "ACTIVE":
2938 step = detailed_status = \
2939 "Waiting for management IP address reported by the VIM. Updating VNFRs"
2940 if not vnfr_scaled:
2941 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
2942 vnfr_scaled = True
2943 try:
2944 desc = await self.RO.show("ns", RO_nsr_id)
2945 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
2946 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
2947 break
2948 except LcmExceptionNoMgmtIP:
2949 pass
2950 else:
2951 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
2952 if detailed_status != detailed_status_old:
2953 self._update_suboperation_status(
2954 db_nslcmop, op_index, 'COMPLETED', detailed_status)
2955 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
2956 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2957
2958 await asyncio.sleep(5, loop=self.loop)
2959 deployment_timeout -= 5
2960 if deployment_timeout <= 0:
2961 self._update_suboperation_status(
2962 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
2963 raise ROclient.ROClientException("Timeout waiting ns to be ready")
2964
2965 # update VDU_SCALING_INFO with the obtained ip_addresses
2966 if vdu_scaling_info["scaling_direction"] == "OUT":
2967 for vdur in reversed(db_vnfr["vdur"]):
2968 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
2969 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
2970 vdu_scaling_info["vdu"].append({
2971 "name": vdur["name"],
2972 "vdu_id": vdur["vdu-id-ref"],
2973 "interface": []
2974 })
2975 for interface in vdur["interfaces"]:
2976 vdu_scaling_info["vdu"][-1]["interface"].append({
2977 "name": interface["name"],
2978 "ip_address": interface["ip-address"],
2979 "mac_address": interface.get("mac-address"),
2980 })
2981 del vdu_scaling_info["vdu-create"]
2982
2983 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
2984 # SCALE RO - END
2985
2986 scale_process = None
2987 if db_nsr_update:
2988 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2989
2990 # POST-SCALE BEGIN
2991 # execute primitive service POST-SCALING
2992 step = "Executing post-scale vnf-config-primitive"
2993 if scaling_descriptor.get("scaling-config-action"):
2994 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
2995 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
2996 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
2997 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
2998 step = db_nslcmop_update["detailed-status"] = \
2999 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
3000
3001 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3002 if db_vnfr.get("additionalParamsForVnf"):
3003 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3004
3005 # look for primitive
3006 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3007 if config_primitive["name"] == vnf_config_primitive:
3008 break
3009 else:
3010 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
3011 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
3012 "match any vnf-configuration:config-primitive".format(scaling_group,
3013 config_primitive))
3014 scale_process = "VCA"
3015 db_nsr_update["config-status"] = "configuring post-scaling"
3016 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3017
3018 # Post-scale reintent check: Check if this sub-operation has been executed before
3019 op_index = self._check_or_add_scale_suboperation(
3020 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
3021 if op_index == self.SUBOPERATION_STATUS_SKIP:
3022 # Skip sub-operation
3023 result = 'COMPLETED'
3024 result_detail = 'Done'
3025 self.logger.debug(logging_text +
3026 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3027 format(vnf_config_primitive, result, result_detail))
3028 else:
3029 if op_index == self.SUBOPERATION_STATUS_NEW:
3030 # New sub-operation: Get index of this sub-operation
3031 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3032 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3033 format(vnf_config_primitive))
3034 else:
3035 # Reintent: Get registered params for this existing sub-operation
3036 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3037 vnf_index = op.get('member_vnf_index')
3038 vnf_config_primitive = op.get('primitive')
3039 primitive_params = op.get('primitive_params')
3040 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3041 format(vnf_config_primitive))
3042 # Execute the primitive, either with new (first-time) or registered (reintent) args
3043 result, result_detail = await self._ns_execute_primitive(
3044 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3045 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3046 vnf_config_primitive, result, result_detail))
3047 # Update operationState = COMPLETED | FAILED
3048 self._update_suboperation_status(
3049 db_nslcmop, op_index, result, result_detail)
3050
3051 if result == "FAILED":
3052 raise LcmException(result_detail)
3053 db_nsr_update["config-status"] = old_config_status
3054 scale_process = None
3055 # POST-SCALE END
3056
3057 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
3058 db_nslcmop_update["statusEnteredTime"] = time()
3059 db_nslcmop_update["detailed-status"] = "done"
3060 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3061 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
3062 else old_operational_status
3063 db_nsr_update["config-status"] = old_config_status
3064 return
3065 except (ROclient.ROClientException, DbException, LcmException) as e:
3066 self.logger.error(logging_text + "Exit Exception {}".format(e))
3067 exc = e
3068 except asyncio.CancelledError:
3069 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3070 exc = "Operation was cancelled"
3071 except Exception as e:
3072 exc = traceback.format_exc()
3073 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3074 finally:
3075 if exc:
3076 if db_nslcmop:
3077 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
3078 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3079 db_nslcmop_update["statusEnteredTime"] = time()
3080 if db_nsr:
3081 db_nsr_update["operational-status"] = old_operational_status
3082 db_nsr_update["config-status"] = old_config_status
3083 db_nsr_update["detailed-status"] = ""
3084 db_nsr_update["_admin.nslcmop"] = None
3085 if scale_process:
3086 if "VCA" in scale_process:
3087 db_nsr_update["config-status"] = "failed"
3088 if "RO" in scale_process:
3089 db_nsr_update["operational-status"] = "failed"
3090 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
3091 exc)
3092 try:
3093 if db_nslcmop and db_nslcmop_update:
3094 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3095 if db_nsr:
3096 db_nsr_update["_admin.current-operation"] = None
3097 db_nsr_update["_admin.operation-type"] = None
3098 db_nsr_update["_admin.nslcmop"] = None
3099 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3100
3101 self._write_ns_status(
3102 nsr_id=nsr_id,
3103 ns_state=None,
3104 current_operation="IDLE",
3105 current_operation_id=None
3106 )
3107
3108 except DbException as e:
3109 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3110 if nslcmop_operation_state:
3111 try:
3112 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3113 "operationState": nslcmop_operation_state},
3114 loop=self.loop)
3115 # if cooldown_time:
3116 # await asyncio.sleep(cooldown_time, loop=self.loop)
3117 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
3118 except Exception as e:
3119 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3120 self.logger.debug(logging_text + "Exit")
3121 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")