fix 931: issue at logging text
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
25
26 from osm_lcm import ROclient
27 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get
28 from n2vc.k8s_helm_conn import K8sHelmConnector
29 from n2vc.k8s_juju_conn import K8sJujuConnector
30
31 from osm_common.dbbase import DbException
32 from osm_common.fsbase import FsException
33
34 from n2vc.n2vc_juju_conn import N2VCJujuConnector
35 from n2vc.exceptions import N2VCException
36
37 from copy import copy, deepcopy
38 from http import HTTPStatus
39 from time import time
40 from uuid import uuid4
41
42 __author__ = "Alfonso Tierno"
43
44
45 def get_iterable(in_dict, in_key):
46 """
47 Similar to <dict>.get(), but if value is None, False, ..., An empty tuple is returned instead
48 :param in_dict: a dictionary
49 :param in_key: the key to look for at in_dict
50 :return: in_dict[in_var] or () if it is None or not present
51 """
52 if not in_dict.get(in_key):
53 return ()
54 return in_dict[in_key]
55
56
57 def populate_dict(target_dict, key_list, value):
58 """
59 Update target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value.
60 Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
61 :param target_dict: dictionary to be changed
62 :param key_list: list of keys to insert at target_dict
63 :param value:
64 :return: None
65 """
66 for key in key_list[0:-1]:
67 if key not in target_dict:
68 target_dict[key] = {}
69 target_dict = target_dict[key]
70 target_dict[key_list[-1]] = value
71
72
73 class NsLcm(LcmBase):
74 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
75 total_deploy_timeout = 2 * 3600 # global timeout for deployment
76 timeout_charm_delete = 10 * 60
77 timeout_primitive = 10 * 60 # timeout for primitive execution
78
79 SUBOPERATION_STATUS_NOT_FOUND = -1
80 SUBOPERATION_STATUS_NEW = -2
81 SUBOPERATION_STATUS_SKIP = -3
82
83 def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
84 """
85 Init, Connect to database, filesystem storage, and messaging
86 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
87 :return: None
88 """
89 super().__init__(
90 db=db,
91 msg=msg,
92 fs=fs,
93 logger=logging.getLogger('lcm.ns')
94 )
95
96 self.loop = loop
97 self.lcm_tasks = lcm_tasks
98 self.ro_config = ro_config
99 self.vca_config = vca_config
100 if 'pubkey' in self.vca_config:
101 self.vca_config['public_key'] = self.vca_config['pubkey']
102 if 'cacert' in self.vca_config:
103 self.vca_config['ca_cert'] = self.vca_config['cacert']
104 if 'apiproxy' in self.vca_config:
105 self.vca_config['api_proxy'] = self.vca_config['apiproxy']
106
107 # create N2VC connector
108 self.n2vc = N2VCJujuConnector(
109 db=self.db,
110 fs=self.fs,
111 log=self.logger,
112 loop=self.loop,
113 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
114 username=self.vca_config.get('user', None),
115 vca_config=self.vca_config,
116 on_update_db=self._on_update_n2vc_db,
117 # ca_cert=self.vca_config.get('cacert'),
118 # api_proxy=self.vca_config.get('apiproxy'),
119 )
120
121 self.k8sclusterhelm = K8sHelmConnector(
122 kubectl_command=self.vca_config.get("kubectlpath"),
123 helm_command=self.vca_config.get("helmpath"),
124 fs=self.fs,
125 log=self.logger,
126 db=self.db,
127 on_update_db=None,
128 )
129
130 self.k8sclusterjuju = K8sJujuConnector(
131 kubectl_command=self.vca_config.get("kubectlpath"),
132 juju_command=self.vca_config.get("jujupath"),
133 fs=self.fs,
134 log=self.logger,
135 db=self.db,
136 on_update_db=None,
137 )
138
139 # create RO client
140 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
141
142 def _on_update_n2vc_db(self, table, filter, path, updated_data):
143
144 self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
145 .format(table, filter, path, updated_data))
146
147 return
148 # write NS status to database
149 # try:
150 # # nsrs_id = filter.get('_id')
151 # # print(nsrs_id)
152 # # get ns record
153 # nsr = self.db.get_one(table=table, q_filter=filter)
154 # # get VCA deployed list
155 # vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
156 # # get RO deployed
157 # # ro_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'RO'))
158 # for vca in vca_list:
159 # # status = vca.get('status')
160 # # print(status)
161 # # detailed_status = vca.get('detailed-status')
162 # # print(detailed_status)
163 # # for ro in ro_list:
164 # # print(ro)
165 #
166 # except Exception as e:
167 # self.logger.error('Error writing NS status to db: {}'.format(e))
168
169 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
170 """
171 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
172 :param vnfd: input vnfd
173 :param new_id: overrides vnf id if provided
174 :param additionalParams: Instantiation params for VNFs provided
175 :param nsrId: Id of the NSR
176 :return: copy of vnfd
177 """
178 try:
179 vnfd_RO = deepcopy(vnfd)
180 # remove unused by RO configuration, monitoring, scaling and internal keys
181 vnfd_RO.pop("_id", None)
182 vnfd_RO.pop("_admin", None)
183 vnfd_RO.pop("vnf-configuration", None)
184 vnfd_RO.pop("monitoring-param", None)
185 vnfd_RO.pop("scaling-group-descriptor", None)
186 vnfd_RO.pop("kdu", None)
187 vnfd_RO.pop("k8s-cluster", None)
188 if new_id:
189 vnfd_RO["id"] = new_id
190
191 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
192 for vdu in get_iterable(vnfd_RO, "vdu"):
193 cloud_init_file = None
194 if vdu.get("cloud-init-file"):
195 base_folder = vnfd["_admin"]["storage"]
196 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
197 vdu["cloud-init-file"])
198 with self.fs.file_open(cloud_init_file, "r") as ci_file:
199 cloud_init_content = ci_file.read()
200 vdu.pop("cloud-init-file", None)
201 elif vdu.get("cloud-init"):
202 cloud_init_content = vdu["cloud-init"]
203 else:
204 continue
205
206 env = Environment()
207 ast = env.parse(cloud_init_content)
208 mandatory_vars = meta.find_undeclared_variables(ast)
209 if mandatory_vars:
210 for var in mandatory_vars:
211 if not additionalParams or var not in additionalParams.keys():
212 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
213 "file, must be provided in the instantiation parameters inside the "
214 "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
215 template = Template(cloud_init_content)
216 cloud_init_content = template.render(additionalParams or {})
217 vdu["cloud-init"] = cloud_init_content
218
219 return vnfd_RO
220 except FsException as e:
221 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
222 format(vnfd["id"], vdu["id"], cloud_init_file, e))
223 except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
224 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
225 format(vnfd["id"], vdu["id"], e))
226
227 def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
228 """
229 Creates a RO ns descriptor from OSM ns_instantiate params
230 :param ns_params: OSM instantiate params
231 :return: The RO ns descriptor
232 """
233 vim_2_RO = {}
234 wim_2_RO = {}
235 # TODO feature 1417: Check that no instantiation is set over PDU
236 # check if PDU forces a concrete vim-network-id and add it
237 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
238
239 def vim_account_2_RO(vim_account):
240 if vim_account in vim_2_RO:
241 return vim_2_RO[vim_account]
242
243 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
244 if db_vim["_admin"]["operationalState"] != "ENABLED":
245 raise LcmException("VIM={} is not available. operationalState={}".format(
246 vim_account, db_vim["_admin"]["operationalState"]))
247 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
248 vim_2_RO[vim_account] = RO_vim_id
249 return RO_vim_id
250
251 def wim_account_2_RO(wim_account):
252 if isinstance(wim_account, str):
253 if wim_account in wim_2_RO:
254 return wim_2_RO[wim_account]
255
256 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
257 if db_wim["_admin"]["operationalState"] != "ENABLED":
258 raise LcmException("WIM={} is not available. operationalState={}".format(
259 wim_account, db_wim["_admin"]["operationalState"]))
260 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
261 wim_2_RO[wim_account] = RO_wim_id
262 return RO_wim_id
263 else:
264 return wim_account
265
266 def ip_profile_2_RO(ip_profile):
267 RO_ip_profile = deepcopy((ip_profile))
268 if "dns-server" in RO_ip_profile:
269 if isinstance(RO_ip_profile["dns-server"], list):
270 RO_ip_profile["dns-address"] = []
271 for ds in RO_ip_profile.pop("dns-server"):
272 RO_ip_profile["dns-address"].append(ds['address'])
273 else:
274 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
275 if RO_ip_profile.get("ip-version") == "ipv4":
276 RO_ip_profile["ip-version"] = "IPv4"
277 if RO_ip_profile.get("ip-version") == "ipv6":
278 RO_ip_profile["ip-version"] = "IPv6"
279 if "dhcp-params" in RO_ip_profile:
280 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
281 return RO_ip_profile
282
283 if not ns_params:
284 return None
285 RO_ns_params = {
286 # "name": ns_params["nsName"],
287 # "description": ns_params.get("nsDescription"),
288 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
289 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
290 # "scenario": ns_params["nsdId"],
291 }
292
293 n2vc_key_list = n2vc_key_list or []
294 for vnfd_ref, vnfd in vnfd_dict.items():
295 vdu_needed_access = []
296 mgmt_cp = None
297 if vnfd.get("vnf-configuration"):
298 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
299 if ssh_required and vnfd.get("mgmt-interface"):
300 if vnfd["mgmt-interface"].get("vdu-id"):
301 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
302 elif vnfd["mgmt-interface"].get("cp"):
303 mgmt_cp = vnfd["mgmt-interface"]["cp"]
304
305 for vdu in vnfd.get("vdu", ()):
306 if vdu.get("vdu-configuration"):
307 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
308 if ssh_required:
309 vdu_needed_access.append(vdu["id"])
310 elif mgmt_cp:
311 for vdu_interface in vdu.get("interface"):
312 if vdu_interface.get("external-connection-point-ref") and \
313 vdu_interface["external-connection-point-ref"] == mgmt_cp:
314 vdu_needed_access.append(vdu["id"])
315 mgmt_cp = None
316 break
317
318 if vdu_needed_access:
319 for vnf_member in nsd.get("constituent-vnfd"):
320 if vnf_member["vnfd-id-ref"] != vnfd_ref:
321 continue
322 for vdu in vdu_needed_access:
323 populate_dict(RO_ns_params,
324 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
325 n2vc_key_list)
326
327 if ns_params.get("vduImage"):
328 RO_ns_params["vduImage"] = ns_params["vduImage"]
329
330 if ns_params.get("ssh_keys"):
331 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
332 for vnf_params in get_iterable(ns_params, "vnf"):
333 for constituent_vnfd in nsd["constituent-vnfd"]:
334 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
335 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
336 break
337 else:
338 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
339 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
340 if vnf_params.get("vimAccountId"):
341 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
342 vim_account_2_RO(vnf_params["vimAccountId"]))
343
344 for vdu_params in get_iterable(vnf_params, "vdu"):
345 # TODO feature 1417: check that this VDU exist and it is not a PDU
346 if vdu_params.get("volume"):
347 for volume_params in vdu_params["volume"]:
348 if volume_params.get("vim-volume-id"):
349 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
350 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
351 volume_params["vim-volume-id"])
352 if vdu_params.get("interface"):
353 for interface_params in vdu_params["interface"]:
354 if interface_params.get("ip-address"):
355 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
356 vdu_params["id"], "interfaces", interface_params["name"],
357 "ip_address"),
358 interface_params["ip-address"])
359 if interface_params.get("mac-address"):
360 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
361 vdu_params["id"], "interfaces", interface_params["name"],
362 "mac_address"),
363 interface_params["mac-address"])
364 if interface_params.get("floating-ip-required"):
365 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
366 vdu_params["id"], "interfaces", interface_params["name"],
367 "floating-ip"),
368 interface_params["floating-ip-required"])
369
370 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
371 if internal_vld_params.get("vim-network-name"):
372 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
373 internal_vld_params["name"], "vim-network-name"),
374 internal_vld_params["vim-network-name"])
375 if internal_vld_params.get("vim-network-id"):
376 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
377 internal_vld_params["name"], "vim-network-id"),
378 internal_vld_params["vim-network-id"])
379 if internal_vld_params.get("ip-profile"):
380 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
381 internal_vld_params["name"], "ip-profile"),
382 ip_profile_2_RO(internal_vld_params["ip-profile"]))
383 if internal_vld_params.get("provider-network"):
384
385 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
386 internal_vld_params["name"], "provider-network"),
387 internal_vld_params["provider-network"].copy())
388
389 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
390 # look for interface
391 iface_found = False
392 for vdu_descriptor in vnf_descriptor["vdu"]:
393 for vdu_interface in vdu_descriptor["interface"]:
394 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
395 if icp_params.get("ip-address"):
396 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
397 vdu_descriptor["id"], "interfaces",
398 vdu_interface["name"], "ip_address"),
399 icp_params["ip-address"])
400
401 if icp_params.get("mac-address"):
402 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
403 vdu_descriptor["id"], "interfaces",
404 vdu_interface["name"], "mac_address"),
405 icp_params["mac-address"])
406 iface_found = True
407 break
408 if iface_found:
409 break
410 else:
411 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
412 "internal-vld:id-ref={} is not present at vnfd:internal-"
413 "connection-point".format(vnf_params["member-vnf-index"],
414 icp_params["id-ref"]))
415
416 for vld_params in get_iterable(ns_params, "vld"):
417 if "ip-profile" in vld_params:
418 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
419 ip_profile_2_RO(vld_params["ip-profile"]))
420
421 if vld_params.get("provider-network"):
422
423 populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
424 vld_params["provider-network"].copy())
425
426 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
427 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
428 wim_account_2_RO(vld_params["wimAccountId"])),
429 if vld_params.get("vim-network-name"):
430 RO_vld_sites = []
431 if isinstance(vld_params["vim-network-name"], dict):
432 for vim_account, vim_net in vld_params["vim-network-name"].items():
433 RO_vld_sites.append({
434 "netmap-use": vim_net,
435 "datacenter": vim_account_2_RO(vim_account)
436 })
437 else: # isinstance str
438 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
439 if RO_vld_sites:
440 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
441
442 if vld_params.get("vim-network-id"):
443 RO_vld_sites = []
444 if isinstance(vld_params["vim-network-id"], dict):
445 for vim_account, vim_net in vld_params["vim-network-id"].items():
446 RO_vld_sites.append({
447 "netmap-use": vim_net,
448 "datacenter": vim_account_2_RO(vim_account)
449 })
450 else: # isinstance str
451 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
452 if RO_vld_sites:
453 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
454 if vld_params.get("ns-net"):
455 if isinstance(vld_params["ns-net"], dict):
456 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
457 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
458 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
459 if "vnfd-connection-point-ref" in vld_params:
460 for cp_params in vld_params["vnfd-connection-point-ref"]:
461 # look for interface
462 for constituent_vnfd in nsd["constituent-vnfd"]:
463 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
464 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
465 break
466 else:
467 raise LcmException(
468 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
469 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
470 match_cp = False
471 for vdu_descriptor in vnf_descriptor["vdu"]:
472 for interface_descriptor in vdu_descriptor["interface"]:
473 if interface_descriptor.get("external-connection-point-ref") == \
474 cp_params["vnfd-connection-point-ref"]:
475 match_cp = True
476 break
477 if match_cp:
478 break
479 else:
480 raise LcmException(
481 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
482 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
483 cp_params["member-vnf-index-ref"],
484 cp_params["vnfd-connection-point-ref"],
485 vnf_descriptor["id"]))
486 if cp_params.get("ip-address"):
487 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
488 vdu_descriptor["id"], "interfaces",
489 interface_descriptor["name"], "ip_address"),
490 cp_params["ip-address"])
491 if cp_params.get("mac-address"):
492 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
493 vdu_descriptor["id"], "interfaces",
494 interface_descriptor["name"], "mac_address"),
495 cp_params["mac-address"])
496 return RO_ns_params
497
498 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
499 # make a copy to do not change
500 vdu_create = copy(vdu_create)
501 vdu_delete = copy(vdu_delete)
502
503 vdurs = db_vnfr.get("vdur")
504 if vdurs is None:
505 vdurs = []
506 vdu_index = len(vdurs)
507 while vdu_index:
508 vdu_index -= 1
509 vdur = vdurs[vdu_index]
510 if vdur.get("pdu-type"):
511 continue
512 vdu_id_ref = vdur["vdu-id-ref"]
513 if vdu_create and vdu_create.get(vdu_id_ref):
514 for index in range(0, vdu_create[vdu_id_ref]):
515 vdur = deepcopy(vdur)
516 vdur["_id"] = str(uuid4())
517 vdur["count-index"] += 1
518 vdurs.insert(vdu_index+1+index, vdur)
519 del vdu_create[vdu_id_ref]
520 if vdu_delete and vdu_delete.get(vdu_id_ref):
521 del vdurs[vdu_index]
522 vdu_delete[vdu_id_ref] -= 1
523 if not vdu_delete[vdu_id_ref]:
524 del vdu_delete[vdu_id_ref]
525 # check all operations are done
526 if vdu_create or vdu_delete:
527 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
528 vdu_create))
529 if vdu_delete:
530 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
531 vdu_delete))
532
533 vnfr_update = {"vdur": vdurs}
534 db_vnfr["vdur"] = vdurs
535 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
536
537 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
538 """
539 Updates database nsr with the RO info for the created vld
540 :param ns_update_nsr: dictionary to be filled with the updated info
541 :param db_nsr: content of db_nsr. This is also modified
542 :param nsr_desc_RO: nsr descriptor from RO
543 :return: Nothing, LcmException is raised on errors
544 """
545
546 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
547 for net_RO in get_iterable(nsr_desc_RO, "nets"):
548 if vld["id"] != net_RO.get("ns_net_osm_id"):
549 continue
550 vld["vim-id"] = net_RO.get("vim_net_id")
551 vld["name"] = net_RO.get("vim_name")
552 vld["status"] = net_RO.get("status")
553 vld["status-detailed"] = net_RO.get("error_msg")
554 ns_update_nsr["vld.{}".format(vld_index)] = vld
555 break
556 else:
557 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
558
559 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
560 """
561 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
562 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
563 :param nsr_desc_RO: nsr descriptor from RO
564 :return: Nothing, LcmException is raised on errors
565 """
566 for vnf_index, db_vnfr in db_vnfrs.items():
567 for vnf_RO in nsr_desc_RO["vnfs"]:
568 if vnf_RO["member_vnf_index"] != vnf_index:
569 continue
570 vnfr_update = {}
571 if vnf_RO.get("ip_address"):
572 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
573 elif not db_vnfr.get("ip-address"):
574 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
575
576 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
577 vdur_RO_count_index = 0
578 if vdur.get("pdu-type"):
579 continue
580 for vdur_RO in get_iterable(vnf_RO, "vms"):
581 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
582 continue
583 if vdur["count-index"] != vdur_RO_count_index:
584 vdur_RO_count_index += 1
585 continue
586 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
587 if vdur_RO.get("ip_address"):
588 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
589 else:
590 vdur["ip-address"] = None
591 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
592 vdur["name"] = vdur_RO.get("vim_name")
593 vdur["status"] = vdur_RO.get("status")
594 vdur["status-detailed"] = vdur_RO.get("error_msg")
595 for ifacer in get_iterable(vdur, "interfaces"):
596 for interface_RO in get_iterable(vdur_RO, "interfaces"):
597 if ifacer["name"] == interface_RO.get("internal_name"):
598 ifacer["ip-address"] = interface_RO.get("ip_address")
599 ifacer["mac-address"] = interface_RO.get("mac_address")
600 break
601 else:
602 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
603 "from VIM info"
604 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
605 vnfr_update["vdur.{}".format(vdu_index)] = vdur
606 break
607 else:
608 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
609 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
610
611 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
612 for net_RO in get_iterable(nsr_desc_RO, "nets"):
613 if vld["id"] != net_RO.get("vnf_net_osm_id"):
614 continue
615 vld["vim-id"] = net_RO.get("vim_net_id")
616 vld["name"] = net_RO.get("vim_name")
617 vld["status"] = net_RO.get("status")
618 vld["status-detailed"] = net_RO.get("error_msg")
619 vnfr_update["vld.{}".format(vld_index)] = vld
620 break
621 else:
622 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
623 vnf_index, vld["id"]))
624
625 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
626 break
627
628 else:
629 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
630
631 @staticmethod
632 def _get_ns_config_info(vca_deployed_list):
633 """
634 Generates a mapping between vnf,vdu elements and the N2VC id
635 :param vca_deployed_list: List of database _admin.deploy.VCA that contains this list
636 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
637 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
638 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
639 """
640 mapping = {}
641 ns_config_info = {"osm-config-mapping": mapping}
642 for vca in vca_deployed_list:
643 if not vca["member-vnf-index"]:
644 continue
645 if not vca["vdu_id"]:
646 mapping[vca["member-vnf-index"]] = vca["application"]
647 else:
648 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
649 vca["application"]
650 return ns_config_info
651
652 @staticmethod
653 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
654 """
655 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
656 primitives as verify-ssh-credentials, or config when needed
657 :param desc_primitive_list: information of the descriptor
658 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
659 this element contains a ssh public key
660 :return: The modified list. Can ba an empty list, but always a list
661 """
662 if desc_primitive_list:
663 primitive_list = desc_primitive_list.copy()
664 else:
665 primitive_list = []
666 # look for primitive config, and get the position. None if not present
667 config_position = None
668 for index, primitive in enumerate(primitive_list):
669 if primitive["name"] == "config":
670 config_position = index
671 break
672
673 # for NS, add always a config primitive if not present (bug 874)
674 if not vca_deployed["member-vnf-index"] and config_position is None:
675 primitive_list.insert(0, {"name": "config", "parameter": []})
676 config_position = 0
677 # for VNF/VDU add verify-ssh-credentials after config
678 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
679 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
680 return primitive_list
681
682 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr,
683 db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list):
684
685 db_nsr_update = {}
686 RO_descriptor_number = 0 # number of descriptors created at RO
687 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
688 start_deploy = time()
689 vdu_flag = False # If any of the VNFDs has VDUs
690 ns_params = db_nslcmop.get("operationParams")
691
692 # deploy RO
693
694 # get vnfds, instantiate at RO
695
696 for c_vnf in nsd.get("constituent-vnfd", ()):
697 member_vnf_index = c_vnf["member-vnf-index"]
698 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
699 if vnfd.get("vdu"):
700 vdu_flag = True
701 vnfd_ref = vnfd["id"]
702 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at" \
703 " RO".format(vnfd_ref, member_vnf_index)
704 # self.logger.debug(logging_text + step)
705 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
706 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
707 RO_descriptor_number += 1
708
709 # look position at deployed.RO.vnfd if not present it will be appended at the end
710 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
711 if vnf_deployed["member-vnf-index"] == member_vnf_index:
712 break
713 else:
714 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
715 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
716
717 # look if present
718 RO_update = {"member-vnf-index": member_vnf_index}
719 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
720 if vnfd_list:
721 RO_update["id"] = vnfd_list[0]["uuid"]
722 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
723 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
724 else:
725 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
726 get("additionalParamsForVnf"), nsr_id)
727 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
728 RO_update["id"] = desc["uuid"]
729 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
730 vnfd_ref, member_vnf_index, desc["uuid"]))
731 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
732 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
733 self.update_db_2("nsrs", nsr_id, db_nsr_update)
734 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
735
736 # create nsd at RO
737 nsd_ref = nsd["id"]
738 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
739 # self.logger.debug(logging_text + step)
740
741 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
742 RO_descriptor_number += 1
743 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
744 if nsd_list:
745 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
746 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
747 nsd_ref, RO_nsd_uuid))
748 else:
749 nsd_RO = deepcopy(nsd)
750 nsd_RO["id"] = RO_osm_nsd_id
751 nsd_RO.pop("_id", None)
752 nsd_RO.pop("_admin", None)
753 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
754 member_vnf_index = c_vnf["member-vnf-index"]
755 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
756 for c_vld in nsd_RO.get("vld", ()):
757 for cp in c_vld.get("vnfd-connection-point-ref", ()):
758 member_vnf_index = cp["member-vnf-index-ref"]
759 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
760
761 desc = await self.RO.create("nsd", descriptor=nsd_RO)
762 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
763 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
764 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
765 self.update_db_2("nsrs", nsr_id, db_nsr_update)
766 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
767
768 # Crate ns at RO
769 # if present use it unless in error status
770 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
771 if RO_nsr_id:
772 try:
773 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
774 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
775 desc = await self.RO.show("ns", RO_nsr_id)
776 except ROclient.ROClientException as e:
777 if e.http_code != HTTPStatus.NOT_FOUND:
778 raise
779 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
780 if RO_nsr_id:
781 ns_status, ns_status_info = self.RO.check_ns_status(desc)
782 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
783 if ns_status == "ERROR":
784 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deleting ns at RO. RO_ns_id={}"\
785 .format(RO_nsr_id)
786 self.logger.debug(logging_text + step)
787 await self.RO.delete("ns", RO_nsr_id)
788 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
789 if not RO_nsr_id:
790 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking dependencies"
791 # self.logger.debug(logging_text + step)
792
793 # check if VIM is creating and wait look if previous tasks in process
794 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
795 if task_dependency:
796 step = "Waiting for related tasks to be completed: {}".format(task_name)
797 self.logger.debug(logging_text + step)
798 await asyncio.wait(task_dependency, timeout=3600)
799 if ns_params.get("vnf"):
800 for vnf in ns_params["vnf"]:
801 if "vimAccountId" in vnf:
802 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
803 vnf["vimAccountId"])
804 if task_dependency:
805 step = "Waiting for related tasks to be completed: {}".format(task_name)
806 self.logger.debug(logging_text + step)
807 await asyncio.wait(task_dependency, timeout=3600)
808
809 step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Checking instantiation parameters"
810
811 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
812
813 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
814 # step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deploying ns at VIM"
815 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
816 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
817 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
818 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
819 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
820 self.update_db_2("nsrs", nsr_id, db_nsr_update)
821 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
822
823 # wait until NS is ready
824 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
825 detailed_status_old = None
826 self.logger.debug(logging_text + step)
827
828 while time() <= start_deploy + self.total_deploy_timeout:
829 desc = await self.RO.show("ns", RO_nsr_id)
830 ns_status, ns_status_info = self.RO.check_ns_status(desc)
831 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
832 if ns_status == "ERROR":
833 raise ROclient.ROClientException(ns_status_info)
834 elif ns_status == "BUILD":
835 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
836 elif ns_status == "ACTIVE":
837 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
838 try:
839 if vdu_flag:
840 self.ns_update_vnfr(db_vnfrs, desc)
841 break
842 except LcmExceptionNoMgmtIP:
843 pass
844 else:
845 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
846 if detailed_status != detailed_status_old:
847 detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
848 self.update_db_2("nsrs", nsr_id, db_nsr_update)
849 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
850 await asyncio.sleep(5, loop=self.loop)
851 else: # total_deploy_timeout
852 raise ROclient.ROClientException("Timeout waiting ns to be ready")
853
854 step = "Updating NSR"
855 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
856
857 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
858 db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
859 db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
860 self.update_db_2("nsrs", nsr_id, db_nsr_update)
861 self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
862
863 step = "Deployed at VIM"
864 self.logger.debug(logging_text + step)
865
866 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
867 """
868 Wait for ip addres at RO, and optionally, insert public key in virtual machine
869 :param logging_text: prefix use for logging
870 :param nsr_id:
871 :param vnfr_id:
872 :param vdu_id:
873 :param vdu_index:
874 :param pub_key: public ssh key to inject, None to skip
875 :param user: user to apply the public ssh key
876 :return: IP address
877 """
878
879 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
880 ro_nsr_id = None
881 ip_address = None
882 nb_tries = 0
883 target_vdu_id = None
884 ro_retries = 0
885
886 while True:
887
888 ro_retries += 1
889 if ro_retries >= 360: # 1 hour
890 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
891
892 await asyncio.sleep(10, loop=self.loop)
893 # wait until NS is deployed at RO
894 if not ro_nsr_id:
895 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
896 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
897 if not ro_nsr_id:
898 continue
899
900 # get ip address
901 if not target_vdu_id:
902 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
903
904 if not vdu_id: # for the VNF case
905 ip_address = db_vnfr.get("ip-address")
906 if not ip_address:
907 continue
908 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
909 else: # VDU case
910 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
911 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
912
913 if not vdur:
914 raise LcmException("Not found vnfr_id={}, vdu_index={}, vdu_index={}".format(
915 vnfr_id, vdu_id, vdu_index
916 ))
917
918 if vdur.get("status") == "ACTIVE":
919 ip_address = vdur.get("ip-address")
920 if not ip_address:
921 continue
922 target_vdu_id = vdur["vdu-id-ref"]
923 elif vdur.get("status") == "ERROR":
924 raise LcmException("Cannot inject ssh-key because target VM is in error state")
925
926 if not target_vdu_id:
927 continue
928
929 # self.logger.debug(logging_text + "IP address={}".format(ip_address))
930
931 # inject public key into machine
932 if pub_key and user:
933 # self.logger.debug(logging_text + "Inserting RO key")
934 try:
935 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
936 result_dict = await self.RO.create_action(
937 item="ns",
938 item_id_name=ro_nsr_id,
939 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
940 )
941 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
942 if not result_dict or not isinstance(result_dict, dict):
943 raise LcmException("Unknown response from RO when injecting key")
944 for result in result_dict.values():
945 if result.get("vim_result") == 200:
946 break
947 else:
948 raise ROclient.ROClientException("error injecting key: {}".format(
949 result.get("description")))
950 break
951 except ROclient.ROClientException as e:
952 if not nb_tries:
953 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
954 format(e, 20*10))
955 nb_tries += 1
956 if nb_tries >= 20:
957 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
958 else:
959 break
960
961 return ip_address
962
963 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id,
964 kdu_name, vdu_index, config_descriptor, deploy_params, base_folder):
965 nsr_id = db_nsr["_id"]
966 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
967 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
968 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
969 db_dict = {
970 'collection': 'nsrs',
971 'filter': {'_id': nsr_id},
972 'path': db_update_entry
973 }
974 step = ""
975 try:
976 vnfr_id = None
977 if db_vnfr:
978 vnfr_id = db_vnfr["_id"]
979
980 namespace = "{nsi}.{ns}".format(
981 nsi=nsi_id if nsi_id else "",
982 ns=nsr_id)
983 if vnfr_id:
984 namespace += "." + vnfr_id
985 if vdu_id:
986 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
987
988 # Get artifact path
989 artifact_path = "{}/{}/charms/{}".format(
990 base_folder["folder"],
991 base_folder["pkg-dir"],
992 config_descriptor["juju"]["charm"]
993 )
994
995 is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None
996 if deep_get(config_descriptor, ('juju', 'proxy')) is False:
997 is_proxy_charm = False
998
999 # n2vc_redesign STEP 3.1
1000
1001 # find old ee_id if exists
1002 ee_id = vca_deployed.get("ee_id")
1003
1004 # create or register execution environment in VCA
1005 if is_proxy_charm:
1006 step = "create execution environment"
1007 self.logger.debug(logging_text + step)
1008 ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
1009 reuse_ee_id=ee_id,
1010 db_dict=db_dict)
1011 else:
1012 step = "Waiting to VM being up and getting IP address"
1013 self.logger.debug(logging_text + step)
1014 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1015 user=None, pub_key=None)
1016 credentials = {"hostname": rw_mgmt_ip}
1017 # get username
1018 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1019 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1020 # merged. Meanwhile let's get username from initial-config-primitive
1021 if not username and config_descriptor.get("initial-config-primitive"):
1022 for config_primitive in config_descriptor["initial-config-primitive"]:
1023 for param in config_primitive.get("parameter", ()):
1024 if param["name"] == "ssh-username":
1025 username = param["value"]
1026 break
1027 if not username:
1028 raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
1029 "'config-access.ssh-access.default-user'")
1030 credentials["username"] = username
1031 # n2vc_redesign STEP 3.2
1032
1033 step = "register execution environment {}".format(credentials)
1034 self.logger.debug(logging_text + step)
1035 ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
1036 db_dict=db_dict)
1037
1038 # for compatibility with MON/POL modules, the need model and application name at database
1039 # TODO ask to N2VC instead of assuming the format "model_name.application_name"
1040 ee_id_parts = ee_id.split('.')
1041 model_name = ee_id_parts[0]
1042 application_name = ee_id_parts[1]
1043 self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
1044 db_update_entry + "application": application_name,
1045 db_update_entry + "ee_id": ee_id})
1046
1047 # n2vc_redesign STEP 3.3
1048
1049 step = "Install configuration Software"
1050 # TODO check if already done
1051 self.logger.debug(logging_text + step)
1052 await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)
1053
1054 # if SSH access is required, then get execution environment SSH public
1055 if is_proxy_charm: # if native charm we have waited already to VM be UP
1056 pub_key = None
1057 user = None
1058 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1059 # Needed to inject a ssh key
1060 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1061 step = "Install configuration Software, getting public ssh key"
1062 pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1063
1064 step = "Insert public key into VM"
1065 else:
1066 step = "Waiting to VM being up and getting IP address"
1067 self.logger.debug(logging_text + step)
1068
1069 # n2vc_redesign STEP 5.1
1070 # wait for RO (ip-address) Insert pub_key into VM
1071 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1072 user=user, pub_key=pub_key)
1073
1074 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1075
1076 # store rw_mgmt_ip in deploy params for later replacement
1077 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1078
1079 # n2vc_redesign STEP 6 Execute initial config primitive
1080 step = 'execute initial config primitive'
1081 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1082
1083 # sort initial config primitives by 'seq'
1084 try:
1085 initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
1086 except Exception as e:
1087 self.logger.error(logging_text + step + ": " + str(e))
1088
1089 # add config if not present for NS charm
1090 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1091 vca_deployed)
1092
1093 for initial_config_primitive in initial_config_primitive_list:
1094 # adding information on the vca_deployed if it is a NS execution environment
1095 if not vca_deployed["member-vnf-index"]:
1096 deploy_params["ns_config_info"] = self._get_ns_config_info(vca_deployed_list)
1097 # TODO check if already done
1098 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1099
1100 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1101 self.logger.debug(logging_text + step)
1102 await self.n2vc.exec_primitive(
1103 ee_id=ee_id,
1104 primitive_name=initial_config_primitive["name"],
1105 params_dict=primitive_params_,
1106 db_dict=db_dict
1107 )
1108 # TODO register in database that primitive is done
1109
1110 step = "instantiated at VCA"
1111 self.logger.debug(logging_text + step)
1112
1113 except Exception as e: # TODO not use Exception but N2VC exception
1114 raise Exception("{} {}".format(step, e)) from e
1115 # TODO raise N2VC exception with 'step' extra information
1116
1117 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1118 error_description: str = None, error_detail: str = None):
1119 try:
1120 db_dict = dict()
1121 if ns_state:
1122 db_dict["nsState"] = ns_state
1123 db_dict["currentOperation"] = current_operation
1124 db_dict["currentOperationID"] = current_operation_id
1125 db_dict["errorDescription"] = error_description
1126 db_dict["errorDetail"] = error_detail
1127 self.update_db_2("nsrs", nsr_id, db_dict)
1128 except Exception as e:
1129 self.logger.warn('Error writing NS status: {}'.format(e))
1130
1131 async def instantiate(self, nsr_id, nslcmop_id):
1132 """
1133
1134 :param nsr_id: ns instance to deploy
1135 :param nslcmop_id: operation to run
1136 :return:
1137 """
1138
1139 # Try to lock HA task here
1140 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1141 if not task_is_locked_by_me:
1142 self.logger.debug('instantiate() task is not locked by me')
1143 return
1144
1145 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1146 self.logger.debug(logging_text + "Enter")
1147
1148 # get all needed from database
1149
1150 # database nsrs record
1151 db_nsr = None
1152
1153 # database nslcmops record
1154 db_nslcmop = None
1155
1156 # update operation on nsrs
1157 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
1158 "_admin.current-operation": nslcmop_id,
1159 "_admin.operation-type": "instantiate"}
1160 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1161
1162 # update operation on nslcmops
1163 db_nslcmop_update = {}
1164
1165 nslcmop_operation_state = None
1166 db_vnfrs = {} # vnf's info indexed by member-index
1167 # n2vc_info = {}
1168 task_instantiation_list = []
1169 task_instantiation_info = {} # from task to info text
1170 exc = None
1171 try:
1172 # wait for any previous tasks in process
1173 step = "Waiting for previous operations to terminate"
1174 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1175
1176 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1177
1178 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1179 self._write_ns_status(
1180 nsr_id=nsr_id,
1181 ns_state="BUILDING",
1182 current_operation="INSTANTIATING",
1183 current_operation_id=nslcmop_id
1184 )
1185
1186 # read from db: operation
1187 step = "Getting nslcmop={} from db".format(nslcmop_id)
1188 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1189
1190 # read from db: ns
1191 step = "Getting nsr={} from db".format(nsr_id)
1192 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1193 # nsd is replicated into ns (no db read)
1194 nsd = db_nsr["nsd"]
1195 # nsr_name = db_nsr["name"] # TODO short-name??
1196
1197 # read from db: vnf's of this ns
1198 step = "Getting vnfrs from db"
1199 self.logger.debug(logging_text + step)
1200 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1201
1202 # read from db: vnfd's for every vnf
1203 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1204 db_vnfds = {} # every vnfd data indexed by vnf id
1205 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1206
1207 # for each vnf in ns, read vnfd
1208 for vnfr in db_vnfrs_list:
1209 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1210 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1211 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1212 # if we haven't this vnfd, read it from db
1213 if vnfd_id not in db_vnfds:
1214 # read from cb
1215 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1216 self.logger.debug(logging_text + step)
1217 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1218
1219 # store vnfd
1220 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1221 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1222 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1223
1224 # Get or generates the _admin.deployed.VCA list
1225 vca_deployed_list = None
1226 if db_nsr["_admin"].get("deployed"):
1227 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1228 if vca_deployed_list is None:
1229 vca_deployed_list = []
1230 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1231 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1232 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1233 elif isinstance(vca_deployed_list, dict):
1234 # maintain backward compatibility. Change a dict to list at database
1235 vca_deployed_list = list(vca_deployed_list.values())
1236 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1237 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1238
1239 db_nsr_update["detailed-status"] = "creating"
1240 db_nsr_update["operational-status"] = "init"
1241
1242 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1243 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1244 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1245
1246 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1247 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1248 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1249 self.logger.debug(logging_text + "Before deploy_kdus")
1250 # Call to deploy_kdus in case exists the "vdu:kdu" param
1251 task_kdu = asyncio.ensure_future(
1252 self.deploy_kdus(
1253 logging_text=logging_text,
1254 nsr_id=nsr_id,
1255 db_nsr=db_nsr,
1256 db_vnfrs=db_vnfrs,
1257 )
1258 )
1259 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
1260 task_instantiation_info[task_kdu] = "Deploy KDUs"
1261 task_instantiation_list.append(task_kdu)
1262 # n2vc_redesign STEP 1 Get VCA public ssh-key
1263 # feature 1429. Add n2vc public key to needed VMs
1264 n2vc_key = self.n2vc.get_public_key()
1265 n2vc_key_list = [n2vc_key]
1266 if self.vca_config.get("public_key"):
1267 n2vc_key_list.append(self.vca_config["public_key"])
1268
1269 # n2vc_redesign STEP 2 Deploy Network Scenario
1270 task_ro = asyncio.ensure_future(
1271 self.instantiate_RO(
1272 logging_text=logging_text,
1273 nsr_id=nsr_id,
1274 nsd=nsd,
1275 db_nsr=db_nsr,
1276 db_nslcmop=db_nslcmop,
1277 db_vnfrs=db_vnfrs,
1278 db_vnfds_ref=db_vnfds_ref,
1279 n2vc_key_list=n2vc_key_list
1280 )
1281 )
1282 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1283 task_instantiation_info[task_ro] = "Deploy at VIM"
1284 task_instantiation_list.append(task_ro)
1285
1286 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1287 step = "Looking for needed vnfd to configure with proxy charm"
1288 self.logger.debug(logging_text + step)
1289
1290 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1291 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1292 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1293 vnfd_id = c_vnf["vnfd-id-ref"]
1294 vnfd = db_vnfds_ref[vnfd_id]
1295 member_vnf_index = str(c_vnf["member-vnf-index"])
1296 db_vnfr = db_vnfrs[member_vnf_index]
1297 base_folder = vnfd["_admin"]["storage"]
1298 vdu_id = None
1299 vdu_index = 0
1300 vdu_name = None
1301 kdu_name = None
1302
1303 # Get additional parameters
1304 deploy_params = {}
1305 if db_vnfr.get("additionalParamsForVnf"):
1306 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
1307
1308 descriptor_config = vnfd.get("vnf-configuration")
1309 if descriptor_config and descriptor_config.get("juju"):
1310 self._deploy_n2vc(
1311 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1312 db_nsr=db_nsr,
1313 db_vnfr=db_vnfr,
1314 nslcmop_id=nslcmop_id,
1315 nsr_id=nsr_id,
1316 nsi_id=nsi_id,
1317 vnfd_id=vnfd_id,
1318 vdu_id=vdu_id,
1319 kdu_name=kdu_name,
1320 member_vnf_index=member_vnf_index,
1321 vdu_index=vdu_index,
1322 vdu_name=vdu_name,
1323 deploy_params=deploy_params,
1324 descriptor_config=descriptor_config,
1325 base_folder=base_folder,
1326 task_instantiation_list=task_instantiation_list,
1327 task_instantiation_info=task_instantiation_info
1328 )
1329
1330 # Deploy charms for each VDU that supports one.
1331 for vdud in get_iterable(vnfd, 'vdu'):
1332 vdu_id = vdud["id"]
1333 descriptor_config = vdud.get('vdu-configuration')
1334 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
1335 if vdur.get("additionalParams"):
1336 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
1337 else:
1338 deploy_params_vdu = deploy_params
1339 if descriptor_config and descriptor_config.get("juju"):
1340 # look for vdu index in the db_vnfr["vdu"] section
1341 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1342 # if vdur["vdu-id-ref"] == vdu_id:
1343 # break
1344 # else:
1345 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1346 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1347 # vdu_name = vdur.get("name")
1348 vdu_name = None
1349 kdu_name = None
1350 for vdu_index in range(int(vdud.get("count", 1))):
1351 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1352 self._deploy_n2vc(
1353 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1354 member_vnf_index, vdu_id, vdu_index),
1355 db_nsr=db_nsr,
1356 db_vnfr=db_vnfr,
1357 nslcmop_id=nslcmop_id,
1358 nsr_id=nsr_id,
1359 nsi_id=nsi_id,
1360 vnfd_id=vnfd_id,
1361 vdu_id=vdu_id,
1362 kdu_name=kdu_name,
1363 member_vnf_index=member_vnf_index,
1364 vdu_index=vdu_index,
1365 vdu_name=vdu_name,
1366 deploy_params=deploy_params_vdu,
1367 descriptor_config=descriptor_config,
1368 base_folder=base_folder,
1369 task_instantiation_list=task_instantiation_list,
1370 task_instantiation_info=task_instantiation_info
1371 )
1372 for kdud in get_iterable(vnfd, 'kdu'):
1373 kdu_name = kdud["name"]
1374 descriptor_config = kdud.get('kdu-configuration')
1375 if descriptor_config and descriptor_config.get("juju"):
1376 vdu_id = None
1377 vdu_index = 0
1378 vdu_name = None
1379 # look for vdu index in the db_vnfr["vdu"] section
1380 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1381 # if vdur["vdu-id-ref"] == vdu_id:
1382 # break
1383 # else:
1384 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1385 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1386 # vdu_name = vdur.get("name")
1387 # vdu_name = None
1388
1389 self._deploy_n2vc(
1390 logging_text=logging_text,
1391 db_nsr=db_nsr,
1392 db_vnfr=db_vnfr,
1393 nslcmop_id=nslcmop_id,
1394 nsr_id=nsr_id,
1395 nsi_id=nsi_id,
1396 vnfd_id=vnfd_id,
1397 vdu_id=vdu_id,
1398 kdu_name=kdu_name,
1399 member_vnf_index=member_vnf_index,
1400 vdu_index=vdu_index,
1401 vdu_name=vdu_name,
1402 deploy_params=deploy_params,
1403 descriptor_config=descriptor_config,
1404 base_folder=base_folder,
1405 task_instantiation_list=task_instantiation_list,
1406 task_instantiation_info=task_instantiation_info
1407 )
1408
1409 # Check if this NS has a charm configuration
1410 descriptor_config = nsd.get("ns-configuration")
1411 if descriptor_config and descriptor_config.get("juju"):
1412 vnfd_id = None
1413 db_vnfr = None
1414 member_vnf_index = None
1415 vdu_id = None
1416 kdu_name = None
1417 vdu_index = 0
1418 vdu_name = None
1419
1420 # Get additional parameters
1421 deploy_params = {}
1422 if db_nsr.get("additionalParamsForNs"):
1423 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
1424 base_folder = nsd["_admin"]["storage"]
1425 self._deploy_n2vc(
1426 logging_text=logging_text,
1427 db_nsr=db_nsr,
1428 db_vnfr=db_vnfr,
1429 nslcmop_id=nslcmop_id,
1430 nsr_id=nsr_id,
1431 nsi_id=nsi_id,
1432 vnfd_id=vnfd_id,
1433 vdu_id=vdu_id,
1434 kdu_name=kdu_name,
1435 member_vnf_index=member_vnf_index,
1436 vdu_index=vdu_index,
1437 vdu_name=vdu_name,
1438 deploy_params=deploy_params,
1439 descriptor_config=descriptor_config,
1440 base_folder=base_folder,
1441 task_instantiation_list=task_instantiation_list,
1442 task_instantiation_info=task_instantiation_info
1443 )
1444
1445 # Wait until all tasks of "task_instantiation_list" have been finished
1446
1447 # while time() <= start_deploy + self.total_deploy_timeout:
1448 error_text_list = []
1449 timeout = 3600
1450
1451 # let's begin with all OK
1452 instantiated_ok = True
1453 # let's begin with RO 'running' status (later we can change it)
1454 db_nsr_update["operational-status"] = "running"
1455 # let's begin with VCA 'configured' status (later we can change it)
1456 db_nsr_update["config-status"] = "configured"
1457
1458 if task_instantiation_list:
1459 # wait for all tasks completion
1460 done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout)
1461
1462 for task in pending:
1463 instantiated_ok = False
1464 if task == task_ro:
1465 db_nsr_update["operational-status"] = "failed"
1466 else:
1467 db_nsr_update["config-status"] = "failed"
1468 self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
1469 error_text_list.append(task_instantiation_info[task] + ": Timeout")
1470 for task in done:
1471 if task.cancelled():
1472 instantiated_ok = False
1473 if task == task_ro:
1474 db_nsr_update["operational-status"] = "failed"
1475 else:
1476 db_nsr_update["config-status"] = "failed"
1477 self.logger.warn(logging_text + task_instantiation_info[task] + ": Cancelled")
1478 error_text_list.append(task_instantiation_info[task] + ": Cancelled")
1479 else:
1480 exc = task.exception()
1481 if exc:
1482 instantiated_ok = False
1483 if task == task_ro:
1484 db_nsr_update["operational-status"] = "failed"
1485 else:
1486 db_nsr_update["config-status"] = "failed"
1487 self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")
1488 if isinstance(exc, (N2VCException, ROclient.ROClientException)):
1489 error_text_list.append(task_instantiation_info[task] + ": {}".format(exc))
1490 else:
1491 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
1492 self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
1493 error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)
1494 else:
1495 self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")
1496
1497 if error_text_list:
1498 error_text = "\n".join(error_text_list)
1499 db_nsr_update["detailed-status"] = error_text
1500 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1501 db_nslcmop_update["detailed-status"] = error_text
1502 db_nslcmop_update["statusEnteredTime"] = time()
1503 else:
1504 # all is done
1505 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1506 db_nslcmop_update["statusEnteredTime"] = time()
1507 db_nslcmop_update["detailed-status"] = "done"
1508 db_nsr_update["detailed-status"] = "done"
1509
1510 except (ROclient.ROClientException, DbException, LcmException) as e:
1511 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1512 exc = e
1513 except asyncio.CancelledError:
1514 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1515 exc = "Operation was cancelled"
1516 except Exception as e:
1517 exc = traceback.format_exc()
1518 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1519 exc_info=True)
1520 finally:
1521 if exc:
1522 if db_nsr:
1523 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1524 db_nsr_update["operational-status"] = "failed"
1525 db_nsr_update["config-status"] = "failed"
1526 if db_nslcmop:
1527 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1528 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1529 db_nslcmop_update["statusEnteredTime"] = time()
1530 try:
1531 if db_nsr:
1532 db_nsr_update["_admin.nslcmop"] = None
1533 db_nsr_update["_admin.current-operation"] = None
1534 db_nsr_update["_admin.operation-type"] = None
1535 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1536
1537 # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
1538 ns_state = None
1539 error_description = None
1540 error_detail = None
1541 if instantiated_ok:
1542 ns_state = "READY"
1543 else:
1544 ns_state = "BROKEN"
1545 error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)
1546 error_detail = error_text
1547 self._write_ns_status(
1548 nsr_id=nsr_id,
1549 ns_state=ns_state,
1550 current_operation="IDLE",
1551 current_operation_id=None,
1552 error_description=error_description,
1553 error_detail=error_detail
1554 )
1555
1556 if db_nslcmop_update:
1557 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1558 except DbException as e:
1559 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1560 if nslcmop_operation_state:
1561 try:
1562 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1563 "operationState": nslcmop_operation_state},
1564 loop=self.loop)
1565 except Exception as e:
1566 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1567
1568 self.logger.debug(logging_text + "Exit")
1569 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1570
1571 async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs):
1572 # Launch kdus if present in the descriptor
1573
1574 k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
1575
1576 def _get_cluster_id(cluster_id, cluster_type):
1577 nonlocal k8scluster_id_2_uuic
1578 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
1579 return k8scluster_id_2_uuic[cluster_type][cluster_id]
1580
1581 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
1582 if not db_k8scluster:
1583 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
1584 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
1585 if not k8s_id:
1586 raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
1587 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
1588 return k8s_id
1589
1590 logging_text += "Deploy kdus: "
1591 try:
1592 db_nsr_update = {"_admin.deployed.K8s": []}
1593 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1594
1595 # Look for all vnfds
1596 pending_tasks = {}
1597 index = 0
1598 for vnfr_data in db_vnfrs.values():
1599 for kdur in get_iterable(vnfr_data, "kdur"):
1600 desc_params = self._format_additional_params(kdur.get("additionalParams"))
1601 kdumodel = None
1602 k8sclustertype = None
1603 error_text = None
1604 cluster_uuid = None
1605 if kdur.get("helm-chart"):
1606 kdumodel = kdur["helm-chart"]
1607 k8sclustertype = "chart"
1608 k8sclustertype_full = "helm-chart"
1609 elif kdur.get("juju-bundle"):
1610 kdumodel = kdur["juju-bundle"]
1611 k8sclustertype = "juju"
1612 k8sclustertype_full = "juju-bundle"
1613 else:
1614 error_text = "kdu type is neither helm-chart not juju-bundle. Maybe an old NBI version is" \
1615 " running"
1616 try:
1617 if not error_text:
1618 cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
1619 except LcmException as e:
1620 error_text = str(e)
1621 step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)
1622
1623 k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
1624 "k8scluster-type": k8sclustertype,
1625 "kdu-name": kdur["kdu-name"], "kdu-model": kdumodel}
1626 if error_text:
1627 k8s_instace_info["detailed-status"] = error_text
1628 db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
1629 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1630 if error_text:
1631 continue
1632
1633 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id}, "path": "_admin.deployed.K8s."
1634 "{}".format(index)}
1635 if k8sclustertype == "chart":
1636 task = asyncio.ensure_future(
1637 self.k8sclusterhelm.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel, atomic=True,
1638 params=desc_params, db_dict=db_dict, timeout=3600)
1639 )
1640 else:
1641 task = self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
1642 atomic=True, params=desc_params,
1643 db_dict=db_dict, timeout=600)
1644
1645 pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index)
1646 index += 1
1647 if not pending_tasks:
1648 return
1649 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
1650 pending_list = list(pending_tasks.keys())
1651 while pending_list:
1652 done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
1653 return_when=asyncio.FIRST_COMPLETED)
1654 if not done_list: # timeout
1655 for task in pending_list:
1656 db_nsr_update[pending_tasks(task) + "detailed-status"] = "Timeout"
1657 break
1658 for task in done_list:
1659 exc = task.exception()
1660 if exc:
1661 db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
1662 else:
1663 db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()
1664
1665 except Exception as e:
1666 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e))
1667 raise LcmException("{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e))
1668 finally:
1669 # TODO Write in data base
1670 if db_nsr_update:
1671 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1672
1673 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
1674 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
1675 base_folder, task_instantiation_list, task_instantiation_info):
1676 # launch instantiate_N2VC in a asyncio task and register task object
1677 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
1678 # if not found, create one entry and update database
1679
1680 # fill db_nsr._admin.deployed.VCA.<index>
1681 vca_index = -1
1682 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
1683 if not vca_deployed:
1684 continue
1685 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
1686 vca_deployed.get("vdu_id") == vdu_id and \
1687 vca_deployed.get("kdu_name") == kdu_name and \
1688 vca_deployed.get("vdu_count_index", 0) == vdu_index:
1689 break
1690 else:
1691 # not found, create one.
1692 vca_deployed = {
1693 "member-vnf-index": member_vnf_index,
1694 "vdu_id": vdu_id,
1695 "kdu_name": kdu_name,
1696 "vdu_count_index": vdu_index,
1697 "operational-status": "init", # TODO revise
1698 "detailed-status": "", # TODO revise
1699 "step": "initial-deploy", # TODO revise
1700 "vnfd_id": vnfd_id,
1701 "vdu_name": vdu_name,
1702 }
1703 vca_index += 1
1704 self.update_db_2("nsrs", nsr_id, {"_admin.deployed.VCA.{}".format(vca_index): vca_deployed})
1705 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
1706
1707 # Launch task
1708 task_n2vc = asyncio.ensure_future(
1709 self.instantiate_N2VC(
1710 logging_text=logging_text,
1711 vca_index=vca_index,
1712 nsi_id=nsi_id,
1713 db_nsr=db_nsr,
1714 db_vnfr=db_vnfr,
1715 vdu_id=vdu_id,
1716 kdu_name=kdu_name,
1717 vdu_index=vdu_index,
1718 deploy_params=deploy_params,
1719 config_descriptor=descriptor_config,
1720 base_folder=base_folder,
1721 )
1722 )
1723 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
1724 task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
1725 task_instantiation_list.append(task_n2vc)
1726
1727 # Check if this VNFD has a configured terminate action
1728 def _has_terminate_config_primitive(self, vnfd):
1729 vnf_config = vnfd.get("vnf-configuration")
1730 if vnf_config and vnf_config.get("terminate-config-primitive"):
1731 return True
1732 else:
1733 return False
1734
1735 @staticmethod
1736 def _get_terminate_config_primitive_seq_list(vnfd):
1737 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
1738 # No need to check for existing primitive twice, already done before
1739 vnf_config = vnfd.get("vnf-configuration")
1740 seq_list = vnf_config.get("terminate-config-primitive")
1741 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
1742 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
1743 return seq_list_sorted
1744
1745 @staticmethod
1746 def _create_nslcmop(nsr_id, operation, params):
1747 """
1748 Creates a ns-lcm-opp content to be stored at database.
1749 :param nsr_id: internal id of the instance
1750 :param operation: instantiate, terminate, scale, action, ...
1751 :param params: user parameters for the operation
1752 :return: dictionary following SOL005 format
1753 """
1754 # Raise exception if invalid arguments
1755 if not (nsr_id and operation and params):
1756 raise LcmException(
1757 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
1758 now = time()
1759 _id = str(uuid4())
1760 nslcmop = {
1761 "id": _id,
1762 "_id": _id,
1763 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
1764 "operationState": "PROCESSING",
1765 "statusEnteredTime": now,
1766 "nsInstanceId": nsr_id,
1767 "lcmOperationType": operation,
1768 "startTime": now,
1769 "isAutomaticInvocation": False,
1770 "operationParams": params,
1771 "isCancelPending": False,
1772 "links": {
1773 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
1774 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
1775 }
1776 }
1777 return nslcmop
1778
1779 def _format_additional_params(self, params):
1780 params = params or {}
1781 for key, value in params.items():
1782 if str(value).startswith("!!yaml "):
1783 params[key] = yaml.safe_load(value[7:])
1784 return params
1785
1786 def _get_terminate_primitive_params(self, seq, vnf_index):
1787 primitive = seq.get('name')
1788 primitive_params = {}
1789 params = {
1790 "member_vnf_index": vnf_index,
1791 "primitive": primitive,
1792 "primitive_params": primitive_params,
1793 }
1794 desc_params = {}
1795 return self._map_primitive_params(seq, params, desc_params)
1796
1797 # sub-operations
1798
1799 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
1800 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
1801 if (op.get('operationState') == 'COMPLETED'):
1802 # b. Skip sub-operation
1803 # _ns_execute_primitive() or RO.create_action() will NOT be executed
1804 return self.SUBOPERATION_STATUS_SKIP
1805 else:
1806 # c. Reintent executing sub-operation
1807 # The sub-operation exists, and operationState != 'COMPLETED'
1808 # Update operationState = 'PROCESSING' to indicate a reintent.
1809 operationState = 'PROCESSING'
1810 detailed_status = 'In progress'
1811 self._update_suboperation_status(
1812 db_nslcmop, op_index, operationState, detailed_status)
1813 # Return the sub-operation index
1814 # _ns_execute_primitive() or RO.create_action() will be called from scale()
1815 # with arguments extracted from the sub-operation
1816 return op_index
1817
1818 # Find a sub-operation where all keys in a matching dictionary must match
1819 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
1820 def _find_suboperation(self, db_nslcmop, match):
1821 if (db_nslcmop and match):
1822 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
1823 for i, op in enumerate(op_list):
1824 if all(op.get(k) == match[k] for k in match):
1825 return i
1826 return self.SUBOPERATION_STATUS_NOT_FOUND
1827
1828 # Update status for a sub-operation given its index
1829 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
1830 # Update DB for HA tasks
1831 q_filter = {'_id': db_nslcmop['_id']}
1832 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
1833 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
1834 self.db.set_one("nslcmops",
1835 q_filter=q_filter,
1836 update_dict=update_dict,
1837 fail_on_empty=False)
1838
1839 # Add sub-operation, return the index of the added sub-operation
1840 # Optionally, set operationState, detailed-status, and operationType
1841 # Status and type are currently set for 'scale' sub-operations:
1842 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
1843 # 'detailed-status' : status message
1844 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
1845 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
1846 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
1847 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
1848 RO_nsr_id=None, RO_scaling_info=None):
1849 if not (db_nslcmop):
1850 return self.SUBOPERATION_STATUS_NOT_FOUND
1851 # Get the "_admin.operations" list, if it exists
1852 db_nslcmop_admin = db_nslcmop.get('_admin', {})
1853 op_list = db_nslcmop_admin.get('operations')
1854 # Create or append to the "_admin.operations" list
1855 new_op = {'member_vnf_index': vnf_index,
1856 'vdu_id': vdu_id,
1857 'vdu_count_index': vdu_count_index,
1858 'primitive': primitive,
1859 'primitive_params': mapped_primitive_params}
1860 if operationState:
1861 new_op['operationState'] = operationState
1862 if detailed_status:
1863 new_op['detailed-status'] = detailed_status
1864 if operationType:
1865 new_op['lcmOperationType'] = operationType
1866 if RO_nsr_id:
1867 new_op['RO_nsr_id'] = RO_nsr_id
1868 if RO_scaling_info:
1869 new_op['RO_scaling_info'] = RO_scaling_info
1870 if not op_list:
1871 # No existing operations, create key 'operations' with current operation as first list element
1872 db_nslcmop_admin.update({'operations': [new_op]})
1873 op_list = db_nslcmop_admin.get('operations')
1874 else:
1875 # Existing operations, append operation to list
1876 op_list.append(new_op)
1877
1878 db_nslcmop_update = {'_admin.operations': op_list}
1879 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
1880 op_index = len(op_list) - 1
1881 return op_index
1882
1883 # Helper methods for scale() sub-operations
1884
1885 # pre-scale/post-scale:
1886 # Check for 3 different cases:
1887 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
1888 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
1889 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
1890 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
1891 operationType, RO_nsr_id=None, RO_scaling_info=None):
1892 # Find this sub-operation
1893 if (RO_nsr_id and RO_scaling_info):
1894 operationType = 'SCALE-RO'
1895 match = {
1896 'member_vnf_index': vnf_index,
1897 'RO_nsr_id': RO_nsr_id,
1898 'RO_scaling_info': RO_scaling_info,
1899 }
1900 else:
1901 match = {
1902 'member_vnf_index': vnf_index,
1903 'primitive': vnf_config_primitive,
1904 'primitive_params': primitive_params,
1905 'lcmOperationType': operationType
1906 }
1907 op_index = self._find_suboperation(db_nslcmop, match)
1908 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
1909 # a. New sub-operation
1910 # The sub-operation does not exist, add it.
1911 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
1912 # The following parameters are set to None for all kind of scaling:
1913 vdu_id = None
1914 vdu_count_index = None
1915 vdu_name = None
1916 if (RO_nsr_id and RO_scaling_info):
1917 vnf_config_primitive = None
1918 primitive_params = None
1919 else:
1920 RO_nsr_id = None
1921 RO_scaling_info = None
1922 # Initial status for sub-operation
1923 operationState = 'PROCESSING'
1924 detailed_status = 'In progress'
1925 # Add sub-operation for pre/post-scaling (zero or more operations)
1926 self._add_suboperation(db_nslcmop,
1927 vnf_index,
1928 vdu_id,
1929 vdu_count_index,
1930 vdu_name,
1931 vnf_config_primitive,
1932 primitive_params,
1933 operationState,
1934 detailed_status,
1935 operationType,
1936 RO_nsr_id,
1937 RO_scaling_info)
1938 return self.SUBOPERATION_STATUS_NEW
1939 else:
1940 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
1941 # or op_index (operationState != 'COMPLETED')
1942 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
1943
1944 # Helper methods for terminate()
1945
1946 async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
1947 """ Create a primitive with params from VNFD
1948 Called from terminate() before deleting instance
1949 Calls action() to execute the primitive """
1950 logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
1951 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1952 db_vnfds = {}
1953 # Loop over VNFRs
1954 for vnfr in db_vnfrs_list:
1955 vnfd_id = vnfr["vnfd-id"]
1956 vnf_index = vnfr["member-vnf-index-ref"]
1957 if vnfd_id not in db_vnfds:
1958 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
1959 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1960 db_vnfds[vnfd_id] = vnfd
1961 vnfd = db_vnfds[vnfd_id]
1962 if not self._has_terminate_config_primitive(vnfd):
1963 continue
1964 # Get the primitive's sorted sequence list
1965 seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
1966 for seq in seq_list:
1967 # For each sequence in list, get primitive and call _ns_execute_primitive()
1968 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
1969 vnf_index, seq.get("name"))
1970 self.logger.debug(logging_text + step)
1971 # Create the primitive for each sequence, i.e. "primitive": "touch"
1972 primitive = seq.get('name')
1973 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
1974 # The following 3 parameters are currently set to None for 'terminate':
1975 # vdu_id, vdu_count_index, vdu_name
1976 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
1977 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
1978 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
1979 # Add sub-operation
1980 self._add_suboperation(db_nslcmop,
1981 nslcmop_id,
1982 vnf_index,
1983 vdu_id,
1984 vdu_count_index,
1985 vdu_name,
1986 primitive,
1987 mapped_primitive_params)
1988 # Sub-operations: Call _ns_execute_primitive() instead of action()
1989 # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1990 # nsr_deployed = db_nsr["_admin"]["deployed"]
1991
1992 # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
1993 # nsr_id, nslcmop_terminate_action_id)
1994 # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
1995 # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
1996 # if result not in result_ok:
1997 # raise LcmException(
1998 # "terminate_primitive_action for vnf_member_index={}",
1999 # " primitive={} fails with error {}".format(
2000 # vnf_index, seq.get("name"), result_detail))
2001
2002 # TODO: find ee_id
2003 ee_id = None
2004 try:
2005 await self.n2vc.exec_primitive(
2006 ee_id=ee_id,
2007 primitive_name=primitive,
2008 params_dict=mapped_primitive_params
2009 )
2010 except Exception as e:
2011 self.logger.error('Error executing primitive {}: {}'.format(primitive, e))
2012 raise LcmException(
2013 "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}"
2014 .format(vnf_index, seq.get("name"), e),
2015 )
2016
2017 async def terminate(self, nsr_id, nslcmop_id):
2018
2019 # Try to lock HA task here
2020 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2021 if not task_is_locked_by_me:
2022 return
2023
2024 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
2025 self.logger.debug(logging_text + "Enter")
2026 db_nsr = None
2027 db_nslcmop = None
2028 exc = None
2029 failed_detail = [] # annotates all failed error messages
2030 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2031 "_admin.current-operation": nslcmop_id,
2032 "_admin.operation-type": "terminate"}
2033 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2034 db_nslcmop_update = {}
2035 nslcmop_operation_state = None
2036 autoremove = False # autoremove after terminated
2037 pending_tasks = []
2038 try:
2039 # wait for any previous tasks in process
2040 step = "Waiting for previous operations to terminate"
2041 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
2042
2043 self._write_ns_status(
2044 nsr_id=nsr_id,
2045 ns_state="TERMINATING",
2046 current_operation="TERMINATING",
2047 current_operation_id=nslcmop_id
2048 )
2049
2050 step = "Getting nslcmop={} from db".format(nslcmop_id)
2051 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2052 step = "Getting nsr={} from db".format(nsr_id)
2053 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2054 # nsd = db_nsr["nsd"]
2055 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
2056 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
2057 return
2058 # #TODO check if VIM is creating and wait
2059 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
2060 # Call internal terminate action
2061 await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)
2062
2063 pending_tasks = []
2064
2065 db_nsr_update["operational-status"] = "terminating"
2066 db_nsr_update["config-status"] = "terminating"
2067
2068 # remove NS
2069 try:
2070 step = "delete execution environment"
2071 self.logger.debug(logging_text + step)
2072
2073 task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
2074 pending_tasks.append(task_delete_ee)
2075 except Exception as e:
2076 msg = "Failed while deleting NS in VCA: {}".format(e)
2077 self.logger.error(msg)
2078 failed_detail.append(msg)
2079
2080 try:
2081 # Delete from k8scluster
2082 step = "delete kdus"
2083 self.logger.debug(logging_text + step)
2084 # print(nsr_deployed)
2085 if nsr_deployed:
2086 for kdu in nsr_deployed.get("K8s", ()):
2087 kdu_instance = kdu.get("kdu-instance")
2088 if not kdu_instance:
2089 continue
2090 if kdu.get("k8scluster-type") == "chart":
2091 task_delete_kdu_instance = asyncio.ensure_future(
2092 self.k8sclusterhelm.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2093 kdu_instance=kdu_instance))
2094 elif kdu.get("k8scluster-type") == "juju":
2095 task_delete_kdu_instance = asyncio.ensure_future(
2096 self.k8sclusterjuju.uninstall(cluster_uuid=kdu.get("k8scluster-uuid"),
2097 kdu_instance=kdu_instance))
2098 else:
2099 self.error(logging_text + "Unknown k8s deployment type {}".
2100 format(kdu.get("k8scluster-type")))
2101 continue
2102 pending_tasks.append(task_delete_kdu_instance)
2103 except LcmException as e:
2104 msg = "Failed while deleting KDUs from NS: {}".format(e)
2105 self.logger.error(msg)
2106 failed_detail.append(msg)
2107
2108 # remove from RO
2109 RO_fail = False
2110
2111 # Delete ns
2112 RO_nsr_id = RO_delete_action = None
2113 if nsr_deployed and nsr_deployed.get("RO"):
2114 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
2115 RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2116 try:
2117 if RO_nsr_id:
2118 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
2119 "Deleting ns from VIM"
2120 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2121 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2122 self.logger.debug(logging_text + step)
2123 desc = await self.RO.delete("ns", RO_nsr_id)
2124 RO_delete_action = desc["action_id"]
2125 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
2126 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2127 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2128 if RO_delete_action:
2129 # wait until NS is deleted from VIM
2130 step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
2131 format(RO_nsr_id, RO_delete_action)
2132 detailed_status_old = None
2133 self.logger.debug(logging_text + step)
2134
2135 delete_timeout = 20 * 60 # 20 minutes
2136 while delete_timeout > 0:
2137 desc = await self.RO.show(
2138 "ns",
2139 item_id_name=RO_nsr_id,
2140 extra_item="action",
2141 extra_item_id=RO_delete_action)
2142 ns_status, ns_status_info = self.RO.check_action_status(desc)
2143 if ns_status == "ERROR":
2144 raise ROclient.ROClientException(ns_status_info)
2145 elif ns_status == "BUILD":
2146 detailed_status = step + "; {}".format(ns_status_info)
2147 elif ns_status == "ACTIVE":
2148 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2149 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2150 break
2151 else:
2152 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2153 if detailed_status != detailed_status_old:
2154 detailed_status_old = db_nslcmop_update["detailed-status"] = \
2155 db_nsr_update["detailed-status"] = detailed_status
2156 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2157 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2158 await asyncio.sleep(5, loop=self.loop)
2159 delete_timeout -= 5
2160 else: # delete_timeout <= 0:
2161 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
2162
2163 except ROclient.ROClientException as e:
2164 if e.http_code == 404: # not found
2165 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2166 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2167 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2168 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
2169 elif e.http_code == 409: # conflict
2170 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
2171 self.logger.debug(logging_text + failed_detail[-1])
2172 RO_fail = True
2173 else:
2174 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
2175 self.logger.error(logging_text + failed_detail[-1])
2176 RO_fail = True
2177
2178 # Delete nsd
2179 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
2180 RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
2181 try:
2182 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2183 "Deleting nsd from RO"
2184 await self.RO.delete("nsd", RO_nsd_id)
2185 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
2186 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2187 except ROclient.ROClientException as e:
2188 if e.http_code == 404: # not found
2189 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2190 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
2191 elif e.http_code == 409: # conflict
2192 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
2193 self.logger.debug(logging_text + failed_detail[-1])
2194 RO_fail = True
2195 else:
2196 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
2197 self.logger.error(logging_text + failed_detail[-1])
2198 RO_fail = True
2199
2200 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
2201 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2202 if not vnf_deployed or not vnf_deployed["id"]:
2203 continue
2204 try:
2205 RO_vnfd_id = vnf_deployed["id"]
2206 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2207 "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
2208 vnf_deployed["member-vnf-index"], RO_vnfd_id)
2209 await self.RO.delete("vnfd", RO_vnfd_id)
2210 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
2211 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2212 except ROclient.ROClientException as e:
2213 if e.http_code == 404: # not found
2214 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2215 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
2216 elif e.http_code == 409: # conflict
2217 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
2218 self.logger.debug(logging_text + failed_detail[-1])
2219 else:
2220 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
2221 self.logger.error(logging_text + failed_detail[-1])
2222
2223 if failed_detail:
2224 terminate_ok = False
2225 self.logger.error(logging_text + " ;".join(failed_detail))
2226 db_nsr_update["operational-status"] = "failed"
2227 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
2228 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
2229 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2230 db_nslcmop_update["statusEnteredTime"] = time()
2231 else:
2232 terminate_ok = True
2233 db_nsr_update["operational-status"] = "terminated"
2234 db_nsr_update["detailed-status"] = "Done"
2235 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
2236 db_nslcmop_update["detailed-status"] = "Done"
2237 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
2238 db_nslcmop_update["statusEnteredTime"] = time()
2239 if db_nslcmop["operationParams"].get("autoremove"):
2240 autoremove = True
2241
2242 except (ROclient.ROClientException, DbException, LcmException) as e:
2243 self.logger.error(logging_text + "Exit Exception {}".format(e))
2244 exc = e
2245 except asyncio.CancelledError:
2246 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2247 exc = "Operation was cancelled"
2248 except Exception as e:
2249 exc = traceback.format_exc()
2250 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
2251 finally:
2252 if exc and db_nslcmop:
2253 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
2254 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2255 db_nslcmop_update["statusEnteredTime"] = time()
2256 try:
2257 if db_nslcmop and db_nslcmop_update:
2258 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2259 if db_nsr:
2260 db_nsr_update["_admin.nslcmop"] = None
2261 db_nsr_update["_admin.current-operation"] = None
2262 db_nsr_update["_admin.operation-type"] = None
2263 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2264
2265 if terminate_ok:
2266 ns_state = "IDLE"
2267 error_description = None
2268 error_detail = None
2269 else:
2270 ns_state = "BROKEN"
2271 error_description = 'Operation: TERMINATING.{}, step: {}'.format(nslcmop_id, step)
2272 error_detail = "; ".join(failed_detail)
2273
2274 self._write_ns_status(
2275 nsr_id=nsr_id,
2276 ns_state=ns_state,
2277 current_operation="IDLE",
2278 current_operation_id=None,
2279 error_description=error_description,
2280 error_detail=error_detail
2281 )
2282
2283 except DbException as e:
2284 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2285 if nslcmop_operation_state:
2286 try:
2287 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2288 "operationState": nslcmop_operation_state,
2289 "autoremove": autoremove},
2290 loop=self.loop)
2291 except Exception as e:
2292 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2293
2294 # wait for pending tasks
2295 done = None
2296 pending = None
2297 if pending_tasks:
2298 self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
2299 done, pending = await asyncio.wait(pending_tasks, timeout=3600)
2300 if not pending:
2301 self.logger.debug(logging_text + 'All tasks finished...')
2302 else:
2303 self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending))
2304
2305 self.logger.debug(logging_text + "Exit")
2306 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2307
2308 @staticmethod
2309 def _map_primitive_params(primitive_desc, params, instantiation_params):
2310 """
2311 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2312 The default-value is used. If it is between < > it look for a value at instantiation_params
2313 :param primitive_desc: portion of VNFD/NSD that describes primitive
2314 :param params: Params provided by user
2315 :param instantiation_params: Instantiation params provided by user
2316 :return: a dictionary with the calculated params
2317 """
2318 calculated_params = {}
2319 for parameter in primitive_desc.get("parameter", ()):
2320 param_name = parameter["name"]
2321 if param_name in params:
2322 calculated_params[param_name] = params[param_name]
2323 elif "default-value" in parameter or "value" in parameter:
2324 if "value" in parameter:
2325 calculated_params[param_name] = parameter["value"]
2326 else:
2327 calculated_params[param_name] = parameter["default-value"]
2328 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2329 and calculated_params[param_name].endswith(">"):
2330 if calculated_params[param_name][1:-1] in instantiation_params:
2331 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2332 else:
2333 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2334 format(calculated_params[param_name], primitive_desc["name"]))
2335 else:
2336 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2337 format(param_name, primitive_desc["name"]))
2338
2339 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2340 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2341 width=256)
2342 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2343 calculated_params[param_name] = calculated_params[param_name][7:]
2344
2345 # add always ns_config_info if primitive name is config
2346 if primitive_desc["name"] == "config":
2347 if "ns_config_info" in instantiation_params:
2348 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
2349 return calculated_params
2350
2351 async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
2352 primitive, primitive_params, retries=0, retries_interval=30) -> (str, str):
2353
2354 # find vca_deployed record for this action
2355 try:
2356 for vca_deployed in db_deployed["VCA"]:
2357 if not vca_deployed:
2358 continue
2359 if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
2360 continue
2361 if vdu_name and vdu_name != vca_deployed["vdu_name"]:
2362 continue
2363 if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
2364 continue
2365 break
2366 else:
2367 # vca_deployed not found
2368 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
2369 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2370
2371 # get ee_id
2372 ee_id = vca_deployed.get("ee_id")
2373 if not ee_id:
2374 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
2375 "execution environment"
2376 .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2377
2378 if primitive == "config":
2379 primitive_params = {"params": primitive_params}
2380
2381 while retries >= 0:
2382 try:
2383 output = await self.n2vc.exec_primitive(
2384 ee_id=ee_id,
2385 primitive_name=primitive,
2386 params_dict=primitive_params
2387 )
2388 # execution was OK
2389 break
2390 except Exception as e:
2391 retries -= 1
2392 if retries >= 0:
2393 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
2394 # wait and retry
2395 await asyncio.sleep(retries_interval, loop=self.loop)
2396 else:
2397 return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL'
2398
2399 return output, 'OK'
2400
2401 except Exception as e:
2402 return 'Error executing action {}: {}'.format(primitive, e), 'FAIL'
2403
2404 async def action(self, nsr_id, nslcmop_id):
2405
2406 # Try to lock HA task here
2407 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2408 if not task_is_locked_by_me:
2409 return
2410
2411 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
2412 self.logger.debug(logging_text + "Enter")
2413 # get all needed from database
2414 db_nsr = None
2415 db_nslcmop = None
2416 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2417 "_admin.current-operation": nslcmop_id,
2418 "_admin.operation-type": "action"}
2419 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2420 db_nslcmop_update = {}
2421 nslcmop_operation_state = None
2422 nslcmop_operation_state_detail = None
2423 exc = None
2424 try:
2425 # wait for any previous tasks in process
2426 step = "Waiting for previous operations to terminate"
2427 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2428
2429 self._write_ns_status(
2430 nsr_id=nsr_id,
2431 ns_state=None,
2432 current_operation="RUNNING ACTION",
2433 current_operation_id=nslcmop_id
2434 )
2435
2436 step = "Getting information from database"
2437 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2438 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2439
2440 nsr_deployed = db_nsr["_admin"].get("deployed")
2441 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
2442 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2443 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
2444 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2445 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2446
2447 if vnf_index:
2448 step = "Getting vnfr from database"
2449 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2450 step = "Getting vnfd from database"
2451 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2452 else:
2453 if db_nsr.get("nsd"):
2454 db_nsd = db_nsr.get("nsd") # TODO this will be removed
2455 else:
2456 step = "Getting nsd from database"
2457 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2458
2459 # for backward compatibility
2460 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2461 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2462 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2463 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2464
2465 primitive = db_nslcmop["operationParams"]["primitive"]
2466 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
2467
2468 # look for primitive
2469 config_primitive_desc = None
2470 if vdu_id:
2471 for vdu in get_iterable(db_vnfd, "vdu"):
2472 if vdu_id == vdu["id"]:
2473 for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
2474 if config_primitive["name"] == primitive:
2475 config_primitive_desc = config_primitive
2476 break
2477 elif kdu_name:
2478 self.logger.debug(logging_text + "Checking actions in KDUs")
2479 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
2480 desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
2481 if primitive_params:
2482 desc_params.update(primitive_params)
2483 # TODO Check if we will need something at vnf level
2484 index = 0
2485 for kdu in get_iterable(nsr_deployed, "K8s"):
2486 if kdu_name == kdu["kdu-name"]:
2487 db_dict = {"collection": "nsrs", "filter": {"_id": nsr_id},
2488 "path": "_admin.deployed.K8s.{}".format(index)}
2489 if primitive == "upgrade":
2490 if desc_params.get("kdu_model"):
2491 kdu_model = desc_params.get("kdu_model")
2492 del desc_params["kdu_model"]
2493 else:
2494 kdu_model = kdu.get("kdu-model")
2495 parts = kdu_model.split(sep=":")
2496 if len(parts) == 2:
2497 kdu_model = parts[0]
2498
2499 if kdu.get("k8scluster-type") == "chart":
2500 output = await self.k8sclusterhelm.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
2501 kdu_instance=kdu.get("kdu-instance"),
2502 atomic=True, kdu_model=kdu_model,
2503 params=desc_params, db_dict=db_dict,
2504 timeout=300)
2505 elif kdu.get("k8scluster-type") == "juju":
2506 output = await self.k8sclusterjuju.upgrade(cluster_uuid=kdu.get("k8scluster-uuid"),
2507 kdu_instance=kdu.get("kdu-instance"),
2508 atomic=True, kdu_model=kdu_model,
2509 params=desc_params, db_dict=db_dict,
2510 timeout=300)
2511
2512 else:
2513 msg = "k8scluster-type not defined"
2514 raise LcmException(msg)
2515
2516 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(output))
2517 break
2518 elif primitive == "rollback":
2519 if kdu.get("k8scluster-type") == "chart":
2520 output = await self.k8sclusterhelm.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
2521 kdu_instance=kdu.get("kdu-instance"),
2522 db_dict=db_dict)
2523 elif kdu.get("k8scluster-type") == "juju":
2524 output = await self.k8sclusterjuju.rollback(cluster_uuid=kdu.get("k8scluster-uuid"),
2525 kdu_instance=kdu.get("kdu-instance"),
2526 db_dict=db_dict)
2527 else:
2528 msg = "k8scluster-type not defined"
2529 raise LcmException(msg)
2530 break
2531 elif primitive == "status":
2532 if kdu.get("k8scluster-type") == "chart":
2533 output = await self.k8sclusterhelm.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
2534 kdu_instance=kdu.get("kdu-instance"))
2535 elif kdu.get("k8scluster-type") == "juju":
2536 output = await self.k8sclusterjuju.status_kdu(cluster_uuid=kdu.get("k8scluster-uuid"),
2537 kdu_instance=kdu.get("kdu-instance"))
2538 else:
2539 msg = "k8scluster-type not defined"
2540 raise LcmException(msg)
2541 break
2542 index += 1
2543
2544 else:
2545 raise LcmException("KDU '{}' not found".format(kdu_name))
2546 if output:
2547 db_nslcmop_update["detailed-status"] = output
2548 db_nslcmop_update["operationState"] = 'COMPLETED'
2549 db_nslcmop_update["statusEnteredTime"] = time()
2550 else:
2551 db_nslcmop_update["detailed-status"] = ''
2552 db_nslcmop_update["operationState"] = 'FAILED'
2553 db_nslcmop_update["statusEnteredTime"] = time()
2554 return
2555 elif vnf_index:
2556 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2557 if config_primitive["name"] == primitive:
2558 config_primitive_desc = config_primitive
2559 break
2560 else:
2561 for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
2562 if config_primitive["name"] == primitive:
2563 config_primitive_desc = config_primitive
2564 break
2565
2566 if not config_primitive_desc:
2567 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
2568 format(primitive))
2569
2570 desc_params = {}
2571 if vnf_index:
2572 if db_vnfr.get("additionalParamsForVnf"):
2573 desc_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"])
2574 if vdu_id:
2575 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
2576 if vdur.get("additionalParams"):
2577 desc_params = self._format_additional_params(vdur["additionalParams"])
2578 else:
2579 if db_nsr.get("additionalParamsForNs"):
2580 desc_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"]))
2581
2582 # TODO check if ns is in a proper status
2583 output, detail = await self._ns_execute_primitive(
2584 db_deployed=nsr_deployed,
2585 member_vnf_index=vnf_index,
2586 vdu_id=vdu_id,
2587 vdu_name=vdu_name,
2588 vdu_count_index=vdu_count_index,
2589 primitive=primitive,
2590 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))
2591
2592 detailed_status = output
2593 if detail == 'OK':
2594 result = 'COMPLETED'
2595 else:
2596 result = 'FAILED'
2597
2598 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status
2599 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
2600 db_nslcmop_update["statusEnteredTime"] = time()
2601 self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status))
2602 return # database update is called inside finally
2603
2604 except (DbException, LcmException) as e:
2605 self.logger.error(logging_text + "Exit Exception {}".format(e))
2606 exc = e
2607 except asyncio.CancelledError:
2608 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2609 exc = "Operation was cancelled"
2610 except Exception as e:
2611 exc = traceback.format_exc()
2612 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
2613 finally:
2614 if exc and db_nslcmop:
2615 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
2616 "FAILED {}: {}".format(step, exc)
2617 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2618 db_nslcmop_update["statusEnteredTime"] = time()
2619 try:
2620 if db_nslcmop_update:
2621 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2622 if db_nsr:
2623 db_nsr_update["_admin.nslcmop"] = None
2624 db_nsr_update["_admin.operation-type"] = None
2625 db_nsr_update["_admin.nslcmop"] = None
2626 db_nsr_update["_admin.current-operation"] = None
2627 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2628 self._write_ns_status(
2629 nsr_id=nsr_id,
2630 ns_state=None,
2631 current_operation="IDLE",
2632 current_operation_id=None
2633 )
2634 except DbException as e:
2635 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2636 self.logger.debug(logging_text + "Exit")
2637 if nslcmop_operation_state:
2638 try:
2639 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2640 "operationState": nslcmop_operation_state},
2641 loop=self.loop)
2642 except Exception as e:
2643 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2644 self.logger.debug(logging_text + "Exit")
2645 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
2646 return nslcmop_operation_state, nslcmop_operation_state_detail
2647
2648 async def scale(self, nsr_id, nslcmop_id):
2649
2650 # Try to lock HA task here
2651 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2652 if not task_is_locked_by_me:
2653 return
2654
2655 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
2656 self.logger.debug(logging_text + "Enter")
2657 # get all needed from database
2658 db_nsr = None
2659 db_nslcmop = None
2660 db_nslcmop_update = {}
2661 nslcmop_operation_state = None
2662 db_nsr_update = {"_admin.nslcmop": nslcmop_id,
2663 "_admin.current-operation": nslcmop_id,
2664 "_admin.operation-type": "scale"}
2665 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2666 exc = None
2667 # in case of error, indicates what part of scale was failed to put nsr at error status
2668 scale_process = None
2669 old_operational_status = ""
2670 old_config_status = ""
2671 vnfr_scaled = False
2672 try:
2673 # wait for any previous tasks in process
2674 step = "Waiting for previous operations to terminate"
2675 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2676
2677 self._write_ns_status(
2678 nsr_id=nsr_id,
2679 ns_state=None,
2680 current_operation="SCALING",
2681 current_operation_id=nslcmop_id
2682 )
2683
2684 step = "Getting nslcmop from database"
2685 self.logger.debug(step + " after having waited for previous tasks to be completed")
2686 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2687 step = "Getting nsr from database"
2688 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2689
2690 old_operational_status = db_nsr["operational-status"]
2691 old_config_status = db_nsr["config-status"]
2692 step = "Parsing scaling parameters"
2693 # self.logger.debug(step)
2694 db_nsr_update["operational-status"] = "scaling"
2695 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2696 nsr_deployed = db_nsr["_admin"].get("deployed")
2697
2698 #######
2699 nsr_deployed = db_nsr["_admin"].get("deployed")
2700 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
2701 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2702 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2703 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2704 #######
2705
2706 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
2707 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
2708 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
2709 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
2710 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
2711
2712 # for backward compatibility
2713 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2714 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2715 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2716 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2717
2718 step = "Getting vnfr from database"
2719 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2720 step = "Getting vnfd from database"
2721 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2722
2723 step = "Getting scaling-group-descriptor"
2724 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
2725 if scaling_descriptor["name"] == scaling_group:
2726 break
2727 else:
2728 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
2729 "at vnfd:scaling-group-descriptor".format(scaling_group))
2730
2731 # cooldown_time = 0
2732 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
2733 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
2734 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
2735 # break
2736
2737 # TODO check if ns is in a proper status
2738 step = "Sending scale order to VIM"
2739 nb_scale_op = 0
2740 if not db_nsr["_admin"].get("scaling-group"):
2741 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
2742 admin_scale_index = 0
2743 else:
2744 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
2745 if admin_scale_info["name"] == scaling_group:
2746 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
2747 break
2748 else: # not found, set index one plus last element and add new entry with the name
2749 admin_scale_index += 1
2750 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
2751 RO_scaling_info = []
2752 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
2753 if scaling_type == "SCALE_OUT":
2754 # count if max-instance-count is reached
2755 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
2756 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
2757 if nb_scale_op >= max_instance_count:
2758 raise LcmException("reached the limit of {} (max-instance-count) "
2759 "scaling-out operations for the "
2760 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2761
2762 nb_scale_op += 1
2763 vdu_scaling_info["scaling_direction"] = "OUT"
2764 vdu_scaling_info["vdu-create"] = {}
2765 for vdu_scale_info in scaling_descriptor["vdu"]:
2766 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2767 "type": "create", "count": vdu_scale_info.get("count", 1)})
2768 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2769
2770 elif scaling_type == "SCALE_IN":
2771 # count if min-instance-count is reached
2772 min_instance_count = 0
2773 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
2774 min_instance_count = int(scaling_descriptor["min-instance-count"])
2775 if nb_scale_op <= min_instance_count:
2776 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
2777 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2778 nb_scale_op -= 1
2779 vdu_scaling_info["scaling_direction"] = "IN"
2780 vdu_scaling_info["vdu-delete"] = {}
2781 for vdu_scale_info in scaling_descriptor["vdu"]:
2782 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2783 "type": "delete", "count": vdu_scale_info.get("count", 1)})
2784 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2785
2786 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
2787 vdu_create = vdu_scaling_info.get("vdu-create")
2788 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
2789 if vdu_scaling_info["scaling_direction"] == "IN":
2790 for vdur in reversed(db_vnfr["vdur"]):
2791 if vdu_delete.get(vdur["vdu-id-ref"]):
2792 vdu_delete[vdur["vdu-id-ref"]] -= 1
2793 vdu_scaling_info["vdu"].append({
2794 "name": vdur["name"],
2795 "vdu_id": vdur["vdu-id-ref"],
2796 "interface": []
2797 })
2798 for interface in vdur["interfaces"]:
2799 vdu_scaling_info["vdu"][-1]["interface"].append({
2800 "name": interface["name"],
2801 "ip_address": interface["ip-address"],
2802 "mac_address": interface.get("mac-address"),
2803 })
2804 vdu_delete = vdu_scaling_info.pop("vdu-delete")
2805
2806 # PRE-SCALE BEGIN
2807 step = "Executing pre-scale vnf-config-primitive"
2808 if scaling_descriptor.get("scaling-config-action"):
2809 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
2810 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
2811 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
2812 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
2813 step = db_nslcmop_update["detailed-status"] = \
2814 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
2815
2816 # look for primitive
2817 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2818 if config_primitive["name"] == vnf_config_primitive:
2819 break
2820 else:
2821 raise LcmException(
2822 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
2823 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
2824 "primitive".format(scaling_group, config_primitive))
2825
2826 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
2827 if db_vnfr.get("additionalParamsForVnf"):
2828 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
2829
2830 scale_process = "VCA"
2831 db_nsr_update["config-status"] = "configuring pre-scaling"
2832 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
2833
2834 # Pre-scale reintent check: Check if this sub-operation has been executed before
2835 op_index = self._check_or_add_scale_suboperation(
2836 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
2837 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2838 # Skip sub-operation
2839 result = 'COMPLETED'
2840 result_detail = 'Done'
2841 self.logger.debug(logging_text +
2842 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
2843 vnf_config_primitive, result, result_detail))
2844 else:
2845 if (op_index == self.SUBOPERATION_STATUS_NEW):
2846 # New sub-operation: Get index of this sub-operation
2847 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2848 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
2849 format(vnf_config_primitive))
2850 else:
2851 # Reintent: Get registered params for this existing sub-operation
2852 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2853 vnf_index = op.get('member_vnf_index')
2854 vnf_config_primitive = op.get('primitive')
2855 primitive_params = op.get('primitive_params')
2856 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
2857 format(vnf_config_primitive))
2858 # Execute the primitive, either with new (first-time) or registered (reintent) args
2859 result, result_detail = await self._ns_execute_primitive(
2860 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
2861 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
2862 vnf_config_primitive, result, result_detail))
2863 # Update operationState = COMPLETED | FAILED
2864 self._update_suboperation_status(
2865 db_nslcmop, op_index, result, result_detail)
2866
2867 if result == "FAILED":
2868 raise LcmException(result_detail)
2869 db_nsr_update["config-status"] = old_config_status
2870 scale_process = None
2871 # PRE-SCALE END
2872
2873 # SCALE RO - BEGIN
2874 # Should this block be skipped if 'RO_nsr_id' == None ?
2875 # if (RO_nsr_id and RO_scaling_info):
2876 if RO_scaling_info:
2877 scale_process = "RO"
2878 # Scale RO reintent check: Check if this sub-operation has been executed before
2879 op_index = self._check_or_add_scale_suboperation(
2880 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
2881 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2882 # Skip sub-operation
2883 result = 'COMPLETED'
2884 result_detail = 'Done'
2885 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
2886 result, result_detail))
2887 else:
2888 if (op_index == self.SUBOPERATION_STATUS_NEW):
2889 # New sub-operation: Get index of this sub-operation
2890 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2891 self.logger.debug(logging_text + "New sub-operation RO")
2892 else:
2893 # Reintent: Get registered params for this existing sub-operation
2894 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2895 RO_nsr_id = op.get('RO_nsr_id')
2896 RO_scaling_info = op.get('RO_scaling_info')
2897 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
2898 vnf_config_primitive))
2899
2900 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
2901 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
2902 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
2903 # wait until ready
2904 RO_nslcmop_id = RO_desc["instance_action_id"]
2905 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
2906
2907 RO_task_done = False
2908 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
2909 detailed_status_old = None
2910 self.logger.debug(logging_text + step)
2911
2912 deployment_timeout = 1 * 3600 # One hour
2913 while deployment_timeout > 0:
2914 if not RO_task_done:
2915 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
2916 extra_item_id=RO_nslcmop_id)
2917 ns_status, ns_status_info = self.RO.check_action_status(desc)
2918 if ns_status == "ERROR":
2919 raise ROclient.ROClientException(ns_status_info)
2920 elif ns_status == "BUILD":
2921 detailed_status = step + "; {}".format(ns_status_info)
2922 elif ns_status == "ACTIVE":
2923 RO_task_done = True
2924 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
2925 self.logger.debug(logging_text + step)
2926 else:
2927 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2928 else:
2929
2930 if ns_status == "ERROR":
2931 raise ROclient.ROClientException(ns_status_info)
2932 elif ns_status == "BUILD":
2933 detailed_status = step + "; {}".format(ns_status_info)
2934 elif ns_status == "ACTIVE":
2935 step = detailed_status = \
2936 "Waiting for management IP address reported by the VIM. Updating VNFRs"
2937 if not vnfr_scaled:
2938 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
2939 vnfr_scaled = True
2940 try:
2941 desc = await self.RO.show("ns", RO_nsr_id)
2942 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
2943 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
2944 break
2945 except LcmExceptionNoMgmtIP:
2946 pass
2947 else:
2948 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
2949 if detailed_status != detailed_status_old:
2950 self._update_suboperation_status(
2951 db_nslcmop, op_index, 'COMPLETED', detailed_status)
2952 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
2953 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2954
2955 await asyncio.sleep(5, loop=self.loop)
2956 deployment_timeout -= 5
2957 if deployment_timeout <= 0:
2958 self._update_suboperation_status(
2959 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
2960 raise ROclient.ROClientException("Timeout waiting ns to be ready")
2961
2962 # update VDU_SCALING_INFO with the obtained ip_addresses
2963 if vdu_scaling_info["scaling_direction"] == "OUT":
2964 for vdur in reversed(db_vnfr["vdur"]):
2965 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
2966 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
2967 vdu_scaling_info["vdu"].append({
2968 "name": vdur["name"],
2969 "vdu_id": vdur["vdu-id-ref"],
2970 "interface": []
2971 })
2972 for interface in vdur["interfaces"]:
2973 vdu_scaling_info["vdu"][-1]["interface"].append({
2974 "name": interface["name"],
2975 "ip_address": interface["ip-address"],
2976 "mac_address": interface.get("mac-address"),
2977 })
2978 del vdu_scaling_info["vdu-create"]
2979
2980 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
2981 # SCALE RO - END
2982
2983 scale_process = None
2984 if db_nsr_update:
2985 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2986
2987 # POST-SCALE BEGIN
2988 # execute primitive service POST-SCALING
2989 step = "Executing post-scale vnf-config-primitive"
2990 if scaling_descriptor.get("scaling-config-action"):
2991 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
2992 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
2993 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
2994 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
2995 step = db_nslcmop_update["detailed-status"] = \
2996 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
2997
2998 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
2999 if db_vnfr.get("additionalParamsForVnf"):
3000 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3001
3002 # look for primitive
3003 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3004 if config_primitive["name"] == vnf_config_primitive:
3005 break
3006 else:
3007 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
3008 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
3009 "match any vnf-configuration:config-primitive".format(scaling_group,
3010 config_primitive))
3011 scale_process = "VCA"
3012 db_nsr_update["config-status"] = "configuring post-scaling"
3013 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3014
3015 # Post-scale reintent check: Check if this sub-operation has been executed before
3016 op_index = self._check_or_add_scale_suboperation(
3017 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
3018 if op_index == self.SUBOPERATION_STATUS_SKIP:
3019 # Skip sub-operation
3020 result = 'COMPLETED'
3021 result_detail = 'Done'
3022 self.logger.debug(logging_text +
3023 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3024 format(vnf_config_primitive, result, result_detail))
3025 else:
3026 if op_index == self.SUBOPERATION_STATUS_NEW:
3027 # New sub-operation: Get index of this sub-operation
3028 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3029 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3030 format(vnf_config_primitive))
3031 else:
3032 # Reintent: Get registered params for this existing sub-operation
3033 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3034 vnf_index = op.get('member_vnf_index')
3035 vnf_config_primitive = op.get('primitive')
3036 primitive_params = op.get('primitive_params')
3037 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
3038 format(vnf_config_primitive))
3039 # Execute the primitive, either with new (first-time) or registered (reintent) args
3040 result, result_detail = await self._ns_execute_primitive(
3041 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
3042 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3043 vnf_config_primitive, result, result_detail))
3044 # Update operationState = COMPLETED | FAILED
3045 self._update_suboperation_status(
3046 db_nslcmop, op_index, result, result_detail)
3047
3048 if result == "FAILED":
3049 raise LcmException(result_detail)
3050 db_nsr_update["config-status"] = old_config_status
3051 scale_process = None
3052 # POST-SCALE END
3053
3054 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
3055 db_nslcmop_update["statusEnteredTime"] = time()
3056 db_nslcmop_update["detailed-status"] = "done"
3057 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3058 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
3059 else old_operational_status
3060 db_nsr_update["config-status"] = old_config_status
3061 return
3062 except (ROclient.ROClientException, DbException, LcmException) as e:
3063 self.logger.error(logging_text + "Exit Exception {}".format(e))
3064 exc = e
3065 except asyncio.CancelledError:
3066 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3067 exc = "Operation was cancelled"
3068 except Exception as e:
3069 exc = traceback.format_exc()
3070 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3071 finally:
3072 if exc:
3073 if db_nslcmop:
3074 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
3075 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
3076 db_nslcmop_update["statusEnteredTime"] = time()
3077 if db_nsr:
3078 db_nsr_update["operational-status"] = old_operational_status
3079 db_nsr_update["config-status"] = old_config_status
3080 db_nsr_update["detailed-status"] = ""
3081 db_nsr_update["_admin.nslcmop"] = None
3082 if scale_process:
3083 if "VCA" in scale_process:
3084 db_nsr_update["config-status"] = "failed"
3085 if "RO" in scale_process:
3086 db_nsr_update["operational-status"] = "failed"
3087 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
3088 exc)
3089 try:
3090 if db_nslcmop and db_nslcmop_update:
3091 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
3092 if db_nsr:
3093 db_nsr_update["_admin.current-operation"] = None
3094 db_nsr_update["_admin.operation-type"] = None
3095 db_nsr_update["_admin.nslcmop"] = None
3096 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3097
3098 self._write_ns_status(
3099 nsr_id=nsr_id,
3100 ns_state=None,
3101 current_operation="IDLE",
3102 current_operation_id=None
3103 )
3104
3105 except DbException as e:
3106 self.logger.error(logging_text + "Cannot update database: {}".format(e))
3107 if nslcmop_operation_state:
3108 try:
3109 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3110 "operationState": nslcmop_operation_state},
3111 loop=self.loop)
3112 # if cooldown_time:
3113 # await asyncio.sleep(cooldown_time, loop=self.loop)
3114 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
3115 except Exception as e:
3116 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3117 self.logger.debug(logging_text + "Exit")
3118 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")