initial workflow for n2vc redesign
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
25
26 from osm_lcm import ROclient
27 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase
28
29 from osm_common.dbbase import DbException
30 from osm_common.fsbase import FsException
31 from n2vc.vnf import N2VC, N2VCPrimitiveExecutionFailed, NetworkServiceDoesNotExist
32
33 from copy import copy, deepcopy
34 from http import HTTPStatus
35 from time import time
36 from uuid import uuid4
37
38 __author__ = "Alfonso Tierno"
39
40
41 def get_iterable(in_dict, in_key):
42 """
43 Similar to <dict>.get(), but if value is None, False, ..., An empty tuple is returned instead
44 :param in_dict: a dictionary
45 :param in_key: the key to look for at in_dict
46 :return: in_dict[in_var] or () if it is None or not present
47 """
48 if not in_dict.get(in_key):
49 return ()
50 return in_dict[in_key]
51
52
53 def populate_dict(target_dict, key_list, value):
54 """
55 Update target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value.
56 Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
57 :param target_dict: dictionary to be changed
58 :param key_list: list of keys to insert at target_dict
59 :param value:
60 :return: None
61 """
62 for key in key_list[0:-1]:
63 if key not in target_dict:
64 target_dict[key] = {}
65 target_dict = target_dict[key]
66 target_dict[key_list[-1]] = value
67
68
69 def deep_get(target_dict, key_list):
70 """
71 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
72 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
73 :param target_dict: dictionary to be read
74 :param key_list: list of keys to read from target_dict
75 :return: The wanted value if exist, None otherwise
76 """
77 for key in key_list:
78 if not isinstance(target_dict, dict) or key not in target_dict:
79 return None
80 target_dict = target_dict[key]
81 return target_dict
82
83
84 class NsLcm(LcmBase):
85 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
86 total_deploy_timeout = 2 * 3600 # global timeout for deployment
87 timeout_charm_delete = 10 * 60
88 timeout_primitive = 10 * 60 # timeout for primitive execution
89
90 SUBOPERATION_STATUS_NOT_FOUND = -1
91 SUBOPERATION_STATUS_NEW = -2
92 SUBOPERATION_STATUS_SKIP = -3
93
94 def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
95 """
96 Init, Connect to database, filesystem storage, and messaging
97 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
98 :return: None
99 """
100 # logging
101 self.logger = logging.getLogger('lcm.ns')
102 self.loop = loop
103 self.lcm_tasks = lcm_tasks
104
105 super().__init__(db, msg, fs, self.logger)
106
107 self.ro_config = ro_config
108
109 self.n2vc = N2VC(
110 log=self.logger,
111 server=vca_config['host'],
112 port=vca_config['port'],
113 user=vca_config['user'],
114 secret=vca_config['secret'],
115 # TODO: This should point to the base folder where charms are stored,
116 # if there is a common one (like object storage). Otherwise, leave
117 # it unset and pass it via DeployCharms
118 # artifacts=vca_config[''],
119 artifacts=None,
120 juju_public_key=vca_config.get('pubkey'),
121 ca_cert=vca_config.get('cacert'),
122 api_proxy=vca_config.get('apiproxy')
123 )
124 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
125
126 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
127 """
128 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
129 :param vnfd: input vnfd
130 :param new_id: overrides vnf id if provided
131 :param additionalParams: Instantiation params for VNFs provided
132 :param nsrId: Id of the NSR
133 :return: copy of vnfd
134 """
135 try:
136 vnfd_RO = deepcopy(vnfd)
137 # remove unused by RO configuration, monitoring, scaling and internal keys
138 vnfd_RO.pop("_id", None)
139 vnfd_RO.pop("_admin", None)
140 vnfd_RO.pop("vnf-configuration", None)
141 vnfd_RO.pop("monitoring-param", None)
142 vnfd_RO.pop("scaling-group-descriptor", None)
143 if new_id:
144 vnfd_RO["id"] = new_id
145
146 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
147 for vdu in get_iterable(vnfd_RO, "vdu"):
148 cloud_init_file = None
149 if vdu.get("cloud-init-file"):
150 base_folder = vnfd["_admin"]["storage"]
151 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
152 vdu["cloud-init-file"])
153 with self.fs.file_open(cloud_init_file, "r") as ci_file:
154 cloud_init_content = ci_file.read()
155 vdu.pop("cloud-init-file", None)
156 elif vdu.get("cloud-init"):
157 cloud_init_content = vdu["cloud-init"]
158 else:
159 continue
160
161 env = Environment()
162 ast = env.parse(cloud_init_content)
163 mandatory_vars = meta.find_undeclared_variables(ast)
164 if mandatory_vars:
165 for var in mandatory_vars:
166 if not additionalParams or var not in additionalParams.keys():
167 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
168 "file, must be provided in the instantiation parameters inside the "
169 "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
170 template = Template(cloud_init_content)
171 cloud_init_content = template.render(additionalParams or {})
172 vdu["cloud-init"] = cloud_init_content
173
174 return vnfd_RO
175 except FsException as e:
176 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
177 format(vnfd["id"], vdu["id"], cloud_init_file, e))
178 except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
179 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
180 format(vnfd["id"], vdu["id"], e))
181
182 def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None):
183 """
184 Callback both for charm status change and task completion
185 :param model_name: Charm model name
186 :param application_name: Charm application name
187 :param status: Can be
188 - blocked: The unit needs manual intervention
189 - maintenance: The unit is actively deploying/configuring
190 - waiting: The unit is waiting for another charm to be ready
191 - active: The unit is deployed, configured, and ready
192 - error: The charm has failed and needs attention.
193 - terminated: The charm has been destroyed
194 - removing,
195 - removed
196 :param message: detailed message error
197 :param n2vc_info: dictionary with information shared with instantiate task. It contains:
198 nsr_id:
199 nslcmop_id:
200 lcmOperationType: currently "instantiate"
201 deployed: dictionary with {<application>: {operational-status: <status>, detailed-status: <text>}}
202 db_update: dictionary to be filled with the changes to be wrote to database with format key.key.key: value
203 n2vc_event: event used to notify instantiation task that some change has been produced
204 :param task: None for charm status change, or task for completion task callback
205 :return:
206 """
207 try:
208 nsr_id = n2vc_info["nsr_id"]
209 deployed = n2vc_info["deployed"]
210 db_nsr_update = n2vc_info["db_update"]
211 nslcmop_id = n2vc_info["nslcmop_id"]
212 ns_operation = n2vc_info["lcmOperationType"]
213 n2vc_event = n2vc_info["n2vc_event"]
214 logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id,
215 application_name)
216 for vca_index, vca_deployed in enumerate(deployed):
217 if not vca_deployed:
218 continue
219 if model_name == vca_deployed["model"] and application_name == vca_deployed["application"]:
220 break
221 else:
222 self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA. Received model_name={}".
223 format(model_name))
224 return
225 if task:
226 if task.cancelled():
227 self.logger.debug(logging_text + " task Cancelled")
228 vca_deployed['operational-status'] = "error"
229 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
230 vca_deployed['detailed-status'] = "Task Cancelled"
231 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = "Task Cancelled"
232
233 elif task.done():
234 exc = task.exception()
235 if exc:
236 self.logger.error(logging_text + " task Exception={}".format(exc))
237 vca_deployed['operational-status'] = "error"
238 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
239 vca_deployed['detailed-status'] = str(exc)
240 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(exc)
241 else:
242 self.logger.debug(logging_text + " task Done")
243 # task is Done, but callback is still ongoing. So ignore
244 return
245 elif status:
246 self.logger.debug(logging_text + " Enter status={} message={}".format(status, message))
247 if vca_deployed['operational-status'] == status:
248 return # same status, ignore
249 vca_deployed['operational-status'] = status
250 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = status
251 vca_deployed['detailed-status'] = str(message)
252 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(message)
253 else:
254 self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
255 return
256 # wake up instantiate task
257 n2vc_event.set()
258 except Exception as e:
259 self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True)
260
261 def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
262 """
263 Creates a RO ns descriptor from OSM ns_instantiate params
264 :param ns_params: OSM instantiate params
265 :return: The RO ns descriptor
266 """
267 vim_2_RO = {}
268 wim_2_RO = {}
269 # TODO feature 1417: Check that no instantiation is set over PDU
270 # check if PDU forces a concrete vim-network-id and add it
271 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
272
273 def vim_account_2_RO(vim_account):
274 if vim_account in vim_2_RO:
275 return vim_2_RO[vim_account]
276
277 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
278 if db_vim["_admin"]["operationalState"] != "ENABLED":
279 raise LcmException("VIM={} is not available. operationalState={}".format(
280 vim_account, db_vim["_admin"]["operationalState"]))
281 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
282 vim_2_RO[vim_account] = RO_vim_id
283 return RO_vim_id
284
285 def wim_account_2_RO(wim_account):
286 if isinstance(wim_account, str):
287 if wim_account in wim_2_RO:
288 return wim_2_RO[wim_account]
289
290 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
291 if db_wim["_admin"]["operationalState"] != "ENABLED":
292 raise LcmException("WIM={} is not available. operationalState={}".format(
293 wim_account, db_wim["_admin"]["operationalState"]))
294 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
295 wim_2_RO[wim_account] = RO_wim_id
296 return RO_wim_id
297 else:
298 return wim_account
299
300 def ip_profile_2_RO(ip_profile):
301 RO_ip_profile = deepcopy((ip_profile))
302 if "dns-server" in RO_ip_profile:
303 if isinstance(RO_ip_profile["dns-server"], list):
304 RO_ip_profile["dns-address"] = []
305 for ds in RO_ip_profile.pop("dns-server"):
306 RO_ip_profile["dns-address"].append(ds['address'])
307 else:
308 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
309 if RO_ip_profile.get("ip-version") == "ipv4":
310 RO_ip_profile["ip-version"] = "IPv4"
311 if RO_ip_profile.get("ip-version") == "ipv6":
312 RO_ip_profile["ip-version"] = "IPv6"
313 if "dhcp-params" in RO_ip_profile:
314 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
315 return RO_ip_profile
316
317 if not ns_params:
318 return None
319 RO_ns_params = {
320 # "name": ns_params["nsName"],
321 # "description": ns_params.get("nsDescription"),
322 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
323 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
324 # "scenario": ns_params["nsdId"],
325 }
326 n2vc_key_list = n2vc_key_list or []
327 for vnfd_ref, vnfd in vnfd_dict.items():
328 vdu_needed_access = []
329 mgmt_cp = None
330 if vnfd.get("vnf-configuration"):
331 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
332 if ssh_required and vnfd.get("mgmt-interface"):
333 if vnfd["mgmt-interface"].get("vdu-id"):
334 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
335 elif vnfd["mgmt-interface"].get("cp"):
336 mgmt_cp = vnfd["mgmt-interface"]["cp"]
337
338 for vdu in vnfd.get("vdu", ()):
339 if vdu.get("vdu-configuration"):
340 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
341 if ssh_required:
342 vdu_needed_access.append(vdu["id"])
343 elif mgmt_cp:
344 for vdu_interface in vdu.get("interface"):
345 if vdu_interface.get("external-connection-point-ref") and \
346 vdu_interface["external-connection-point-ref"] == mgmt_cp:
347 vdu_needed_access.append(vdu["id"])
348 mgmt_cp = None
349 break
350
351 if vdu_needed_access:
352 for vnf_member in nsd.get("constituent-vnfd"):
353 if vnf_member["vnfd-id-ref"] != vnfd_ref:
354 continue
355 for vdu in vdu_needed_access:
356 populate_dict(RO_ns_params,
357 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
358 n2vc_key_list)
359
360 if ns_params.get("vduImage"):
361 RO_ns_params["vduImage"] = ns_params["vduImage"]
362
363 if ns_params.get("ssh_keys"):
364 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
365 for vnf_params in get_iterable(ns_params, "vnf"):
366 for constituent_vnfd in nsd["constituent-vnfd"]:
367 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
368 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
369 break
370 else:
371 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
372 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
373 if vnf_params.get("vimAccountId"):
374 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
375 vim_account_2_RO(vnf_params["vimAccountId"]))
376
377 for vdu_params in get_iterable(vnf_params, "vdu"):
378 # TODO feature 1417: check that this VDU exist and it is not a PDU
379 if vdu_params.get("volume"):
380 for volume_params in vdu_params["volume"]:
381 if volume_params.get("vim-volume-id"):
382 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
383 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
384 volume_params["vim-volume-id"])
385 if vdu_params.get("interface"):
386 for interface_params in vdu_params["interface"]:
387 if interface_params.get("ip-address"):
388 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
389 vdu_params["id"], "interfaces", interface_params["name"],
390 "ip_address"),
391 interface_params["ip-address"])
392 if interface_params.get("mac-address"):
393 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
394 vdu_params["id"], "interfaces", interface_params["name"],
395 "mac_address"),
396 interface_params["mac-address"])
397 if interface_params.get("floating-ip-required"):
398 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
399 vdu_params["id"], "interfaces", interface_params["name"],
400 "floating-ip"),
401 interface_params["floating-ip-required"])
402
403 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
404 if internal_vld_params.get("vim-network-name"):
405 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
406 internal_vld_params["name"], "vim-network-name"),
407 internal_vld_params["vim-network-name"])
408 if internal_vld_params.get("vim-network-id"):
409 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
410 internal_vld_params["name"], "vim-network-id"),
411 internal_vld_params["vim-network-id"])
412 if internal_vld_params.get("ip-profile"):
413 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
414 internal_vld_params["name"], "ip-profile"),
415 ip_profile_2_RO(internal_vld_params["ip-profile"]))
416
417 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
418 # look for interface
419 iface_found = False
420 for vdu_descriptor in vnf_descriptor["vdu"]:
421 for vdu_interface in vdu_descriptor["interface"]:
422 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
423 if icp_params.get("ip-address"):
424 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
425 vdu_descriptor["id"], "interfaces",
426 vdu_interface["name"], "ip_address"),
427 icp_params["ip-address"])
428
429 if icp_params.get("mac-address"):
430 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
431 vdu_descriptor["id"], "interfaces",
432 vdu_interface["name"], "mac_address"),
433 icp_params["mac-address"])
434 iface_found = True
435 break
436 if iface_found:
437 break
438 else:
439 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
440 "internal-vld:id-ref={} is not present at vnfd:internal-"
441 "connection-point".format(vnf_params["member-vnf-index"],
442 icp_params["id-ref"]))
443
444 for vld_params in get_iterable(ns_params, "vld"):
445 if "ip-profile" in vld_params:
446 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
447 ip_profile_2_RO(vld_params["ip-profile"]))
448
449 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
450 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
451 wim_account_2_RO(vld_params["wimAccountId"])),
452 if vld_params.get("vim-network-name"):
453 RO_vld_sites = []
454 if isinstance(vld_params["vim-network-name"], dict):
455 for vim_account, vim_net in vld_params["vim-network-name"].items():
456 RO_vld_sites.append({
457 "netmap-use": vim_net,
458 "datacenter": vim_account_2_RO(vim_account)
459 })
460 else: # isinstance str
461 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
462 if RO_vld_sites:
463 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
464 if vld_params.get("vim-network-id"):
465 RO_vld_sites = []
466 if isinstance(vld_params["vim-network-id"], dict):
467 for vim_account, vim_net in vld_params["vim-network-id"].items():
468 RO_vld_sites.append({
469 "netmap-use": vim_net,
470 "datacenter": vim_account_2_RO(vim_account)
471 })
472 else: # isinstance str
473 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
474 if RO_vld_sites:
475 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
476 if vld_params.get("ns-net"):
477 if isinstance(vld_params["ns-net"], dict):
478 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
479 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
480 if RO_vld_ns_net:
481 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
482 if "vnfd-connection-point-ref" in vld_params:
483 for cp_params in vld_params["vnfd-connection-point-ref"]:
484 # look for interface
485 for constituent_vnfd in nsd["constituent-vnfd"]:
486 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
487 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
488 break
489 else:
490 raise LcmException(
491 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
492 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
493 match_cp = False
494 for vdu_descriptor in vnf_descriptor["vdu"]:
495 for interface_descriptor in vdu_descriptor["interface"]:
496 if interface_descriptor.get("external-connection-point-ref") == \
497 cp_params["vnfd-connection-point-ref"]:
498 match_cp = True
499 break
500 if match_cp:
501 break
502 else:
503 raise LcmException(
504 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
505 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
506 cp_params["member-vnf-index-ref"],
507 cp_params["vnfd-connection-point-ref"],
508 vnf_descriptor["id"]))
509 if cp_params.get("ip-address"):
510 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
511 vdu_descriptor["id"], "interfaces",
512 interface_descriptor["name"], "ip_address"),
513 cp_params["ip-address"])
514 if cp_params.get("mac-address"):
515 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
516 vdu_descriptor["id"], "interfaces",
517 interface_descriptor["name"], "mac_address"),
518 cp_params["mac-address"])
519 return RO_ns_params
520
521 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
522 # make a copy to do not change
523 vdu_create = copy(vdu_create)
524 vdu_delete = copy(vdu_delete)
525
526 vdurs = db_vnfr.get("vdur")
527 if vdurs is None:
528 vdurs = []
529 vdu_index = len(vdurs)
530 while vdu_index:
531 vdu_index -= 1
532 vdur = vdurs[vdu_index]
533 if vdur.get("pdu-type"):
534 continue
535 vdu_id_ref = vdur["vdu-id-ref"]
536 if vdu_create and vdu_create.get(vdu_id_ref):
537 for index in range(0, vdu_create[vdu_id_ref]):
538 vdur = deepcopy(vdur)
539 vdur["_id"] = str(uuid4())
540 vdur["count-index"] += 1
541 vdurs.insert(vdu_index+1+index, vdur)
542 del vdu_create[vdu_id_ref]
543 if vdu_delete and vdu_delete.get(vdu_id_ref):
544 del vdurs[vdu_index]
545 vdu_delete[vdu_id_ref] -= 1
546 if not vdu_delete[vdu_id_ref]:
547 del vdu_delete[vdu_id_ref]
548 # check all operations are done
549 if vdu_create or vdu_delete:
550 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
551 vdu_create))
552 if vdu_delete:
553 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
554 vdu_delete))
555
556 vnfr_update = {"vdur": vdurs}
557 db_vnfr["vdur"] = vdurs
558 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
559
560 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
561 """
562 Updates database nsr with the RO info for the created vld
563 :param ns_update_nsr: dictionary to be filled with the updated info
564 :param db_nsr: content of db_nsr. This is also modified
565 :param nsr_desc_RO: nsr descriptor from RO
566 :return: Nothing, LcmException is raised on errors
567 """
568
569 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
570 for net_RO in get_iterable(nsr_desc_RO, "nets"):
571 if vld["id"] != net_RO.get("ns_net_osm_id"):
572 continue
573 vld["vim-id"] = net_RO.get("vim_net_id")
574 vld["name"] = net_RO.get("vim_name")
575 vld["status"] = net_RO.get("status")
576 vld["status-detailed"] = net_RO.get("error_msg")
577 ns_update_nsr["vld.{}".format(vld_index)] = vld
578 break
579 else:
580 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
581
582 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
583 """
584 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
585 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
586 :param nsr_desc_RO: nsr descriptor from RO
587 :return: Nothing, LcmException is raised on errors
588 """
589 for vnf_index, db_vnfr in db_vnfrs.items():
590 for vnf_RO in nsr_desc_RO["vnfs"]:
591 if vnf_RO["member_vnf_index"] != vnf_index:
592 continue
593 vnfr_update = {}
594 if vnf_RO.get("ip_address"):
595 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
596 elif not db_vnfr.get("ip-address"):
597 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
598
599 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
600 vdur_RO_count_index = 0
601 if vdur.get("pdu-type"):
602 continue
603 for vdur_RO in get_iterable(vnf_RO, "vms"):
604 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
605 continue
606 if vdur["count-index"] != vdur_RO_count_index:
607 vdur_RO_count_index += 1
608 continue
609 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
610 if vdur_RO.get("ip_address"):
611 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
612 else:
613 vdur["ip-address"] = None
614 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
615 vdur["name"] = vdur_RO.get("vim_name")
616 vdur["status"] = vdur_RO.get("status")
617 vdur["status-detailed"] = vdur_RO.get("error_msg")
618 for ifacer in get_iterable(vdur, "interfaces"):
619 for interface_RO in get_iterable(vdur_RO, "interfaces"):
620 if ifacer["name"] == interface_RO.get("internal_name"):
621 ifacer["ip-address"] = interface_RO.get("ip_address")
622 ifacer["mac-address"] = interface_RO.get("mac_address")
623 break
624 else:
625 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
626 "from VIM info".format(vnf_index, vdur["vdu-id-ref"],
627 ifacer["name"]))
628 vnfr_update["vdur.{}".format(vdu_index)] = vdur
629 break
630 else:
631 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
632 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
633
634 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
635 for net_RO in get_iterable(nsr_desc_RO, "nets"):
636 if vld["id"] != net_RO.get("vnf_net_osm_id"):
637 continue
638 vld["vim-id"] = net_RO.get("vim_net_id")
639 vld["name"] = net_RO.get("vim_name")
640 vld["status"] = net_RO.get("status")
641 vld["status-detailed"] = net_RO.get("error_msg")
642 vnfr_update["vld.{}".format(vld_index)] = vld
643 break
644 else:
645 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
646 vnf_index, vld["id"]))
647
648 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
649 break
650
651 else:
652 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
653
654 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
655 n2vc_key_list):
656 db_nsr_update = {}
657 RO_descriptor_number = 0 # number of descriptors created at RO
658 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
659 start_deploy = time()
660 ns_params = db_nslcmop.get("operationParams")
661
662 # deploy RO
663 # get vnfds, instantiate at RO
664 for c_vnf in nsd.get("constituent-vnfd", ()):
665 member_vnf_index = c_vnf["member-vnf-index"]
666 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
667 vnfd_ref = vnfd["id"]
668 step = db_nsr_update["detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(
669 vnfd_ref, member_vnf_index)
670 # self.logger.debug(logging_text + step)
671 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
672 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
673 RO_descriptor_number += 1
674
675 # look position at deployed.RO.vnfd if not present it will be appended at the end
676 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
677 if vnf_deployed["member-vnf-index"] == member_vnf_index:
678 break
679 else:
680 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
681 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
682
683 # look if present
684 RO_update = {"member-vnf-index": member_vnf_index}
685 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
686 if vnfd_list:
687 RO_update["id"] = vnfd_list[0]["uuid"]
688 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
689 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
690 else:
691 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
692 get("additionalParamsForVnf"), nsr_id)
693 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
694 RO_update["id"] = desc["uuid"]
695 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
696 vnfd_ref, member_vnf_index, desc["uuid"]))
697 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
698 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
699 self.update_db_2("nsrs", nsr_id, db_nsr_update)
700
701 # create nsd at RO
702 nsd_ref = nsd["id"]
703 step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
704 # self.logger.debug(logging_text + step)
705
706 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
707 RO_descriptor_number += 1
708 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
709 if nsd_list:
710 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
711 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
712 nsd_ref, RO_nsd_uuid))
713 else:
714 nsd_RO = deepcopy(nsd)
715 nsd_RO["id"] = RO_osm_nsd_id
716 nsd_RO.pop("_id", None)
717 nsd_RO.pop("_admin", None)
718 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
719 member_vnf_index = c_vnf["member-vnf-index"]
720 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
721 for c_vld in nsd_RO.get("vld", ()):
722 for cp in c_vld.get("vnfd-connection-point-ref", ()):
723 member_vnf_index = cp["member-vnf-index-ref"]
724 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
725
726 desc = await self.RO.create("nsd", descriptor=nsd_RO)
727 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
728 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
729 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
730 self.update_db_2("nsrs", nsr_id, db_nsr_update)
731
732 # Crate ns at RO
733 # if present use it unless in error status
734 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
735 if RO_nsr_id:
736 try:
737 step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
738 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
739 desc = await self.RO.show("ns", RO_nsr_id)
740 except ROclient.ROClientException as e:
741 if e.http_code != HTTPStatus.NOT_FOUND:
742 raise
743 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
744 if RO_nsr_id:
745 ns_status, ns_status_info = self.RO.check_ns_status(desc)
746 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
747 if ns_status == "ERROR":
748 step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
749 self.logger.debug(logging_text + step)
750 await self.RO.delete("ns", RO_nsr_id)
751 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
752 if not RO_nsr_id:
753 step = db_nsr_update["detailed-status"] = "Checking dependencies"
754 # self.logger.debug(logging_text + step)
755
756 # check if VIM is creating and wait look if previous tasks in process
757 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
758 if task_dependency:
759 step = "Waiting for related tasks to be completed: {}".format(task_name)
760 self.logger.debug(logging_text + step)
761 await asyncio.wait(task_dependency, timeout=3600)
762 if ns_params.get("vnf"):
763 for vnf in ns_params["vnf"]:
764 if "vimAccountId" in vnf:
765 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
766 vnf["vimAccountId"])
767 if task_dependency:
768 step = "Waiting for related tasks to be completed: {}".format(task_name)
769 self.logger.debug(logging_text + step)
770 await asyncio.wait(task_dependency, timeout=3600)
771
772 step = db_nsr_update["detailed-status"] = "Checking instantiation parameters"
773
774 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
775
776 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
777 desc = await self.RO.create("ns", descriptor=RO_ns_params,
778 name=db_nsr["name"],
779 scenario=RO_nsd_uuid)
780 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
781 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
782 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
783 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
784 self.update_db_2("nsrs", nsr_id, db_nsr_update)
785
786 # wait until NS is ready
787 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
788 detailed_status_old = None
789 self.logger.debug(logging_text + step)
790
791 while time() <= start_deploy + self.total_deploy_timeout:
792 desc = await self.RO.show("ns", RO_nsr_id)
793 ns_status, ns_status_info = self.RO.check_ns_status(desc)
794 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
795 if ns_status == "ERROR":
796 raise ROclient.ROClientException(ns_status_info)
797 elif ns_status == "BUILD":
798 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
799 elif ns_status == "ACTIVE":
800 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
801 try:
802 self.ns_update_vnfr(db_vnfrs, desc)
803 break
804 except LcmExceptionNoMgmtIP:
805 pass
806 else:
807 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
808 if detailed_status != detailed_status_old:
809 detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
810 self.update_db_2("nsrs", nsr_id, db_nsr_update)
811 await asyncio.sleep(5, loop=self.loop)
812 else: # total_deploy_timeout
813 raise ROclient.ROClientException("Timeout waiting ns to be ready")
814
815 step = "Updating NSR"
816 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
817
818 db_nsr_update["operational-status"] = "running"
819 db_nsr["detailed-status"] = "Configuring vnfr"
820 self.update_db_2("nsrs", nsr_id, db_nsr_update)
821
822 async def insert_key_ro(self, logging_text, pub_key, nsr_id, vnfr_id, vdu_id, vdu_index, user):
823 ro_nsr_id = None
824 ip_address = None
825 nb_tries = 0
826 target_vdu_id = None
827 while True:
828 await asyncio.sleep(10, loop=self.loop)
829 # 1 wait until NS is deployed at RO
830 if not ro_nsr_id:
831 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
832 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
833 if not ro_nsr_id:
834 continue
835 if not target_vdu_id:
836 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
837 if not vdu_id:
838 ip_address = db_vnfr.get("ip-address")
839 if not ip_address:
840 continue
841 for vdur in get_iterable(db_vnfr, "vdur"):
842 if (vdur["vdu-id-ref"] == vdu_id and vdur["count-index"] == vdu_index) or \
843 (ip_address and vdur.get("ip-address") == ip_address):
844 if vdur["status"] == "ACTIVE":
845 target_vdu_id = vdur["vdu-id-ref"]
846 elif vdur["status"] == "ERROR":
847 raise LcmException("Cannot inject ssh-key because target VM is in error state")
848 break
849 else:
850 raise LcmException("Not found vnfr_id={}, vdu_index={}, vdu_index={}".format(
851 vnfr_id, vdu_id, vdu_index
852 ))
853 if not target_vdu_id:
854 continue
855 try:
856 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
857 result_dict = await self.RO.create_action("ns", ro_nsr_id,
858 {"add_public_key": pub_key, "vms": [ro_vm_id], "user": user})
859 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
860 if not result_dict or not isinstance(result_dict, dict):
861 raise LcmException("Unknown response from RO when injecting key")
862 for result in result_dict.values():
863 if result.get("vim_result") == 200:
864 break
865 else:
866 raise ROclient.ROClientException("error injecting key: {}".format(
867 result.get("description")))
868 return
869 except ROclient.ROClientException as e:
870 nb_tries += 1
871 if nb_tries >= 10:
872 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
873 self.logger.debug(logging_text + "error injecting key: {}".format(e))
874
875 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, vdu_index,
876 deploy_params, config_descriptor, base_folder):
877 nsr_id = db_nsr["_id"]
878 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
879 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
880 logging_text += "member_vnf_index={} vdu_id={}, vdu_index={} ".format(db_vnfr["member-vnf-index-ref"],
881 vdu_id, vdu_index)
882 step = ""
883 try:
884 vnfr_id = None
885 if db_vnfr:
886 vnfr_id = db_vnfr["_id"]
887
888 namespace = "{nsi}.{ns}".format(
889 nsi=nsi_id if nsi_id else "",
890 ns=nsr_id)
891 if vnfr_id:
892 namespace += "." + vnfr_id
893 if vdu_id:
894 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
895
896 # Get artifact path
897 storage_params = self.fs.get_params()
898 artifact_path = "{}{}/{}/charms/{}".format(
899 storage_params["path"],
900 base_folder["folder"],
901 base_folder["pkg-dir"],
902 config_descriptor["juju"]["charm"]
903 )
904
905 is_proxy_charm = True if config_descriptor["juju"]["charm"] else False
906 if config_descriptor["juju"].get("proxy") is False:
907 is_proxy_charm = False
908
909 # n2vc_redesign STEP 3.1
910 ee_id = vca_deployed.get("ee_id")
911 if not ee_id:
912 if is_proxy_charm:
913 step = "create execution envioronment"
914 self.logger.debug(logging_text + step)
915 ee_id = await self.n2vc.CreateExecutionEnvironment(namespace)
916
917 else:
918 step = "register execution envioronment"
919 # TODO wait until deployed by RO, when IP address has been filled. By pooling????
920 credentials = {} # TODO db_credentials["ip_address"]
921 # get username
922 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
923 # merged. Meanwhile let's get username from initial-config-primitive
924 if config_descriptor.get("initial-config-primitive"):
925 for param in config_descriptor["initial-config-primitive"][0].get("parameter", ()):
926 if param["name"] == "ssh-username":
927 credentials["username"] = param["value"]
928 if config_descriptor.get("config-access") and config_descriptor["config-access"].get("ssh-access"):
929 if config_descriptor["config-access"]["ssh-access"].get("required"):
930 credentials["username"] = \
931 config_descriptor["config-access"]["ssh-access"].get("default-user")
932
933 # n2vc_redesign STEP 3.2
934 self.logger.debug(logging_text + step)
935 ee_id = await self.n2vc.RegisterExecutionEnvioronment(credentials)
936 # for compatibility with MON/POL modules, the need model and application name at database
937 # TODO ask to N2VC instead of assuming the format "model_name.application_name"
938 model_name, _, application_name = ee_id.partition(".")
939 self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
940 db_update_entry + "application": application_name,
941 db_update_entry + "ee_id": ee_id})
942
943 # n2vc_redesign STEP 3.3
944 # TODO check if already done
945 step = "Install configuration Software"
946 # TODO db_dict with filter: 'nsr_id', path:'db_update_entry'
947 pub_key = await self.n2vc.InstallConfigurationSW(self, ee_id, artifact_path)
948 if is_proxy_charm and pub_key:
949 # check if ssh key must be injected
950 required = deep_get(config_descriptor, ("config-access", "ssh-access", "required"))
951 # TODO get from config primitive: if required not user:
952 if required:
953 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
954 # insert pub_key into VM
955 # n2vc_redesign STEP 5.1
956 step = "Insert public key into VM"
957 await self.insert_key_ro(logging_text, pub_key, nsr_id, vnfr_id, vdu_id, vdu_index, user)
958 # TODO get ip_address and add to deploy params
959 deploy_params["rw_mgmt_ip"] = "TODO-ip-address"
960
961 # n2vc_redesign STEP 6 Execute initial config primitive
962 initial_config_primitive_list = config_descriptor.get('initial-config-primitive', [])
963 for initial_config_primitive in initial_config_primitive_list:
964 # TODO check if already done
965 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
966 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
967 self.logger.debug(logging_text + step)
968 await self.n2vc.ExecutePrimitive(ee_id, initial_config_primitive["name"], primitive_params_)
969 # TODO register in database that primitive is done
970 except Exception as e: # TODO not use Exception but N2VC exception
971 raise Exception("{} {}".format(step, e))
972 # TODO raise N2VC exception with 'step' extra information
973
974 async def instantiate(self, nsr_id, nslcmop_id):
975 # Try to lock HA task here
976 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
977 if not task_is_locked_by_me:
978 return
979
980 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
981 self.logger.debug(logging_text + "Enter")
982 # get all needed from database
983 # start_deploy = time()
984 db_nsr = None
985 db_nslcmop = None
986 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
987 db_nslcmop_update = {}
988 nslcmop_operation_state = None
989 db_vnfrs = {}
990 task_instantiation_list = []
991 exc = None
992 try:
993 # wait for any previous tasks in process
994 step = "Waiting for previous tasks"
995 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
996 # STEP 0: Reding database
997 step = "Getting nslcmop={} from db".format(nslcmop_id)
998 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
999 step = "Getting nsr={} from db".format(nsr_id)
1000 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1001 nsd = db_nsr["nsd"]
1002 # nsr_name = db_nsr["name"] # TODO short-name??
1003
1004 step = "Getting vnfrs from db"
1005 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1006 db_vnfds_ref = {}
1007 db_vnfds = {}
1008 db_vnfds_index = {}
1009 for vnfr in db_vnfrs_list:
1010 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
1011 vnfd_id = vnfr["vnfd-id"]
1012 vnfd_ref = vnfr["vnfd-ref"]
1013 if vnfd_id not in db_vnfds:
1014 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1015 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1016 db_vnfds_ref[vnfd_ref] = vnfd
1017 db_vnfds[vnfd_id] = vnfd
1018 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id]
1019
1020 # Get or generates the _admin.deployed,VCA list
1021 vca_deployed_list = None
1022 # vca_model_name = None
1023 if db_nsr["_admin"].get("deployed"):
1024 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1025 # vca_model_name = db_nsr["_admin"]["deployed"].get("VCA-model-name")
1026 if vca_deployed_list is None:
1027 vca_deployed_list = []
1028 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1029 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1030 elif isinstance(vca_deployed_list, dict):
1031 # maintain backward compatibility. Change a dict to list at database
1032 vca_deployed_list = list(vca_deployed_list.values())
1033 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1034 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1035
1036 db_nsr_update["detailed-status"] = "creating"
1037 db_nsr_update["operational-status"] = "init"
1038 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1039 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1040 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1041
1042 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1043 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1044 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1045
1046 # n2vc_redesign STEP 1 Get VCA public ssh-key
1047 # feature 1429. Add n2vc public key to needed VMs
1048 n2vc_key = await self.n2vc.GetPublicKey()
1049
1050 # n2vc_redesign STEP 2 Deploy Network Scenario
1051 task_ro = asyncio.ensure_future(
1052 self.instantiate_RO(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref, [n2vc_key])
1053 )
1054 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1055 task_instantiation_list.append(task_ro)
1056
1057 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1058 step = "Looking for needed vnfd to configure with proxy charm"
1059 self.logger.debug(logging_text + step)
1060
1061 def _deploy_n2vc():
1062 # launch instantiate_N2VC in a asyncio task and register task object
1063 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
1064 # if not found, create one entry and update database
1065 # read variables logging_text, nsi_id, db_nsr, db_vnfr, vdu_id, vdu_index, deploy_params,
1066 # descriptor_config, base_folder, member_vnf_index, vnfd_id
1067 # write variables: db_nsr
1068
1069 # fill db_nsr._admin.deployed.VCA.<index>
1070 vca_index = -1
1071 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
1072 if not vca_deployed:
1073 continue
1074 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
1075 vca_deployed.get("vdu_id") == vdu_id and \
1076 vca_deployed.get("vdu_count_index", 0) == vdu_index:
1077 break
1078 else:
1079 # not found, create one.
1080 vca_deployed = {
1081 "member-vnf-index": member_vnf_index,
1082 "vdu_id": vdu_id,
1083 "vdu_count_index": vdu_index,
1084 "operational-status": "init", # TODO revise
1085 "detailed-status": "", # TODO revise
1086 "step": "initial-deploy", # TODO revise
1087 "vnfd_id": vnfd_id,
1088 "vdu_name": vdu_name,
1089 }
1090 vca_index += 1
1091 self.update_db_2("nsrs", nsr_id, {"_admin.deployed.VCA.{}".format(vca_index): vca_deployed})
1092 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
1093
1094 # Launch task
1095 task_n2vc = asyncio.ensure_future(
1096 self.instantiate_N2VC(logging_text,
1097 vca_index,
1098 nsi_id,
1099 db_nsr,
1100 db_vnfr,
1101 vdu_id,
1102 vdu_index,
1103 deploy_params,
1104 descriptor_config,
1105 base_folder,
1106 )
1107 )
1108 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
1109 task_instantiation_list.append(task_n2vc)
1110
1111 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1112 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1113 vnfd_id = c_vnf["vnfd-id-ref"]
1114 member_vnf_index = str(c_vnf["member-vnf-index"])
1115 db_vnfr = db_vnfrs[member_vnf_index]
1116 base_folder = vnfd["_admin"]["storage"]
1117 vdu_id = None
1118 vdu_index = 0
1119 vdu_name = None
1120 vnfd = db_vnfds_ref[vnfd_id]
1121
1122 # Get additional parameters
1123 deploy_params = {}
1124 if db_vnfr.get("additionalParamsForVnf"):
1125 deploy_params = db_vnfr["additionalParamsForVnf"].copy()
1126 for k, v in deploy_params.items():
1127 if isinstance(v, str) and v.startswith("!!yaml "):
1128 deploy_params[k] = yaml.safe_load(v[7:])
1129
1130 descriptor_config = vnfd.get("vnf-configuration")
1131 if descriptor_config and descriptor_config.get("juju"):
1132 _deploy_n2vc()
1133
1134 # Deploy charms for each VDU that supports one.
1135 for vdud in get_iterable(vnfd, 'vdu'):
1136 vdu_id = vdud["id"]
1137 descriptor_config = vdud.get('vdu-configuration')
1138 if descriptor_config and descriptor_config.get("juju"):
1139 # look for vdu index in the db_vnfr["vdu"] section
1140 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
1141 # if vdur["vdu-id-ref"] == vdu_id:
1142 # break
1143 # else:
1144 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
1145 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
1146 # vdu_name = vdur.get("name")
1147 vdu_name = None
1148 for vdu_index in range(int(vdud.get("count", 1))):
1149 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1150 _deploy_n2vc()
1151
1152 # Check if this NS has a charm configuration
1153 descriptor_config = nsd.get("ns-configuration")
1154 if descriptor_config and descriptor_config.get("juju"):
1155 vnfd_id = None
1156 db_vnfr = None
1157 member_vnf_index = None
1158 vdu_id = None
1159 vdu_index = 0
1160 vdu_name = None
1161 # Get additional parameters
1162 deploy_params = {}
1163 if db_nsr.get("additionalParamsForNs"):
1164 deploy_params = db_nsr["additionalParamsForNs"].copy()
1165 for k, v in deploy_params.items():
1166 if isinstance(v, str) and v.startswith("!!yaml "):
1167 deploy_params[k] = yaml.safe_load(v[7:])
1168 base_folder = nsd["_admin"]["storage"]
1169 _deploy_n2vc()
1170
1171 # Wait until all tasks of "task_instantiation_list" have been finished
1172
1173 # while time() <= start_deploy + self.total_deploy_timeout:
1174 error_text = None
1175 timeout = 3600 # time() - start_deploy
1176 task_instantiation_set = set(task_instantiation_list)
1177 done, pending = await asyncio.wait(task_instantiation_set, timeout=timeout)
1178 # TODO return_when=asyncio.FIRST_COMPLETED)
1179 if pending:
1180 error_text = "timeout"
1181 for task in done:
1182 if task.cancelled():
1183 if not error_text:
1184 error_text = "cancelled"
1185 elif task.done():
1186 exc = task.exception()
1187 if exc:
1188 error_text = str(exc)
1189
1190 if error_text:
1191 db_nsr_update["config-status"] = "failed"
1192 error_text = "fail configuring " + error_text
1193 db_nsr_update["detailed-status"] = error_text
1194 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1195 db_nslcmop_update["detailed-status"] = error_text
1196 db_nslcmop_update["statusEnteredTime"] = time()
1197 else:
1198 # all is done
1199 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1200 db_nslcmop_update["statusEnteredTime"] = time()
1201 db_nslcmop_update["detailed-status"] = "done"
1202 db_nsr_update["config-status"] = "configured"
1203 db_nsr_update["detailed-status"] = "done"
1204
1205 return
1206
1207 except (ROclient.ROClientException, DbException, LcmException) as e:
1208 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1209 exc = e
1210 except asyncio.CancelledError:
1211 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1212 exc = "Operation was cancelled"
1213 except Exception as e:
1214 exc = traceback.format_exc()
1215 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1216 exc_info=True)
1217 finally:
1218 if exc:
1219 if db_nsr:
1220 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1221 db_nsr_update["operational-status"] = "failed"
1222 if db_nslcmop:
1223 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1224 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1225 db_nslcmop_update["statusEnteredTime"] = time()
1226 try:
1227 if db_nsr:
1228 db_nsr_update["_admin.nslcmop"] = None
1229 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1230 if db_nslcmop_update:
1231 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1232 except DbException as e:
1233 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1234 if nslcmop_operation_state:
1235 try:
1236 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1237 "operationState": nslcmop_operation_state},
1238 loop=self.loop)
1239 except Exception as e:
1240 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1241
1242 self.logger.debug(logging_text + "Exit")
1243 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1244
1245 async def _destroy_charm(self, model, application):
1246 """
1247 Order N2VC destroy a charm
1248 :param model:
1249 :param application:
1250 :return: True if charm does not exist. False if it exist
1251 """
1252 if not await self.n2vc.HasApplication(model, application):
1253 return True # Already removed
1254 await self.n2vc.RemoveCharms(model, application)
1255 return False
1256
1257 async def _wait_charm_destroyed(self, model, application, timeout):
1258 """
1259 Wait until charm does not exist
1260 :param model:
1261 :param application:
1262 :param timeout:
1263 :return: True if not exist, False if timeout
1264 """
1265 while True:
1266 if not await self.n2vc.HasApplication(model, application):
1267 return True
1268 if timeout < 0:
1269 return False
1270 await asyncio.sleep(10, loop=self.loop)
1271 timeout -= 10
1272
1273 # Check if this VNFD has a configured terminate action
1274 def _has_terminate_config_primitive(self, vnfd):
1275 vnf_config = vnfd.get("vnf-configuration")
1276 if vnf_config and vnf_config.get("terminate-config-primitive"):
1277 return True
1278 else:
1279 return False
1280
1281 @staticmethod
1282 def _get_terminate_config_primitive_seq_list(vnfd):
1283 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
1284 # No need to check for existing primitive twice, already done before
1285 vnf_config = vnfd.get("vnf-configuration")
1286 seq_list = vnf_config.get("terminate-config-primitive")
1287 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
1288 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
1289 return seq_list_sorted
1290
1291 @staticmethod
1292 def _create_nslcmop(nsr_id, operation, params):
1293 """
1294 Creates a ns-lcm-opp content to be stored at database.
1295 :param nsr_id: internal id of the instance
1296 :param operation: instantiate, terminate, scale, action, ...
1297 :param params: user parameters for the operation
1298 :return: dictionary following SOL005 format
1299 """
1300 # Raise exception if invalid arguments
1301 if not (nsr_id and operation and params):
1302 raise LcmException(
1303 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
1304 now = time()
1305 _id = str(uuid4())
1306 nslcmop = {
1307 "id": _id,
1308 "_id": _id,
1309 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
1310 "operationState": "PROCESSING",
1311 "statusEnteredTime": now,
1312 "nsInstanceId": nsr_id,
1313 "lcmOperationType": operation,
1314 "startTime": now,
1315 "isAutomaticInvocation": False,
1316 "operationParams": params,
1317 "isCancelPending": False,
1318 "links": {
1319 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
1320 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
1321 }
1322 }
1323 return nslcmop
1324
1325 def _get_terminate_primitive_params(self, seq, vnf_index):
1326 primitive = seq.get('name')
1327 primitive_params = {}
1328 params = {
1329 "member_vnf_index": vnf_index,
1330 "primitive": primitive,
1331 "primitive_params": primitive_params,
1332 }
1333 desc_params = {}
1334 return self._map_primitive_params(seq, params, desc_params)
1335
1336 # sub-operations
1337
1338 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
1339 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
1340 if (op.get('operationState') == 'COMPLETED'):
1341 # b. Skip sub-operation
1342 # _ns_execute_primitive() or RO.create_action() will NOT be executed
1343 return self.SUBOPERATION_STATUS_SKIP
1344 else:
1345 # c. Reintent executing sub-operation
1346 # The sub-operation exists, and operationState != 'COMPLETED'
1347 # Update operationState = 'PROCESSING' to indicate a reintent.
1348 operationState = 'PROCESSING'
1349 detailed_status = 'In progress'
1350 self._update_suboperation_status(
1351 db_nslcmop, op_index, operationState, detailed_status)
1352 # Return the sub-operation index
1353 # _ns_execute_primitive() or RO.create_action() will be called from scale()
1354 # with arguments extracted from the sub-operation
1355 return op_index
1356
1357 # Find a sub-operation where all keys in a matching dictionary must match
1358 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
1359 def _find_suboperation(self, db_nslcmop, match):
1360 if (db_nslcmop and match):
1361 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
1362 for i, op in enumerate(op_list):
1363 if all(op.get(k) == match[k] for k in match):
1364 return i
1365 return self.SUBOPERATION_STATUS_NOT_FOUND
1366
1367 # Update status for a sub-operation given its index
1368 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
1369 # Update DB for HA tasks
1370 q_filter = {'_id': db_nslcmop['_id']}
1371 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
1372 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
1373 self.db.set_one("nslcmops",
1374 q_filter=q_filter,
1375 update_dict=update_dict,
1376 fail_on_empty=False)
1377
1378 # Add sub-operation, return the index of the added sub-operation
1379 # Optionally, set operationState, detailed-status, and operationType
1380 # Status and type are currently set for 'scale' sub-operations:
1381 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
1382 # 'detailed-status' : status message
1383 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
1384 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
1385 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index,
1386 vdu_name, primitive, mapped_primitive_params,
1387 operationState=None, detailed_status=None, operationType=None,
1388 RO_nsr_id=None, RO_scaling_info=None):
1389 if not (db_nslcmop):
1390 return self.SUBOPERATION_STATUS_NOT_FOUND
1391 # Get the "_admin.operations" list, if it exists
1392 db_nslcmop_admin = db_nslcmop.get('_admin', {})
1393 op_list = db_nslcmop_admin.get('operations')
1394 # Create or append to the "_admin.operations" list
1395 new_op = {'member_vnf_index': vnf_index,
1396 'vdu_id': vdu_id,
1397 'vdu_count_index': vdu_count_index,
1398 'primitive': primitive,
1399 'primitive_params': mapped_primitive_params}
1400 if operationState:
1401 new_op['operationState'] = operationState
1402 if detailed_status:
1403 new_op['detailed-status'] = detailed_status
1404 if operationType:
1405 new_op['lcmOperationType'] = operationType
1406 if RO_nsr_id:
1407 new_op['RO_nsr_id'] = RO_nsr_id
1408 if RO_scaling_info:
1409 new_op['RO_scaling_info'] = RO_scaling_info
1410 if not op_list:
1411 # No existing operations, create key 'operations' with current operation as first list element
1412 db_nslcmop_admin.update({'operations': [new_op]})
1413 op_list = db_nslcmop_admin.get('operations')
1414 else:
1415 # Existing operations, append operation to list
1416 op_list.append(new_op)
1417
1418 db_nslcmop_update = {'_admin.operations': op_list}
1419 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
1420 op_index = len(op_list) - 1
1421 return op_index
1422
1423 # Helper methods for scale() sub-operations
1424
1425 # pre-scale/post-scale:
1426 # Check for 3 different cases:
1427 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
1428 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
1429 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
1430 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index,
1431 vnf_config_primitive, primitive_params, operationType,
1432 RO_nsr_id=None, RO_scaling_info=None):
1433 # Find this sub-operation
1434 if (RO_nsr_id and RO_scaling_info):
1435 operationType = 'SCALE-RO'
1436 match = {
1437 'member_vnf_index': vnf_index,
1438 'RO_nsr_id': RO_nsr_id,
1439 'RO_scaling_info': RO_scaling_info,
1440 }
1441 else:
1442 match = {
1443 'member_vnf_index': vnf_index,
1444 'primitive': vnf_config_primitive,
1445 'primitive_params': primitive_params,
1446 'lcmOperationType': operationType
1447 }
1448 op_index = self._find_suboperation(db_nslcmop, match)
1449 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
1450 # a. New sub-operation
1451 # The sub-operation does not exist, add it.
1452 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
1453 # The following parameters are set to None for all kind of scaling:
1454 vdu_id = None
1455 vdu_count_index = None
1456 vdu_name = None
1457 if (RO_nsr_id and RO_scaling_info):
1458 vnf_config_primitive = None
1459 primitive_params = None
1460 else:
1461 RO_nsr_id = None
1462 RO_scaling_info = None
1463 # Initial status for sub-operation
1464 operationState = 'PROCESSING'
1465 detailed_status = 'In progress'
1466 # Add sub-operation for pre/post-scaling (zero or more operations)
1467 self._add_suboperation(db_nslcmop,
1468 vnf_index,
1469 vdu_id,
1470 vdu_count_index,
1471 vdu_name,
1472 vnf_config_primitive,
1473 primitive_params,
1474 operationState,
1475 detailed_status,
1476 operationType,
1477 RO_nsr_id,
1478 RO_scaling_info)
1479 return self.SUBOPERATION_STATUS_NEW
1480 else:
1481 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
1482 # or op_index (operationState != 'COMPLETED')
1483 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
1484
1485 # Helper methods for terminate()
1486
1487 async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
1488 """ Create a primitive with params from VNFD
1489 Called from terminate() before deleting instance
1490 Calls action() to execute the primitive """
1491 logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
1492 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1493 db_vnfds = {}
1494 # Loop over VNFRs
1495 for vnfr in db_vnfrs_list:
1496 vnfd_id = vnfr["vnfd-id"]
1497 vnf_index = vnfr["member-vnf-index-ref"]
1498 if vnfd_id not in db_vnfds:
1499 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
1500 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1501 db_vnfds[vnfd_id] = vnfd
1502 vnfd = db_vnfds[vnfd_id]
1503 if not self._has_terminate_config_primitive(vnfd):
1504 continue
1505 # Get the primitive's sorted sequence list
1506 seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
1507 for seq in seq_list:
1508 # For each sequence in list, get primitive and call _ns_execute_primitive()
1509 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
1510 vnf_index, seq.get("name"))
1511 self.logger.debug(logging_text + step)
1512 # Create the primitive for each sequence, i.e. "primitive": "touch"
1513 primitive = seq.get('name')
1514 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
1515 # The following 3 parameters are currently set to None for 'terminate':
1516 # vdu_id, vdu_count_index, vdu_name
1517 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
1518 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
1519 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
1520 # Add sub-operation
1521 self._add_suboperation(db_nslcmop,
1522 nslcmop_id,
1523 vnf_index,
1524 vdu_id,
1525 vdu_count_index,
1526 vdu_name,
1527 primitive,
1528 mapped_primitive_params)
1529 # Sub-operations: Call _ns_execute_primitive() instead of action()
1530 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1531 nsr_deployed = db_nsr["_admin"]["deployed"]
1532 result, result_detail = await self._ns_execute_primitive(
1533 nsr_deployed, vnf_index, vdu_id, vdu_name, vdu_count_index, primitive,
1534 mapped_primitive_params)
1535
1536 # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
1537 # nsr_id, nslcmop_terminate_action_id)
1538 # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
1539 result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
1540 if result not in result_ok:
1541 raise LcmException(
1542 "terminate_primitive_action for vnf_member_index={}",
1543 " primitive={} fails with error {}".format(
1544 vnf_index, seq.get("name"), result_detail))
1545
1546 async def terminate(self, nsr_id, nslcmop_id):
1547
1548 # Try to lock HA task here
1549 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1550 if not task_is_locked_by_me:
1551 return
1552
1553 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
1554 self.logger.debug(logging_text + "Enter")
1555 db_nsr = None
1556 db_nslcmop = None
1557 exc = None
1558 failed_detail = [] # annotates all failed error messages
1559 vca_time_destroy = None # time of where destroy charm order
1560 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1561 db_nslcmop_update = {}
1562 nslcmop_operation_state = None
1563 autoremove = False # autoremove after terminated
1564 try:
1565 # wait for any previous tasks in process
1566 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
1567
1568 step = "Getting nslcmop={} from db".format(nslcmop_id)
1569 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1570 step = "Getting nsr={} from db".format(nsr_id)
1571 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1572 # nsd = db_nsr["nsd"]
1573 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
1574 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
1575 return
1576 # #TODO check if VIM is creating and wait
1577 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
1578 # Call internal terminate action
1579 await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)
1580
1581 db_nsr_update["operational-status"] = "terminating"
1582 db_nsr_update["config-status"] = "terminating"
1583
1584 if nsr_deployed and nsr_deployed.get("VCA-model-name"):
1585 vca_model_name = nsr_deployed["VCA-model-name"]
1586 step = "deleting VCA model name '{}' and all charms".format(vca_model_name)
1587 self.logger.debug(logging_text + step)
1588 try:
1589 await self.n2vc.DestroyNetworkService(vca_model_name)
1590 except NetworkServiceDoesNotExist:
1591 pass
1592 db_nsr_update["_admin.deployed.VCA-model-name"] = None
1593 if nsr_deployed.get("VCA"):
1594 for vca_index in range(0, len(nsr_deployed["VCA"])):
1595 db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None
1596 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1597 # for backward compatibility if charm have been created with "default" model name delete one by one
1598 elif nsr_deployed and nsr_deployed.get("VCA"):
1599 try:
1600 step = "Scheduling configuration charms removing"
1601 db_nsr_update["detailed-status"] = "Deleting charms"
1602 self.logger.debug(logging_text + step)
1603 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1604 # for backward compatibility
1605 if isinstance(nsr_deployed["VCA"], dict):
1606 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
1607 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
1608 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1609
1610 for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
1611 if vca_deployed:
1612 if await self._destroy_charm(vca_deployed['model'], vca_deployed["application"]):
1613 vca_deployed.clear()
1614 db_nsr["_admin.deployed.VCA.{}".format(vca_index)] = None
1615 else:
1616 vca_time_destroy = time()
1617 except Exception as e:
1618 self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
1619
1620 # remove from RO
1621 RO_fail = False
1622
1623 # Delete ns
1624 RO_nsr_id = RO_delete_action = None
1625 if nsr_deployed and nsr_deployed.get("RO"):
1626 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
1627 RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
1628 try:
1629 if RO_nsr_id:
1630 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
1631 "Deleting ns from VIM"
1632 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1633 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1634 self.logger.debug(logging_text + step)
1635 desc = await self.RO.delete("ns", RO_nsr_id)
1636 RO_delete_action = desc["action_id"]
1637 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
1638 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1639 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1640 if RO_delete_action:
1641 # wait until NS is deleted from VIM
1642 step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
1643 format(RO_nsr_id, RO_delete_action)
1644 detailed_status_old = None
1645 self.logger.debug(logging_text + step)
1646
1647 delete_timeout = 20 * 60 # 20 minutes
1648 while delete_timeout > 0:
1649 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
1650 extra_item_id=RO_delete_action)
1651 ns_status, ns_status_info = self.RO.check_action_status(desc)
1652 if ns_status == "ERROR":
1653 raise ROclient.ROClientException(ns_status_info)
1654 elif ns_status == "BUILD":
1655 detailed_status = step + "; {}".format(ns_status_info)
1656 elif ns_status == "ACTIVE":
1657 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1658 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1659 break
1660 else:
1661 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
1662 if detailed_status != detailed_status_old:
1663 detailed_status_old = db_nslcmop_update["detailed-status"] = \
1664 db_nsr_update["detailed-status"] = detailed_status
1665 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1666 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1667 await asyncio.sleep(5, loop=self.loop)
1668 delete_timeout -= 5
1669 else: # delete_timeout <= 0:
1670 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
1671
1672 except ROclient.ROClientException as e:
1673 if e.http_code == 404: # not found
1674 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1675 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1676 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1677 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
1678 elif e.http_code == 409: # conflict
1679 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
1680 self.logger.debug(logging_text + failed_detail[-1])
1681 RO_fail = True
1682 else:
1683 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
1684 self.logger.error(logging_text + failed_detail[-1])
1685 RO_fail = True
1686
1687 # Delete nsd
1688 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
1689 RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
1690 try:
1691 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
1692 "Deleting nsd from RO"
1693 await self.RO.delete("nsd", RO_nsd_id)
1694 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
1695 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
1696 except ROclient.ROClientException as e:
1697 if e.http_code == 404: # not found
1698 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
1699 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
1700 elif e.http_code == 409: # conflict
1701 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
1702 self.logger.debug(logging_text + failed_detail[-1])
1703 RO_fail = True
1704 else:
1705 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
1706 self.logger.error(logging_text + failed_detail[-1])
1707 RO_fail = True
1708
1709 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
1710 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
1711 if not vnf_deployed or not vnf_deployed["id"]:
1712 continue
1713 try:
1714 RO_vnfd_id = vnf_deployed["id"]
1715 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
1716 "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
1717 vnf_deployed["member-vnf-index"], RO_vnfd_id)
1718 await self.RO.delete("vnfd", RO_vnfd_id)
1719 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
1720 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
1721 except ROclient.ROClientException as e:
1722 if e.http_code == 404: # not found
1723 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
1724 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
1725 elif e.http_code == 409: # conflict
1726 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
1727 self.logger.debug(logging_text + failed_detail[-1])
1728 else:
1729 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
1730 self.logger.error(logging_text + failed_detail[-1])
1731
1732 # wait until charm deleted
1733 if vca_time_destroy:
1734 db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = step = \
1735 "Waiting for deletion of configuration charms"
1736 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1737 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1738 for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
1739 if not vca_deployed:
1740 continue
1741 step = "Waiting for deletion of charm application_name={}".format(vca_deployed["application"])
1742 timeout = self.timeout_charm_delete - int(time() - vca_time_destroy)
1743 if not await self._wait_charm_destroyed(vca_deployed['model'], vca_deployed["application"],
1744 timeout):
1745 failed_detail.append("VCA[application_name={}] Deletion timeout".format(
1746 vca_deployed["application"]))
1747 else:
1748 db_nsr["_admin.deployed.VCA.{}".format(vca_index)] = None
1749
1750 if failed_detail:
1751 self.logger.error(logging_text + " ;".join(failed_detail))
1752 db_nsr_update["operational-status"] = "failed"
1753 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
1754 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
1755 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1756 db_nslcmop_update["statusEnteredTime"] = time()
1757 else:
1758 db_nsr_update["operational-status"] = "terminated"
1759 db_nsr_update["detailed-status"] = "Done"
1760 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
1761 db_nslcmop_update["detailed-status"] = "Done"
1762 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1763 db_nslcmop_update["statusEnteredTime"] = time()
1764 if db_nslcmop["operationParams"].get("autoremove"):
1765 autoremove = True
1766
1767 except (ROclient.ROClientException, DbException, LcmException) as e:
1768 self.logger.error(logging_text + "Exit Exception {}".format(e))
1769 exc = e
1770 except asyncio.CancelledError:
1771 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1772 exc = "Operation was cancelled"
1773 except Exception as e:
1774 exc = traceback.format_exc()
1775 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1776 finally:
1777 if exc and db_nslcmop:
1778 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1779 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1780 db_nslcmop_update["statusEnteredTime"] = time()
1781 try:
1782 if db_nslcmop and db_nslcmop_update:
1783 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1784 if db_nsr:
1785 db_nsr_update["_admin.nslcmop"] = None
1786 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1787 except DbException as e:
1788 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1789 if nslcmop_operation_state:
1790 try:
1791 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1792 "operationState": nslcmop_operation_state,
1793 "autoremove": autoremove},
1794 loop=self.loop)
1795 except Exception as e:
1796 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1797 self.logger.debug(logging_text + "Exit")
1798 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
1799
1800 @staticmethod
1801 def _map_primitive_params(primitive_desc, params, instantiation_params):
1802 """
1803 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
1804 The default-value is used. If it is between < > it look for a value at instantiation_params
1805 :param primitive_desc: portion of VNFD/NSD that describes primitive
1806 :param params: Params provided by user
1807 :param instantiation_params: Instantiation params provided by user
1808 :return: a dictionary with the calculated params
1809 """
1810 calculated_params = {}
1811 for parameter in primitive_desc.get("parameter", ()):
1812 param_name = parameter["name"]
1813 if param_name in params:
1814 calculated_params[param_name] = params[param_name]
1815 elif "default-value" in parameter or "value" in parameter:
1816 if "value" in parameter:
1817 calculated_params[param_name] = parameter["value"]
1818 else:
1819 calculated_params[param_name] = parameter["default-value"]
1820 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
1821 and calculated_params[param_name].endswith(">"):
1822 if calculated_params[param_name][1:-1] in instantiation_params:
1823 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
1824 else:
1825 raise LcmException("Parameter {} needed to execute primitive {} not provided".
1826 format(calculated_params[param_name], primitive_desc["name"]))
1827 else:
1828 raise LcmException("Parameter {} needed to execute primitive {} not provided".
1829 format(param_name, primitive_desc["name"]))
1830
1831 if isinstance(calculated_params[param_name], (dict, list, tuple)):
1832 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
1833 width=256)
1834 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
1835 calculated_params[param_name] = calculated_params[param_name][7:]
1836 return calculated_params
1837
1838 async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
1839 primitive, primitive_params, retries=0, retries_interval=30):
1840 start_primitive_time = time()
1841 try:
1842 for vca_deployed in db_deployed["VCA"]:
1843 if not vca_deployed:
1844 continue
1845 if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
1846 continue
1847 if vdu_name and vdu_name != vca_deployed["vdu_name"]:
1848 continue
1849 if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
1850 continue
1851 break
1852 else:
1853 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
1854 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
1855 model_name = vca_deployed.get("model")
1856 application_name = vca_deployed.get("application")
1857 if not model_name or not application_name:
1858 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
1859 "model or application name" .format(member_vnf_index, vdu_id, vdu_name,
1860 vdu_count_index))
1861 # if vca_deployed["operational-status"] != "active":
1862 # raise LcmException("charm for member_vnf_index={} vdu_id={} operational_status={} not 'active'".format(
1863 # member_vnf_index, vdu_id, vca_deployed["operational-status"]))
1864 callback = None # self.n2vc_callback
1865 callback_args = () # [db_nsr, db_nslcmop, member_vnf_index, None]
1866 await self.n2vc.login()
1867 if primitive == "config":
1868 primitive_params = {"params": primitive_params}
1869 while retries >= 0:
1870 primitive_id = await self.n2vc.ExecutePrimitive(
1871 model_name,
1872 application_name,
1873 primitive,
1874 callback,
1875 *callback_args,
1876 **primitive_params
1877 )
1878 while time() - start_primitive_time < self.timeout_primitive:
1879 primitive_result_ = await self.n2vc.GetPrimitiveStatus(model_name, primitive_id)
1880 if primitive_result_ in ("completed", "failed"):
1881 primitive_result = "COMPLETED" if primitive_result_ == "completed" else "FAILED"
1882 detailed_result = await self.n2vc.GetPrimitiveOutput(model_name, primitive_id)
1883 break
1884 elif primitive_result_ is None and primitive == "config":
1885 primitive_result = "COMPLETED"
1886 detailed_result = None
1887 break
1888 else: # ("running", "pending", None):
1889 pass
1890 await asyncio.sleep(5, loop=self.loop)
1891 else:
1892 raise LcmException("timeout after {} seconds".format(self.timeout_primitive))
1893 if primitive_result == "COMPLETED":
1894 break
1895 retries -= 1
1896 if retries >= 0:
1897 await asyncio.sleep(retries_interval, loop=self.loop)
1898
1899 return primitive_result, detailed_result
1900 except (N2VCPrimitiveExecutionFailed, LcmException) as e:
1901 return "FAILED", str(e)
1902
1903 async def action(self, nsr_id, nslcmop_id):
1904
1905 # Try to lock HA task here
1906 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1907 if not task_is_locked_by_me:
1908 return
1909
1910 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
1911 self.logger.debug(logging_text + "Enter")
1912 # get all needed from database
1913 db_nsr = None
1914 db_nslcmop = None
1915 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1916 db_nslcmop_update = {}
1917 nslcmop_operation_state = None
1918 nslcmop_operation_state_detail = None
1919 exc = None
1920 try:
1921 # wait for any previous tasks in process
1922 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1923
1924 step = "Getting information from database"
1925 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1926 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1927
1928 nsr_deployed = db_nsr["_admin"].get("deployed")
1929 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
1930 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
1931 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
1932 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
1933
1934 if vnf_index:
1935 step = "Getting vnfr from database"
1936 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
1937 step = "Getting vnfd from database"
1938 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
1939 else:
1940 if db_nsr.get("nsd"):
1941 db_nsd = db_nsr.get("nsd") # TODO this will be removed
1942 else:
1943 step = "Getting nsd from database"
1944 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
1945
1946 # for backward compatibility
1947 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
1948 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
1949 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
1950 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1951
1952 primitive = db_nslcmop["operationParams"]["primitive"]
1953 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
1954
1955 # look for primitive
1956 config_primitive_desc = None
1957 if vdu_id:
1958 for vdu in get_iterable(db_vnfd, "vdu"):
1959 if vdu_id == vdu["id"]:
1960 for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
1961 if config_primitive["name"] == primitive:
1962 config_primitive_desc = config_primitive
1963 break
1964 elif vnf_index:
1965 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1966 if config_primitive["name"] == primitive:
1967 config_primitive_desc = config_primitive
1968 break
1969 else:
1970 for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
1971 if config_primitive["name"] == primitive:
1972 config_primitive_desc = config_primitive
1973 break
1974
1975 if not config_primitive_desc:
1976 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
1977 format(primitive))
1978
1979 desc_params = {}
1980 if vnf_index:
1981 if db_vnfr.get("additionalParamsForVnf"):
1982 desc_params.update(db_vnfr["additionalParamsForVnf"])
1983 else:
1984 if db_nsr.get("additionalParamsForVnf"):
1985 desc_params.update(db_nsr["additionalParamsForNs"])
1986
1987 # TODO check if ns is in a proper status
1988 result, result_detail = await self._ns_execute_primitive(
1989 nsr_deployed, vnf_index, vdu_id, vdu_name, vdu_count_index, primitive,
1990 self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))
1991 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = result_detail
1992 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
1993 db_nslcmop_update["statusEnteredTime"] = time()
1994 self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
1995 return # database update is called inside finally
1996
1997 except (DbException, LcmException) as e:
1998 self.logger.error(logging_text + "Exit Exception {}".format(e))
1999 exc = e
2000 except asyncio.CancelledError:
2001 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2002 exc = "Operation was cancelled"
2003 except Exception as e:
2004 exc = traceback.format_exc()
2005 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
2006 finally:
2007 if exc and db_nslcmop:
2008 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
2009 "FAILED {}: {}".format(step, exc)
2010 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2011 db_nslcmop_update["statusEnteredTime"] = time()
2012 try:
2013 if db_nslcmop_update:
2014 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2015 if db_nsr:
2016 db_nsr_update["_admin.nslcmop"] = None
2017 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2018 except DbException as e:
2019 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2020 self.logger.debug(logging_text + "Exit")
2021 if nslcmop_operation_state:
2022 try:
2023 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2024 "operationState": nslcmop_operation_state},
2025 loop=self.loop)
2026 except Exception as e:
2027 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2028 self.logger.debug(logging_text + "Exit")
2029 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
2030 return nslcmop_operation_state, nslcmop_operation_state_detail
2031
2032 async def scale(self, nsr_id, nslcmop_id):
2033
2034 # Try to lock HA task here
2035 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2036 if not task_is_locked_by_me:
2037 return
2038
2039 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
2040 self.logger.debug(logging_text + "Enter")
2041 # get all needed from database
2042 db_nsr = None
2043 db_nslcmop = None
2044 db_nslcmop_update = {}
2045 nslcmop_operation_state = None
2046 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
2047 exc = None
2048 # in case of error, indicates what part of scale was failed to put nsr at error status
2049 scale_process = None
2050 old_operational_status = ""
2051 old_config_status = ""
2052 vnfr_scaled = False
2053 try:
2054 # wait for any previous tasks in process
2055 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2056
2057 step = "Getting nslcmop from database"
2058 self.logger.debug(step + " after having waited for previous tasks to be completed")
2059 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2060 step = "Getting nsr from database"
2061 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2062
2063 old_operational_status = db_nsr["operational-status"]
2064 old_config_status = db_nsr["config-status"]
2065 step = "Parsing scaling parameters"
2066 # self.logger.debug(step)
2067 db_nsr_update["operational-status"] = "scaling"
2068 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2069 nsr_deployed = db_nsr["_admin"].get("deployed")
2070 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
2071 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
2072 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
2073 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
2074 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
2075
2076 # for backward compatibility
2077 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2078 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2079 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2080 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2081
2082 step = "Getting vnfr from database"
2083 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2084 step = "Getting vnfd from database"
2085 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2086
2087 step = "Getting scaling-group-descriptor"
2088 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
2089 if scaling_descriptor["name"] == scaling_group:
2090 break
2091 else:
2092 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
2093 "at vnfd:scaling-group-descriptor".format(scaling_group))
2094
2095 # cooldown_time = 0
2096 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
2097 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
2098 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
2099 # break
2100
2101 # TODO check if ns is in a proper status
2102 step = "Sending scale order to VIM"
2103 nb_scale_op = 0
2104 if not db_nsr["_admin"].get("scaling-group"):
2105 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
2106 admin_scale_index = 0
2107 else:
2108 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
2109 if admin_scale_info["name"] == scaling_group:
2110 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
2111 break
2112 else: # not found, set index one plus last element and add new entry with the name
2113 admin_scale_index += 1
2114 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
2115 RO_scaling_info = []
2116 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
2117 if scaling_type == "SCALE_OUT":
2118 # count if max-instance-count is reached
2119 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
2120 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
2121 if nb_scale_op >= max_instance_count:
2122 raise LcmException("reached the limit of {} (max-instance-count) "
2123 "scaling-out operations for the "
2124 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2125
2126 nb_scale_op += 1
2127 vdu_scaling_info["scaling_direction"] = "OUT"
2128 vdu_scaling_info["vdu-create"] = {}
2129 for vdu_scale_info in scaling_descriptor["vdu"]:
2130 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2131 "type": "create", "count": vdu_scale_info.get("count", 1)})
2132 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2133
2134 elif scaling_type == "SCALE_IN":
2135 # count if min-instance-count is reached
2136 min_instance_count = 0
2137 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
2138 min_instance_count = int(scaling_descriptor["min-instance-count"])
2139 if nb_scale_op <= min_instance_count:
2140 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
2141 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2142 nb_scale_op -= 1
2143 vdu_scaling_info["scaling_direction"] = "IN"
2144 vdu_scaling_info["vdu-delete"] = {}
2145 for vdu_scale_info in scaling_descriptor["vdu"]:
2146 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2147 "type": "delete", "count": vdu_scale_info.get("count", 1)})
2148 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2149
2150 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
2151 vdu_create = vdu_scaling_info.get("vdu-create")
2152 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
2153 if vdu_scaling_info["scaling_direction"] == "IN":
2154 for vdur in reversed(db_vnfr["vdur"]):
2155 if vdu_delete.get(vdur["vdu-id-ref"]):
2156 vdu_delete[vdur["vdu-id-ref"]] -= 1
2157 vdu_scaling_info["vdu"].append({
2158 "name": vdur["name"],
2159 "vdu_id": vdur["vdu-id-ref"],
2160 "interface": []
2161 })
2162 for interface in vdur["interfaces"]:
2163 vdu_scaling_info["vdu"][-1]["interface"].append({
2164 "name": interface["name"],
2165 "ip_address": interface["ip-address"],
2166 "mac_address": interface.get("mac-address"),
2167 })
2168 vdu_delete = vdu_scaling_info.pop("vdu-delete")
2169
2170 # PRE-SCALE BEGIN
2171 step = "Executing pre-scale vnf-config-primitive"
2172 if scaling_descriptor.get("scaling-config-action"):
2173 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
2174 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
2175 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
2176 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
2177 step = db_nslcmop_update["detailed-status"] = \
2178 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
2179
2180 # look for primitive
2181 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2182 if config_primitive["name"] == vnf_config_primitive:
2183 break
2184 else:
2185 raise LcmException(
2186 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
2187 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
2188 "primitive".format(scaling_group, config_primitive))
2189
2190 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
2191 if db_vnfr.get("additionalParamsForVnf"):
2192 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
2193 scale_process = "VCA"
2194 db_nsr_update["config-status"] = "configuring pre-scaling"
2195 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
2196
2197 # Pre-scale reintent check: Check if this sub-operation has been executed before
2198 op_index = self._check_or_add_scale_suboperation(
2199 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
2200 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2201 # Skip sub-operation
2202 result = 'COMPLETED'
2203 result_detail = 'Done'
2204 self.logger.debug(logging_text +
2205 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
2206 vnf_config_primitive, result, result_detail))
2207 else:
2208 if (op_index == self.SUBOPERATION_STATUS_NEW):
2209 # New sub-operation: Get index of this sub-operation
2210 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2211 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
2212 format(vnf_config_primitive))
2213 else:
2214 # Reintent: Get registered params for this existing sub-operation
2215 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2216 vnf_index = op.get('member_vnf_index')
2217 vnf_config_primitive = op.get('primitive')
2218 primitive_params = op.get('primitive_params')
2219 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
2220 format(vnf_config_primitive))
2221 # Execute the primitive, either with new (first-time) or registered (reintent) args
2222 result, result_detail = await self._ns_execute_primitive(
2223 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
2224 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
2225 vnf_config_primitive, result, result_detail))
2226 # Update operationState = COMPLETED | FAILED
2227 self._update_suboperation_status(
2228 db_nslcmop, op_index, result, result_detail)
2229
2230 if result == "FAILED":
2231 raise LcmException(result_detail)
2232 db_nsr_update["config-status"] = old_config_status
2233 scale_process = None
2234 # PRE-SCALE END
2235
2236 # SCALE RO - BEGIN
2237 # Should this block be skipped if 'RO_nsr_id' == None ?
2238 # if (RO_nsr_id and RO_scaling_info):
2239 if RO_scaling_info:
2240 scale_process = "RO"
2241 # Scale RO reintent check: Check if this sub-operation has been executed before
2242 op_index = self._check_or_add_scale_suboperation(
2243 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
2244 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2245 # Skip sub-operation
2246 result = 'COMPLETED'
2247 result_detail = 'Done'
2248 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
2249 result, result_detail))
2250 else:
2251 if (op_index == self.SUBOPERATION_STATUS_NEW):
2252 # New sub-operation: Get index of this sub-operation
2253 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2254 self.logger.debug(logging_text + "New sub-operation RO")
2255 else:
2256 # Reintent: Get registered params for this existing sub-operation
2257 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2258 RO_nsr_id = op.get('RO_nsr_id')
2259 RO_scaling_info = op.get('RO_scaling_info')
2260 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
2261 vnf_config_primitive))
2262
2263 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
2264 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
2265 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
2266 # wait until ready
2267 RO_nslcmop_id = RO_desc["instance_action_id"]
2268 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
2269
2270 RO_task_done = False
2271 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
2272 detailed_status_old = None
2273 self.logger.debug(logging_text + step)
2274
2275 deployment_timeout = 1 * 3600 # One hour
2276 while deployment_timeout > 0:
2277 if not RO_task_done:
2278 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
2279 extra_item_id=RO_nslcmop_id)
2280 ns_status, ns_status_info = self.RO.check_action_status(desc)
2281 if ns_status == "ERROR":
2282 raise ROclient.ROClientException(ns_status_info)
2283 elif ns_status == "BUILD":
2284 detailed_status = step + "; {}".format(ns_status_info)
2285 elif ns_status == "ACTIVE":
2286 RO_task_done = True
2287 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
2288 self.logger.debug(logging_text + step)
2289 else:
2290 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2291 else:
2292 if ns_status == "ERROR":
2293 raise ROclient.ROClientException(ns_status_info)
2294 elif ns_status == "BUILD":
2295 detailed_status = step + "; {}".format(ns_status_info)
2296 elif ns_status == "ACTIVE":
2297 step = detailed_status = \
2298 "Waiting for management IP address reported by the VIM. Updating VNFRs"
2299 if not vnfr_scaled:
2300 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
2301 vnfr_scaled = True
2302 try:
2303 desc = await self.RO.show("ns", RO_nsr_id)
2304 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
2305 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
2306 break
2307 except LcmExceptionNoMgmtIP:
2308 pass
2309 else:
2310 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
2311 if detailed_status != detailed_status_old:
2312 self._update_suboperation_status(
2313 db_nslcmop, op_index, 'COMPLETED', detailed_status)
2314 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
2315 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2316
2317 await asyncio.sleep(5, loop=self.loop)
2318 deployment_timeout -= 5
2319 if deployment_timeout <= 0:
2320 self._update_suboperation_status(
2321 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
2322 raise ROclient.ROClientException("Timeout waiting ns to be ready")
2323
2324 # update VDU_SCALING_INFO with the obtained ip_addresses
2325 if vdu_scaling_info["scaling_direction"] == "OUT":
2326 for vdur in reversed(db_vnfr["vdur"]):
2327 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
2328 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
2329 vdu_scaling_info["vdu"].append({
2330 "name": vdur["name"],
2331 "vdu_id": vdur["vdu-id-ref"],
2332 "interface": []
2333 })
2334 for interface in vdur["interfaces"]:
2335 vdu_scaling_info["vdu"][-1]["interface"].append({
2336 "name": interface["name"],
2337 "ip_address": interface["ip-address"],
2338 "mac_address": interface.get("mac-address"),
2339 })
2340 del vdu_scaling_info["vdu-create"]
2341
2342 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
2343 # SCALE RO - END
2344
2345 scale_process = None
2346 if db_nsr_update:
2347 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2348
2349 # POST-SCALE BEGIN
2350 # execute primitive service POST-SCALING
2351 step = "Executing post-scale vnf-config-primitive"
2352 if scaling_descriptor.get("scaling-config-action"):
2353 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
2354 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
2355 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
2356 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
2357 step = db_nslcmop_update["detailed-status"] = \
2358 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
2359
2360 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
2361 if db_vnfr.get("additionalParamsForVnf"):
2362 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
2363
2364 # look for primitive
2365 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2366 if config_primitive["name"] == vnf_config_primitive:
2367 break
2368 else:
2369 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
2370 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
2371 "match any vnf-configuration:config-primitive".format(scaling_group,
2372 config_primitive))
2373 scale_process = "VCA"
2374 db_nsr_update["config-status"] = "configuring post-scaling"
2375 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
2376
2377 # Post-scale reintent check: Check if this sub-operation has been executed before
2378 op_index = self._check_or_add_scale_suboperation(
2379 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
2380 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2381 # Skip sub-operation
2382 result = 'COMPLETED'
2383 result_detail = 'Done'
2384 self.logger.debug(logging_text +
2385 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
2386 format(vnf_config_primitive, result, result_detail))
2387 else:
2388 if (op_index == self.SUBOPERATION_STATUS_NEW):
2389 # New sub-operation: Get index of this sub-operation
2390 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2391 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
2392 format(vnf_config_primitive))
2393 else:
2394 # Reintent: Get registered params for this existing sub-operation
2395 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2396 vnf_index = op.get('member_vnf_index')
2397 vnf_config_primitive = op.get('primitive')
2398 primitive_params = op.get('primitive_params')
2399 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
2400 format(vnf_config_primitive))
2401 # Execute the primitive, either with new (first-time) or registered (reintent) args
2402 result, result_detail = await self._ns_execute_primitive(
2403 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
2404 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
2405 vnf_config_primitive, result, result_detail))
2406 # Update operationState = COMPLETED | FAILED
2407 self._update_suboperation_status(
2408 db_nslcmop, op_index, result, result_detail)
2409
2410 if result == "FAILED":
2411 raise LcmException(result_detail)
2412 db_nsr_update["config-status"] = old_config_status
2413 scale_process = None
2414 # POST-SCALE END
2415
2416 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
2417 db_nslcmop_update["statusEnteredTime"] = time()
2418 db_nslcmop_update["detailed-status"] = "done"
2419 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
2420 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
2421 else old_operational_status
2422 db_nsr_update["config-status"] = old_config_status
2423 return
2424 except (ROclient.ROClientException, DbException, LcmException) as e:
2425 self.logger.error(logging_text + "Exit Exception {}".format(e))
2426 exc = e
2427 except asyncio.CancelledError:
2428 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2429 exc = "Operation was cancelled"
2430 except Exception as e:
2431 exc = traceback.format_exc()
2432 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
2433 finally:
2434 if exc:
2435 if db_nslcmop:
2436 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
2437 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2438 db_nslcmop_update["statusEnteredTime"] = time()
2439 if db_nsr:
2440 db_nsr_update["operational-status"] = old_operational_status
2441 db_nsr_update["config-status"] = old_config_status
2442 db_nsr_update["detailed-status"] = ""
2443 db_nsr_update["_admin.nslcmop"] = None
2444 if scale_process:
2445 if "VCA" in scale_process:
2446 db_nsr_update["config-status"] = "failed"
2447 if "RO" in scale_process:
2448 db_nsr_update["operational-status"] = "failed"
2449 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
2450 exc)
2451 try:
2452 if db_nslcmop and db_nslcmop_update:
2453 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2454 if db_nsr:
2455 db_nsr_update["_admin.nslcmop"] = None
2456 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2457 except DbException as e:
2458 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2459 if nslcmop_operation_state:
2460 try:
2461 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2462 "operationState": nslcmop_operation_state},
2463 loop=self.loop)
2464 # if cooldown_time:
2465 # await asyncio.sleep(cooldown_time, loop=self.loop)
2466 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
2467 except Exception as e:
2468 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2469 self.logger.debug(logging_text + "Exit")
2470 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")