Suboperations for scale-out and scale-in
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import functools
24 import traceback
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase
29
30 from osm_common.dbbase import DbException
31 from osm_common.fsbase import FsException
32 from n2vc.vnf import N2VC, N2VCPrimitiveExecutionFailed, NetworkServiceDoesNotExist, PrimitiveDoesNotExist
33
34 from copy import copy, deepcopy
35 from http import HTTPStatus
36 from time import time
37 from uuid import uuid4
38
39 __author__ = "Alfonso Tierno"
40
41
42 def get_iterable(in_dict, in_key):
43 """
44 Similar to <dict>.get(), but if value is None, False, ..., An empty tuple is returned instead
45 :param in_dict: a dictionary
46 :param in_key: the key to look for at in_dict
47 :return: in_dict[in_var] or () if it is None or not present
48 """
49 if not in_dict.get(in_key):
50 return ()
51 return in_dict[in_key]
52
53
54 def populate_dict(target_dict, key_list, value):
55 """
56 Update target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value.
57 Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
58 :param target_dict: dictionary to be changed
59 :param key_list: list of keys to insert at target_dict
60 :param value:
61 :return: None
62 """
63 for key in key_list[0:-1]:
64 if key not in target_dict:
65 target_dict[key] = {}
66 target_dict = target_dict[key]
67 target_dict[key_list[-1]] = value
68
69
70 def deep_get(target_dict, key_list):
71 """
72 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
73 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
74 :param target_dict: dictionary to be read
75 :param key_list: list of keys to read from target_dict
76 :return: The wanted value if exist, None otherwise
77 """
78 for key in key_list:
79 if not isinstance(target_dict, dict) or key not in target_dict:
80 return None
81 target_dict = target_dict[key]
82 return target_dict
83
84
85 class NsLcm(LcmBase):
86 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
87 total_deploy_timeout = 2 * 3600 # global timeout for deployment
88 timeout_charm_delete = 10 * 60
89 timeout_primitive = 10 * 60 # timeout for primitive execution
90
91 SUBOPERATION_STATUS_NOT_FOUND = -1
92 SUBOPERATION_STATUS_NEW = -2
93 SUBOPERATION_STATUS_SKIP = -3
94
95 def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
96 """
97 Init, Connect to database, filesystem storage, and messaging
98 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
99 :return: None
100 """
101 # logging
102 self.logger = logging.getLogger('lcm.ns')
103 self.loop = loop
104 self.lcm_tasks = lcm_tasks
105
106 super().__init__(db, msg, fs, self.logger)
107
108 self.ro_config = ro_config
109
110 self.n2vc = N2VC(
111 log=self.logger,
112 server=vca_config['host'],
113 port=vca_config['port'],
114 user=vca_config['user'],
115 secret=vca_config['secret'],
116 # TODO: This should point to the base folder where charms are stored,
117 # if there is a common one (like object storage). Otherwise, leave
118 # it unset and pass it via DeployCharms
119 # artifacts=vca_config[''],
120 artifacts=None,
121 juju_public_key=vca_config.get('pubkey'),
122 ca_cert=vca_config.get('cacert'),
123 api_proxy=vca_config.get('apiproxy')
124 )
125 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
126
127 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
128 """
129 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
130 :param vnfd: input vnfd
131 :param new_id: overrides vnf id if provided
132 :param additionalParams: Instantiation params for VNFs provided
133 :param nsrId: Id of the NSR
134 :return: copy of vnfd
135 """
136 try:
137 vnfd_RO = deepcopy(vnfd)
138 # remove unused by RO configuration, monitoring, scaling and internal keys
139 vnfd_RO.pop("_id", None)
140 vnfd_RO.pop("_admin", None)
141 vnfd_RO.pop("vnf-configuration", None)
142 vnfd_RO.pop("monitoring-param", None)
143 vnfd_RO.pop("scaling-group-descriptor", None)
144 if new_id:
145 vnfd_RO["id"] = new_id
146
147 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
148 for vdu in get_iterable(vnfd_RO, "vdu"):
149 cloud_init_file = None
150 if vdu.get("cloud-init-file"):
151 base_folder = vnfd["_admin"]["storage"]
152 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
153 vdu["cloud-init-file"])
154 with self.fs.file_open(cloud_init_file, "r") as ci_file:
155 cloud_init_content = ci_file.read()
156 vdu.pop("cloud-init-file", None)
157 elif vdu.get("cloud-init"):
158 cloud_init_content = vdu["cloud-init"]
159 else:
160 continue
161
162 env = Environment()
163 ast = env.parse(cloud_init_content)
164 mandatory_vars = meta.find_undeclared_variables(ast)
165 if mandatory_vars:
166 for var in mandatory_vars:
167 if not additionalParams or var not in additionalParams.keys():
168 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
169 "file, must be provided in the instantiation parameters inside the "
170 "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
171 template = Template(cloud_init_content)
172 cloud_init_content = template.render(additionalParams or {})
173 vdu["cloud-init"] = cloud_init_content
174
175 return vnfd_RO
176 except FsException as e:
177 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
178 format(vnfd["id"], vdu["id"], cloud_init_file, e))
179 except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
180 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
181 format(vnfd["id"], vdu["id"], e))
182
183 def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None):
184 """
185 Callback both for charm status change and task completion
186 :param model_name: Charm model name
187 :param application_name: Charm application name
188 :param status: Can be
189 - blocked: The unit needs manual intervention
190 - maintenance: The unit is actively deploying/configuring
191 - waiting: The unit is waiting for another charm to be ready
192 - active: The unit is deployed, configured, and ready
193 - error: The charm has failed and needs attention.
194 - terminated: The charm has been destroyed
195 - removing,
196 - removed
197 :param message: detailed message error
198 :param n2vc_info: dictionary with information shared with instantiate task. It contains:
199 nsr_id:
200 nslcmop_id:
201 lcmOperationType: currently "instantiate"
202 deployed: dictionary with {<application>: {operational-status: <status>, detailed-status: <text>}}
203 db_update: dictionary to be filled with the changes to be wrote to database with format key.key.key: value
204 n2vc_event: event used to notify instantiation task that some change has been produced
205 :param task: None for charm status change, or task for completion task callback
206 :return:
207 """
208 try:
209 nsr_id = n2vc_info["nsr_id"]
210 deployed = n2vc_info["deployed"]
211 db_nsr_update = n2vc_info["db_update"]
212 nslcmop_id = n2vc_info["nslcmop_id"]
213 ns_operation = n2vc_info["lcmOperationType"]
214 n2vc_event = n2vc_info["n2vc_event"]
215 logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id,
216 application_name)
217 for vca_index, vca_deployed in enumerate(deployed):
218 if not vca_deployed:
219 continue
220 if model_name == vca_deployed["model"] and application_name == vca_deployed["application"]:
221 break
222 else:
223 self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA. Received model_name={}".
224 format(model_name))
225 return
226 if task:
227 if task.cancelled():
228 self.logger.debug(logging_text + " task Cancelled")
229 vca_deployed['operational-status'] = "error"
230 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
231 vca_deployed['detailed-status'] = "Task Cancelled"
232 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = "Task Cancelled"
233
234 elif task.done():
235 exc = task.exception()
236 if exc:
237 self.logger.error(logging_text + " task Exception={}".format(exc))
238 vca_deployed['operational-status'] = "error"
239 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
240 vca_deployed['detailed-status'] = str(exc)
241 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(exc)
242 else:
243 self.logger.debug(logging_text + " task Done")
244 # task is Done, but callback is still ongoing. So ignore
245 return
246 elif status:
247 self.logger.debug(logging_text + " Enter status={} message={}".format(status, message))
248 if vca_deployed['operational-status'] == status:
249 return # same status, ignore
250 vca_deployed['operational-status'] = status
251 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = status
252 vca_deployed['detailed-status'] = str(message)
253 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(message)
254 else:
255 self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
256 return
257 # wake up instantiate task
258 n2vc_event.set()
259 except Exception as e:
260 self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True)
261
262 def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
263 """
264 Creates a RO ns descriptor from OSM ns_instantiate params
265 :param ns_params: OSM instantiate params
266 :return: The RO ns descriptor
267 """
268 vim_2_RO = {}
269 wim_2_RO = {}
270 # TODO feature 1417: Check that no instantiation is set over PDU
271 # check if PDU forces a concrete vim-network-id and add it
272 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
273
274 def vim_account_2_RO(vim_account):
275 if vim_account in vim_2_RO:
276 return vim_2_RO[vim_account]
277
278 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
279 if db_vim["_admin"]["operationalState"] != "ENABLED":
280 raise LcmException("VIM={} is not available. operationalState={}".format(
281 vim_account, db_vim["_admin"]["operationalState"]))
282 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
283 vim_2_RO[vim_account] = RO_vim_id
284 return RO_vim_id
285
286 def wim_account_2_RO(wim_account):
287 if isinstance(wim_account, str):
288 if wim_account in wim_2_RO:
289 return wim_2_RO[wim_account]
290
291 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
292 if db_wim["_admin"]["operationalState"] != "ENABLED":
293 raise LcmException("WIM={} is not available. operationalState={}".format(
294 wim_account, db_wim["_admin"]["operationalState"]))
295 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
296 wim_2_RO[wim_account] = RO_wim_id
297 return RO_wim_id
298 else:
299 return wim_account
300
301 def ip_profile_2_RO(ip_profile):
302 RO_ip_profile = deepcopy((ip_profile))
303 if "dns-server" in RO_ip_profile:
304 if isinstance(RO_ip_profile["dns-server"], list):
305 RO_ip_profile["dns-address"] = []
306 for ds in RO_ip_profile.pop("dns-server"):
307 RO_ip_profile["dns-address"].append(ds['address'])
308 else:
309 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
310 if RO_ip_profile.get("ip-version") == "ipv4":
311 RO_ip_profile["ip-version"] = "IPv4"
312 if RO_ip_profile.get("ip-version") == "ipv6":
313 RO_ip_profile["ip-version"] = "IPv6"
314 if "dhcp-params" in RO_ip_profile:
315 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
316 return RO_ip_profile
317
318 if not ns_params:
319 return None
320 RO_ns_params = {
321 # "name": ns_params["nsName"],
322 # "description": ns_params.get("nsDescription"),
323 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
324 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
325 # "scenario": ns_params["nsdId"],
326 }
327 n2vc_key_list = n2vc_key_list or []
328 for vnfd_ref, vnfd in vnfd_dict.items():
329 vdu_needed_access = []
330 mgmt_cp = None
331 if vnfd.get("vnf-configuration"):
332 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
333 if ssh_required and vnfd.get("mgmt-interface"):
334 if vnfd["mgmt-interface"].get("vdu-id"):
335 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
336 elif vnfd["mgmt-interface"].get("cp"):
337 mgmt_cp = vnfd["mgmt-interface"]["cp"]
338
339 for vdu in vnfd.get("vdu", ()):
340 if vdu.get("vdu-configuration"):
341 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
342 if ssh_required:
343 vdu_needed_access.append(vdu["id"])
344 elif mgmt_cp:
345 for vdu_interface in vdu.get("interface"):
346 if vdu_interface.get("external-connection-point-ref") and \
347 vdu_interface["external-connection-point-ref"] == mgmt_cp:
348 vdu_needed_access.append(vdu["id"])
349 mgmt_cp = None
350 break
351
352 if vdu_needed_access:
353 for vnf_member in nsd.get("constituent-vnfd"):
354 if vnf_member["vnfd-id-ref"] != vnfd_ref:
355 continue
356 for vdu in vdu_needed_access:
357 populate_dict(RO_ns_params,
358 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
359 n2vc_key_list)
360
361 if ns_params.get("vduImage"):
362 RO_ns_params["vduImage"] = ns_params["vduImage"]
363
364 if ns_params.get("ssh_keys"):
365 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
366 for vnf_params in get_iterable(ns_params, "vnf"):
367 for constituent_vnfd in nsd["constituent-vnfd"]:
368 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
369 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
370 break
371 else:
372 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
373 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
374 if vnf_params.get("vimAccountId"):
375 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
376 vim_account_2_RO(vnf_params["vimAccountId"]))
377
378 for vdu_params in get_iterable(vnf_params, "vdu"):
379 # TODO feature 1417: check that this VDU exist and it is not a PDU
380 if vdu_params.get("volume"):
381 for volume_params in vdu_params["volume"]:
382 if volume_params.get("vim-volume-id"):
383 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
384 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
385 volume_params["vim-volume-id"])
386 if vdu_params.get("interface"):
387 for interface_params in vdu_params["interface"]:
388 if interface_params.get("ip-address"):
389 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
390 vdu_params["id"], "interfaces", interface_params["name"],
391 "ip_address"),
392 interface_params["ip-address"])
393 if interface_params.get("mac-address"):
394 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
395 vdu_params["id"], "interfaces", interface_params["name"],
396 "mac_address"),
397 interface_params["mac-address"])
398 if interface_params.get("floating-ip-required"):
399 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
400 vdu_params["id"], "interfaces", interface_params["name"],
401 "floating-ip"),
402 interface_params["floating-ip-required"])
403
404 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
405 if internal_vld_params.get("vim-network-name"):
406 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
407 internal_vld_params["name"], "vim-network-name"),
408 internal_vld_params["vim-network-name"])
409 if internal_vld_params.get("vim-network-id"):
410 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
411 internal_vld_params["name"], "vim-network-id"),
412 internal_vld_params["vim-network-id"])
413 if internal_vld_params.get("ip-profile"):
414 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
415 internal_vld_params["name"], "ip-profile"),
416 ip_profile_2_RO(internal_vld_params["ip-profile"]))
417
418 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
419 # look for interface
420 iface_found = False
421 for vdu_descriptor in vnf_descriptor["vdu"]:
422 for vdu_interface in vdu_descriptor["interface"]:
423 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
424 if icp_params.get("ip-address"):
425 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
426 vdu_descriptor["id"], "interfaces",
427 vdu_interface["name"], "ip_address"),
428 icp_params["ip-address"])
429
430 if icp_params.get("mac-address"):
431 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
432 vdu_descriptor["id"], "interfaces",
433 vdu_interface["name"], "mac_address"),
434 icp_params["mac-address"])
435 iface_found = True
436 break
437 if iface_found:
438 break
439 else:
440 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
441 "internal-vld:id-ref={} is not present at vnfd:internal-"
442 "connection-point".format(vnf_params["member-vnf-index"],
443 icp_params["id-ref"]))
444
445 for vld_params in get_iterable(ns_params, "vld"):
446 if "ip-profile" in vld_params:
447 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
448 ip_profile_2_RO(vld_params["ip-profile"]))
449
450 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
451 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
452 wim_account_2_RO(vld_params["wimAccountId"])),
453 if vld_params.get("vim-network-name"):
454 RO_vld_sites = []
455 if isinstance(vld_params["vim-network-name"], dict):
456 for vim_account, vim_net in vld_params["vim-network-name"].items():
457 RO_vld_sites.append({
458 "netmap-use": vim_net,
459 "datacenter": vim_account_2_RO(vim_account)
460 })
461 else: # isinstance str
462 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
463 if RO_vld_sites:
464 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
465 if vld_params.get("vim-network-id"):
466 RO_vld_sites = []
467 if isinstance(vld_params["vim-network-id"], dict):
468 for vim_account, vim_net in vld_params["vim-network-id"].items():
469 RO_vld_sites.append({
470 "netmap-use": vim_net,
471 "datacenter": vim_account_2_RO(vim_account)
472 })
473 else: # isinstance str
474 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
475 if RO_vld_sites:
476 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
477 if vld_params.get("ns-net"):
478 if isinstance(vld_params["ns-net"], dict):
479 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
480 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
481 if RO_vld_ns_net:
482 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
483 if "vnfd-connection-point-ref" in vld_params:
484 for cp_params in vld_params["vnfd-connection-point-ref"]:
485 # look for interface
486 for constituent_vnfd in nsd["constituent-vnfd"]:
487 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
488 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
489 break
490 else:
491 raise LcmException(
492 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
493 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
494 match_cp = False
495 for vdu_descriptor in vnf_descriptor["vdu"]:
496 for interface_descriptor in vdu_descriptor["interface"]:
497 if interface_descriptor.get("external-connection-point-ref") == \
498 cp_params["vnfd-connection-point-ref"]:
499 match_cp = True
500 break
501 if match_cp:
502 break
503 else:
504 raise LcmException(
505 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
506 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
507 cp_params["member-vnf-index-ref"],
508 cp_params["vnfd-connection-point-ref"],
509 vnf_descriptor["id"]))
510 if cp_params.get("ip-address"):
511 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
512 vdu_descriptor["id"], "interfaces",
513 interface_descriptor["name"], "ip_address"),
514 cp_params["ip-address"])
515 if cp_params.get("mac-address"):
516 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
517 vdu_descriptor["id"], "interfaces",
518 interface_descriptor["name"], "mac_address"),
519 cp_params["mac-address"])
520 return RO_ns_params
521
522 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
523 # make a copy to do not change
524 vdu_create = copy(vdu_create)
525 vdu_delete = copy(vdu_delete)
526
527 vdurs = db_vnfr.get("vdur")
528 if vdurs is None:
529 vdurs = []
530 vdu_index = len(vdurs)
531 while vdu_index:
532 vdu_index -= 1
533 vdur = vdurs[vdu_index]
534 if vdur.get("pdu-type"):
535 continue
536 vdu_id_ref = vdur["vdu-id-ref"]
537 if vdu_create and vdu_create.get(vdu_id_ref):
538 for index in range(0, vdu_create[vdu_id_ref]):
539 vdur = deepcopy(vdur)
540 vdur["_id"] = str(uuid4())
541 vdur["count-index"] += 1
542 vdurs.insert(vdu_index+1+index, vdur)
543 del vdu_create[vdu_id_ref]
544 if vdu_delete and vdu_delete.get(vdu_id_ref):
545 del vdurs[vdu_index]
546 vdu_delete[vdu_id_ref] -= 1
547 if not vdu_delete[vdu_id_ref]:
548 del vdu_delete[vdu_id_ref]
549 # check all operations are done
550 if vdu_create or vdu_delete:
551 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
552 vdu_create))
553 if vdu_delete:
554 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
555 vdu_delete))
556
557 vnfr_update = {"vdur": vdurs}
558 db_vnfr["vdur"] = vdurs
559 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
560
561 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
562 """
563 Updates database nsr with the RO info for the created vld
564 :param ns_update_nsr: dictionary to be filled with the updated info
565 :param db_nsr: content of db_nsr. This is also modified
566 :param nsr_desc_RO: nsr descriptor from RO
567 :return: Nothing, LcmException is raised on errors
568 """
569
570 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
571 for net_RO in get_iterable(nsr_desc_RO, "nets"):
572 if vld["id"] != net_RO.get("ns_net_osm_id"):
573 continue
574 vld["vim-id"] = net_RO.get("vim_net_id")
575 vld["name"] = net_RO.get("vim_name")
576 vld["status"] = net_RO.get("status")
577 vld["status-detailed"] = net_RO.get("error_msg")
578 ns_update_nsr["vld.{}".format(vld_index)] = vld
579 break
580 else:
581 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
582
583 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
584 """
585 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
586 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
587 :param nsr_desc_RO: nsr descriptor from RO
588 :return: Nothing, LcmException is raised on errors
589 """
590 for vnf_index, db_vnfr in db_vnfrs.items():
591 for vnf_RO in nsr_desc_RO["vnfs"]:
592 if vnf_RO["member_vnf_index"] != vnf_index:
593 continue
594 vnfr_update = {}
595 if vnf_RO.get("ip_address"):
596 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
597 elif not db_vnfr.get("ip-address"):
598 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
599
600 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
601 vdur_RO_count_index = 0
602 if vdur.get("pdu-type"):
603 continue
604 for vdur_RO in get_iterable(vnf_RO, "vms"):
605 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
606 continue
607 if vdur["count-index"] != vdur_RO_count_index:
608 vdur_RO_count_index += 1
609 continue
610 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
611 if vdur_RO.get("ip_address"):
612 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
613 else:
614 vdur["ip-address"] = None
615 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
616 vdur["name"] = vdur_RO.get("vim_name")
617 vdur["status"] = vdur_RO.get("status")
618 vdur["status-detailed"] = vdur_RO.get("error_msg")
619 for ifacer in get_iterable(vdur, "interfaces"):
620 for interface_RO in get_iterable(vdur_RO, "interfaces"):
621 if ifacer["name"] == interface_RO.get("internal_name"):
622 ifacer["ip-address"] = interface_RO.get("ip_address")
623 ifacer["mac-address"] = interface_RO.get("mac_address")
624 break
625 else:
626 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
627 "from VIM info".format(vnf_index, vdur["vdu-id-ref"],
628 ifacer["name"]))
629 vnfr_update["vdur.{}".format(vdu_index)] = vdur
630 break
631 else:
632 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
633 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
634
635 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
636 for net_RO in get_iterable(nsr_desc_RO, "nets"):
637 if vld["id"] != net_RO.get("vnf_net_osm_id"):
638 continue
639 vld["vim-id"] = net_RO.get("vim_net_id")
640 vld["name"] = net_RO.get("vim_name")
641 vld["status"] = net_RO.get("status")
642 vld["status-detailed"] = net_RO.get("error_msg")
643 vnfr_update["vld.{}".format(vld_index)] = vld
644 break
645 else:
646 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
647 vnf_index, vld["id"]))
648
649 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
650 break
651
652 else:
653 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
654
655 async def instantiate(self, nsr_id, nslcmop_id):
656
657 # Try to lock HA task here
658 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
659 if not task_is_locked_by_me:
660 return
661
662 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
663 self.logger.debug(logging_text + "Enter")
664 # get all needed from database
665 start_deploy = time()
666 db_nsr = None
667 db_nslcmop = None
668 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
669 db_nslcmop_update = {}
670 nslcmop_operation_state = None
671 db_vnfrs = {}
672 RO_descriptor_number = 0 # number of descriptors created at RO
673 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
674 n2vc_info = {}
675 n2vc_key_list = [] # list of public keys to be injected as authorized to VMs
676 exc = None
677 try:
678 # wait for any previous tasks in process
679 step = "Waiting for previous tasks"
680 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
681
682 step = "Getting nslcmop={} from db".format(nslcmop_id)
683 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
684 step = "Getting nsr={} from db".format(nsr_id)
685 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
686 ns_params = db_nslcmop.get("operationParams")
687 nsd = db_nsr["nsd"]
688 nsr_name = db_nsr["name"] # TODO short-name??
689
690 step = "Getting vnfrs from db"
691 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
692 db_vnfds_ref = {}
693 db_vnfds = {}
694 db_vnfds_index = {}
695 for vnfr in db_vnfrs_list:
696 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
697 vnfd_id = vnfr["vnfd-id"]
698 vnfd_ref = vnfr["vnfd-ref"]
699 if vnfd_id not in db_vnfds:
700 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
701 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
702 db_vnfds_ref[vnfd_ref] = vnfd
703 db_vnfds[vnfd_id] = vnfd
704 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id]
705
706 # Get or generates the _admin.deployed,VCA list
707 vca_deployed_list = None
708 vca_model_name = None
709 if db_nsr["_admin"].get("deployed"):
710 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
711 vca_model_name = db_nsr["_admin"]["deployed"].get("VCA-model-name")
712 if vca_deployed_list is None:
713 vca_deployed_list = []
714 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
715 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
716 elif isinstance(vca_deployed_list, dict):
717 # maintain backward compatibility. Change a dict to list at database
718 vca_deployed_list = list(vca_deployed_list.values())
719 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
720 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
721
722 db_nsr_update["detailed-status"] = "creating"
723 db_nsr_update["operational-status"] = "init"
724 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
725 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
726 db_nsr_update["_admin.deployed.RO.vnfd"] = []
727
728 # set state to INSTANTIATED. When instantiated NBI will not delete directly
729 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
730 self.update_db_2("nsrs", nsr_id, db_nsr_update)
731
732 # Deploy charms
733 # The parameters we'll need to deploy a charm
734 number_to_configure = 0
735
736 def deploy_charm(vnf_index, vdu_id, vdu_name, vdu_count_index, charm_params, n2vc_info, native_charm=False):
737 """An inner function to deploy the charm from either ns, vnf or vdu
738 For ns both vnf_index and vdu_id are None.
739 For vnf only vdu_id is None
740 For vdu both vnf_index and vdu_id contain a value
741 """
742 # if not charm_params.get("rw_mgmt_ip") and vnf_index: # if NS skip mgmt_ip checking
743 # raise LcmException("ns/vnfd/vdu has not management ip address to configure it")
744
745 machine_spec = {}
746 if native_charm:
747 machine_spec["username"] = charm_params.get("username")
748 machine_spec["hostname"] = charm_params.get("rw_mgmt_ip")
749
750 # Note: The charm needs to exist on disk at the location
751 # specified by charm_path.
752 descriptor = vnfd if vnf_index else nsd
753 base_folder = descriptor["_admin"]["storage"]
754 storage_params = self.fs.get_params()
755 charm_path = "{}{}/{}/charms/{}".format(
756 storage_params["path"],
757 base_folder["folder"],
758 base_folder["pkg-dir"],
759 proxy_charm
760 )
761
762 # ns_name will be ignored in the current version of N2VC
763 # but will be implemented for the next point release.
764 model_name = nsr_id
765 vdu_id_text = (str(vdu_id) if vdu_id else "") + "-"
766 vnf_index_text = (str(vnf_index) if vnf_index else "") + "-"
767 application_name = self.n2vc.FormatApplicationName(nsr_name, vnf_index_text, vdu_id_text)
768
769 vca_index = len(vca_deployed_list)
770 # trunk name and add two char index at the end to ensure that it is unique. It is assumed no more than
771 # 26*26 charm in the same NS
772 application_name = application_name[0:48]
773 application_name += chr(97 + vca_index // 26) + chr(97 + vca_index % 26)
774 vca_deployed_ = {
775 "member-vnf-index": vnf_index,
776 "vdu_id": vdu_id,
777 "model": model_name,
778 "application": application_name,
779 "operational-status": "init",
780 "detailed-status": "",
781 "step": "initial-deploy",
782 "vnfd_id": vnfd_id,
783 "vdu_name": vdu_name,
784 "vdu_count_index": vdu_count_index,
785 }
786 vca_deployed_list.append(vca_deployed_)
787 db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = vca_deployed_
788 self.update_db_2("nsrs", nsr_id, db_nsr_update)
789
790 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
791 proxy_charm))
792 if not n2vc_info:
793 n2vc_info["nsr_id"] = nsr_id
794 n2vc_info["nslcmop_id"] = nslcmop_id
795 n2vc_info["n2vc_event"] = asyncio.Event(loop=self.loop)
796 n2vc_info["lcmOperationType"] = "instantiate"
797 n2vc_info["deployed"] = vca_deployed_list
798 n2vc_info["db_update"] = db_nsr_update
799 task = asyncio.ensure_future(
800 self.n2vc.DeployCharms(
801 model_name, # The network service name
802 application_name, # The application name
803 descriptor, # The vnf/nsd descriptor
804 charm_path, # Path to charm
805 charm_params, # Runtime params, like mgmt ip
806 machine_spec, # for native charms only
807 self.n2vc_callback, # Callback for status changes
808 n2vc_info, # Callback parameter
809 None, # Callback parameter (task)
810 )
811 )
812 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
813 n2vc_info))
814 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + application_name, task)
815
816 step = "Looking for needed vnfd to configure with proxy charm"
817 self.logger.debug(logging_text + step)
818
819 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
820 vnfd_id = c_vnf["vnfd-id-ref"]
821 vnf_index = str(c_vnf["member-vnf-index"])
822 vnfd = db_vnfds_ref[vnfd_id]
823
824 # Get additional parameters
825 vnfr_params = {}
826 if db_vnfrs[vnf_index].get("additionalParamsForVnf"):
827 vnfr_params = db_vnfrs[vnf_index]["additionalParamsForVnf"].copy()
828 for k, v in vnfr_params.items():
829 if isinstance(v, str) and v.startswith("!!yaml "):
830 vnfr_params[k] = yaml.safe_load(v[7:])
831
832 step = "deploying proxy charms for configuration"
833 # Check if this VNF has a charm configuration
834 vnf_config = vnfd.get("vnf-configuration")
835 if vnf_config and vnf_config.get("juju"):
836 proxy_charm = vnf_config["juju"]["charm"]
837 if vnf_config["juju"].get("proxy") is False:
838 # native_charm, will be deployed after VM. Skip
839 proxy_charm = None
840
841 if proxy_charm:
842 if not vca_model_name:
843 step = "creating VCA model name '{}'".format(nsr_id)
844 self.logger.debug(logging_text + step)
845 await self.n2vc.CreateNetworkService(nsr_id)
846 vca_model_name = nsr_id
847 db_nsr_update["_admin.deployed.VCA-model-name"] = nsr_id
848 self.update_db_2("nsrs", nsr_id, db_nsr_update)
849 step = "deploying proxy charm to configure vnf {}".format(vnf_index)
850 vnfr_params["rw_mgmt_ip"] = db_vnfrs[vnf_index]["ip-address"]
851 charm_params = {
852 "user_values": vnfr_params,
853 "rw_mgmt_ip": db_vnfrs[vnf_index]["ip-address"],
854 "initial-config-primitive": {} # vnf_config.get('initial-config-primitive') or {}
855 }
856
857 # Login to the VCA. If there are multiple calls to login(),
858 # subsequent calls will be a nop and return immediately.
859 await self.n2vc.login()
860
861 deploy_charm(vnf_index, None, None, None, charm_params, n2vc_info)
862 number_to_configure += 1
863
864 # Deploy charms for each VDU that supports one.
865 for vdu_index, vdu in enumerate(get_iterable(vnfd, 'vdu')):
866 vdu_config = vdu.get('vdu-configuration')
867 proxy_charm = None
868
869 if vdu_config and vdu_config.get("juju"):
870 proxy_charm = vdu_config["juju"]["charm"]
871 if vdu_config["juju"].get("proxy") is False:
872 # native_charm, will be deployed after VM. Skip
873 proxy_charm = None
874 if proxy_charm:
875 if not vca_model_name:
876 step = "creating VCA model name"
877 await self.n2vc.CreateNetworkService(nsr_id)
878 vca_model_name = nsr_id
879 db_nsr_update["_admin.deployed.VCA-model-name"] = nsr_id
880 self.update_db_2("nsrs", nsr_id, db_nsr_update)
881 step = "deploying proxy charm to configure member_vnf_index={} vdu={}".format(vnf_index,
882 vdu["id"])
883 await self.n2vc.login()
884 vdur = db_vnfrs[vnf_index]["vdur"][vdu_index]
885 # TODO for the moment only first vdu_id contains a charm deployed
886 if vdur["vdu-id-ref"] != vdu["id"]:
887 raise LcmException("Mismatch vdur {}, vdu {} at index {} for member_vnf_index={}"
888 .format(vdur["vdu-id-ref"], vdu["id"], vdu_index, vnf_index))
889 vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
890 charm_params = {
891 "user_values": vnfr_params,
892 "rw_mgmt_ip": vdur["ip-address"],
893 "initial-config-primitive": {} # vdu_config.get('initial-config-primitive') or {}
894 }
895 deploy_charm(vnf_index, vdu["id"], vdur.get("name"), vdur["count-index"],
896 charm_params, n2vc_info)
897 number_to_configure += 1
898
899 # Check if this NS has a charm configuration
900
901 ns_config = nsd.get("ns-configuration")
902 if ns_config and ns_config.get("juju"):
903 proxy_charm = ns_config["juju"]["charm"]
904 if ns_config["juju"].get("proxy") is False:
905 # native_charm, will be deployed after VM. Skip
906 proxy_charm = None
907 if proxy_charm:
908 step = "deploying proxy charm to configure ns"
909 # TODO is NS magmt IP address needed?
910
911 # Get additional parameters
912 additional_params = {}
913 if db_nsr.get("additionalParamsForNs"):
914 additional_params = db_nsr["additionalParamsForNs"].copy()
915 for k, v in additional_params.items():
916 if isinstance(v, str) and v.startswith("!!yaml "):
917 additional_params[k] = yaml.safe_load(v[7:])
918
919 # additional_params["rw_mgmt_ip"] = db_nsr["ip-address"]
920 charm_params = {
921 "user_values": additional_params,
922 # "rw_mgmt_ip": db_nsr["ip-address"],
923 "initial-config-primitive": {} # ns_config.get('initial-config-primitive') or {}
924 }
925
926 # Login to the VCA. If there are multiple calls to login(),
927 # subsequent calls will be a nop and return immediately.
928 await self.n2vc.login()
929 deploy_charm(None, None, None, None, charm_params, n2vc_info)
930 number_to_configure += 1
931
932 db_nsr_update["operational-status"] = "running"
933
934 # Wait until all charms has reached blocked or active status
935 step = "waiting proxy charms to be ready"
936 if number_to_configure:
937 # wait until all charms are configured.
938 # steps are:
939 # initial-deploy
940 # get-ssh-public-key
941 # generate-ssh-key
942 # retry-get-ssh-public-key
943 # ssh-public-key-obtained
944 while time() <= start_deploy + self.total_deploy_timeout:
945 if db_nsr_update:
946 self.update_db_2("nsrs", nsr_id, db_nsr_update)
947 if db_nslcmop_update:
948 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
949
950 all_active = True
951 for vca_index, vca_deployed in enumerate(vca_deployed_list):
952 database_entry = "_admin.deployed.VCA.{}.".format(vca_index)
953 if vca_deployed["step"] == "initial-deploy":
954 if vca_deployed["operational-status"] in ("active", "blocked"):
955 step = "execute charm primitive get-ssh-public-key for member_vnf_index={} vdu_id={}" \
956 .format(vca_deployed["member-vnf-index"],
957 vca_deployed["vdu_id"])
958 self.logger.debug(logging_text + step)
959 try:
960 primitive_id = await self.n2vc.ExecutePrimitive(
961 vca_deployed["model"],
962 vca_deployed["application"],
963 "get-ssh-public-key",
964 None,
965 )
966 vca_deployed["step"] = db_nsr_update[database_entry + "step"] = "get-ssh-public-key"
967 vca_deployed["primitive_id"] = db_nsr_update[database_entry + "primitive_id"] =\
968 primitive_id
969 db_nsr_update[database_entry + "operational-status"] =\
970 vca_deployed["operational-status"]
971 except PrimitiveDoesNotExist:
972 ssh_public_key = None
973 vca_deployed["step"] = db_nsr_update[database_entry + "step"] =\
974 "ssh-public-key-obtained"
975 vca_deployed["ssh-public-key"] = db_nsr_update[database_entry + "ssh-public-key"] =\
976 ssh_public_key
977 step = "charm ssh-public-key for member_vnf_index={} vdu_id={} not needed".format(
978 vca_deployed["member-vnf-index"], vca_deployed["vdu_id"])
979 self.logger.debug(logging_text + step)
980
981 elif vca_deployed["step"] in ("get-ssh-public-key", "retry-get-ssh-public-key"):
982 primitive_id = vca_deployed["primitive_id"]
983 primitive_status = await self.n2vc.GetPrimitiveStatus(vca_deployed["model"],
984 primitive_id)
985 if primitive_status in ("completed", "failed"):
986 primitive_result = await self.n2vc.GetPrimitiveOutput(vca_deployed["model"],
987 primitive_id)
988 vca_deployed["primitive_id"] = db_nsr_update[database_entry + "primitive_id"] = None
989 if primitive_status == "completed" and isinstance(primitive_result, dict) and \
990 primitive_result.get("pubkey"):
991 ssh_public_key = primitive_result.get("pubkey")
992 vca_deployed["step"] = db_nsr_update[database_entry + "step"] =\
993 "ssh-public-key-obtained"
994 vca_deployed["ssh-public-key"] = db_nsr_update[database_entry + "ssh-public-key"] =\
995 ssh_public_key
996 n2vc_key_list.append(ssh_public_key)
997 step = "charm ssh-public-key for member_vnf_index={} vdu_id={} is '{}'".format(
998 vca_deployed["member-vnf-index"], vca_deployed["vdu_id"], ssh_public_key)
999 self.logger.debug(logging_text + step)
1000 else: # primitive_status == "failed":
1001 if vca_deployed["step"] == "get-ssh-public-key":
1002 step = "execute charm primitive generate-ssh-public-key for member_vnf_index="\
1003 "{} vdu_id={}".format(vca_deployed["member-vnf-index"],
1004 vca_deployed["vdu_id"])
1005 self.logger.debug(logging_text + step)
1006 vca_deployed["step"] = db_nsr_update[database_entry + "step"] =\
1007 "generate-ssh-key"
1008 primitive_id = await self.n2vc.ExecutePrimitive(
1009 vca_deployed["model"],
1010 vca_deployed["application"],
1011 "generate-ssh-key",
1012 None,
1013 )
1014 vca_deployed["primitive_id"] = db_nsr_update[database_entry + "primitive_id"] =\
1015 primitive_id
1016 else: # failed for second time
1017 raise LcmException(
1018 "error executing primitive get-ssh-public-key: {}".format(primitive_result))
1019
1020 elif vca_deployed["step"] == "generate-ssh-key":
1021 primitive_id = vca_deployed["primitive_id"]
1022 primitive_status = await self.n2vc.GetPrimitiveStatus(vca_deployed["model"],
1023 primitive_id)
1024 if primitive_status in ("completed", "failed"):
1025 primitive_result = await self.n2vc.GetPrimitiveOutput(vca_deployed["model"],
1026 primitive_id)
1027 vca_deployed["primitive_id"] = db_nsr_update[
1028 database_entry + "primitive_id"] = None
1029 if primitive_status == "completed":
1030 step = "execute primitive get-ssh-public-key again for member_vnf_index={} "\
1031 "vdu_id={}".format(vca_deployed["member-vnf-index"],
1032 vca_deployed["vdu_id"])
1033 self.logger.debug(logging_text + step)
1034 vca_deployed["step"] = db_nsr_update[database_entry + "step"] = \
1035 "retry-get-ssh-public-key"
1036 primitive_id = await self.n2vc.ExecutePrimitive(
1037 vca_deployed["model"],
1038 vca_deployed["application"],
1039 "get-ssh-public-key",
1040 None,
1041 )
1042 vca_deployed["primitive_id"] = db_nsr_update[database_entry + "primitive_id"] =\
1043 primitive_id
1044
1045 else: # primitive_status == "failed":
1046 raise LcmException("error executing primitive generate-ssh-key: {}"
1047 .format(primitive_result))
1048
1049 if vca_deployed["step"] != "ssh-public-key-obtained":
1050 all_active = False
1051
1052 if all_active:
1053 break
1054 await asyncio.sleep(5)
1055 else: # total_deploy_timeout
1056 raise LcmException("Timeout waiting charm to be initialized for member_vnf_index={} vdu_id={}"
1057 .format(vca_deployed["member-vnf-index"], vca_deployed["vdu_id"]))
1058
1059 # deploy RO
1060 # get vnfds, instantiate at RO
1061 for c_vnf in nsd.get("constituent-vnfd", ()):
1062 member_vnf_index = c_vnf["member-vnf-index"]
1063 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
1064 vnfd_ref = vnfd["id"]
1065 step = db_nsr_update["detailed-status"] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(
1066 vnfd_ref, member_vnf_index)
1067 # self.logger.debug(logging_text + step)
1068 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
1069 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
1070 RO_descriptor_number += 1
1071
1072 # look position at deployed.RO.vnfd if not present it will be appended at the end
1073 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
1074 if vnf_deployed["member-vnf-index"] == member_vnf_index:
1075 break
1076 else:
1077 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
1078 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
1079
1080 # look if present
1081 RO_update = {"member-vnf-index": member_vnf_index}
1082 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
1083 if vnfd_list:
1084 RO_update["id"] = vnfd_list[0]["uuid"]
1085 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
1086 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
1087 else:
1088 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
1089 get("additionalParamsForVnf"), nsr_id)
1090 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
1091 RO_update["id"] = desc["uuid"]
1092 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
1093 vnfd_ref, member_vnf_index, desc["uuid"]))
1094 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
1095 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
1096 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1097
1098 # create nsd at RO
1099 nsd_ref = nsd["id"]
1100 step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
1101 # self.logger.debug(logging_text + step)
1102
1103 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
1104 RO_descriptor_number += 1
1105 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
1106 if nsd_list:
1107 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
1108 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
1109 nsd_ref, RO_nsd_uuid))
1110 else:
1111 nsd_RO = deepcopy(nsd)
1112 nsd_RO["id"] = RO_osm_nsd_id
1113 nsd_RO.pop("_id", None)
1114 nsd_RO.pop("_admin", None)
1115 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
1116 member_vnf_index = c_vnf["member-vnf-index"]
1117 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
1118 for c_vld in nsd_RO.get("vld", ()):
1119 for cp in c_vld.get("vnfd-connection-point-ref", ()):
1120 member_vnf_index = cp["member-vnf-index-ref"]
1121 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
1122
1123 desc = await self.RO.create("nsd", descriptor=nsd_RO)
1124 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1125 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
1126 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
1127 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1128
1129 # Crate ns at RO
1130 # if present use it unless in error status
1131 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
1132 if RO_nsr_id:
1133 try:
1134 step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
1135 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
1136 desc = await self.RO.show("ns", RO_nsr_id)
1137 except ROclient.ROClientException as e:
1138 if e.http_code != HTTPStatus.NOT_FOUND:
1139 raise
1140 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1141 if RO_nsr_id:
1142 ns_status, ns_status_info = self.RO.check_ns_status(desc)
1143 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
1144 if ns_status == "ERROR":
1145 step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
1146 self.logger.debug(logging_text + step)
1147 await self.RO.delete("ns", RO_nsr_id)
1148 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1149 if not RO_nsr_id:
1150 step = db_nsr_update["detailed-status"] = "Checking dependencies"
1151 # self.logger.debug(logging_text + step)
1152
1153 # check if VIM is creating and wait look if previous tasks in process
1154 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
1155 if task_dependency:
1156 step = "Waiting for related tasks to be completed: {}".format(task_name)
1157 self.logger.debug(logging_text + step)
1158 await asyncio.wait(task_dependency, timeout=3600)
1159 if ns_params.get("vnf"):
1160 for vnf in ns_params["vnf"]:
1161 if "vimAccountId" in vnf:
1162 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
1163 vnf["vimAccountId"])
1164 if task_dependency:
1165 step = "Waiting for related tasks to be completed: {}".format(task_name)
1166 self.logger.debug(logging_text + step)
1167 await asyncio.wait(task_dependency, timeout=3600)
1168
1169 step = db_nsr_update["detailed-status"] = "Checking instantiation parameters"
1170
1171 # feature 1429. Add n2vc public key to needed VMs
1172 n2vc_key = await self.n2vc.GetPublicKey()
1173 n2vc_key_list.append(n2vc_key)
1174 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
1175
1176 step = db_nsr_update["detailed-status"] = "Deploying ns at VIM"
1177 desc = await self.RO.create("ns", descriptor=RO_ns_params,
1178 name=db_nsr["name"],
1179 scenario=RO_nsd_uuid)
1180 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
1181 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1182 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
1183 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
1184 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1185
1186 # wait until NS is ready
1187 step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_id={}".format(RO_nsr_id)
1188 detailed_status_old = None
1189 self.logger.debug(logging_text + step)
1190
1191 while time() <= start_deploy + self.total_deploy_timeout:
1192 desc = await self.RO.show("ns", RO_nsr_id)
1193 ns_status, ns_status_info = self.RO.check_ns_status(desc)
1194 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
1195 if ns_status == "ERROR":
1196 raise ROclient.ROClientException(ns_status_info)
1197 elif ns_status == "BUILD":
1198 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
1199 elif ns_status == "ACTIVE":
1200 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
1201 try:
1202 self.ns_update_vnfr(db_vnfrs, desc)
1203 break
1204 except LcmExceptionNoMgmtIP:
1205 pass
1206 else:
1207 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1208 if detailed_status != detailed_status_old:
1209 detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
1210 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1211 await asyncio.sleep(5, loop=self.loop)
1212 else: # total_deploy_timeout
1213 raise ROclient.ROClientException("Timeout waiting ns to be ready")
1214
1215 step = "Updating NSR"
1216 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
1217
1218 db_nsr_update["operational-status"] = "running"
1219 db_nsr["detailed-status"] = "Configuring vnfr"
1220 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1221
1222 # Configure proxy charms once VMs are up
1223 for vca_index, vca_deployed in enumerate(vca_deployed_list):
1224 vnf_index = vca_deployed.get("member-vnf-index")
1225 vdu_id = vca_deployed.get("vdu_id")
1226 vdu_name = None
1227 vdu_count_index = None
1228
1229 step = "executing proxy charm initial primitives for member_vnf_index={} vdu_id={}".format(vnf_index,
1230 vdu_id)
1231 add_params = {}
1232 initial_config_primitive_list = []
1233 if vnf_index:
1234 if db_vnfrs[vnf_index].get("additionalParamsForVnf"):
1235 add_params = db_vnfrs[vnf_index]["additionalParamsForVnf"].copy()
1236 vnfd = db_vnfds_index[vnf_index]
1237
1238 if vdu_id:
1239 for vdu_index, vdu in enumerate(get_iterable(vnfd, 'vdu')):
1240 if vdu["id"] == vdu_id:
1241 initial_config_primitive_list = vdu['vdu-configuration'].get(
1242 'initial-config-primitive', [])
1243 break
1244 else:
1245 raise LcmException("Not found vdu_id={} at vnfd:vdu".format(vdu_id))
1246 vdur = db_vnfrs[vnf_index]["vdur"][vdu_index]
1247 # TODO for the moment only first vdu_id contains a charm deployed
1248 if vdur["vdu-id-ref"] != vdu["id"]:
1249 raise LcmException("Mismatch vdur {}, vdu {} at index {} for vnf {}"
1250 .format(vdur["vdu-id-ref"], vdu["id"], vdu_index, vnf_index))
1251 add_params["rw_mgmt_ip"] = vdur["ip-address"]
1252 else:
1253 add_params["rw_mgmt_ip"] = db_vnfrs[vnf_index]["ip-address"]
1254 initial_config_primitive_list = vnfd["vnf-configuration"].get('initial-config-primitive', [])
1255 else:
1256 if db_nsr.get("additionalParamsForNs"):
1257 add_params = db_nsr["additionalParamsForNs"].copy()
1258 for k, v in add_params.items():
1259 if isinstance(v, str) and v.startswith("!!yaml "):
1260 add_params[k] = yaml.safe_load(v[7:])
1261 add_params["rw_mgmt_ip"] = None
1262 initial_config_primitive_list = nsd["ns-configuration"].get('initial-config-primitive', [])
1263
1264 # add primitive verify-ssh-credentials to the list after config only when is a vnf or vdu charm
1265 initial_config_primitive_list = initial_config_primitive_list.copy()
1266 if initial_config_primitive_list and vnf_index and vca_deployed.get("ssh-public-key"):
1267 initial_config_primitive_list.insert(1, {"name": "verify-ssh-credentials", "parameter": []})
1268
1269 for initial_config_primitive in initial_config_primitive_list:
1270 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, add_params)
1271 self.logger.debug(logging_text + step + " primitive '{}' params '{}'"
1272 .format(initial_config_primitive["name"], primitive_params_))
1273 primitive_result, primitive_detail = await self._ns_execute_primitive(
1274 db_nsr["_admin"]["deployed"], vnf_index, vdu_id, vdu_name, vdu_count_index,
1275 initial_config_primitive["name"],
1276 primitive_params_,
1277 retries=10 if initial_config_primitive["name"] == "verify-ssh-credentials" else 0,
1278 retries_interval=30)
1279 if primitive_result != "COMPLETED":
1280 raise LcmException("charm error executing primitive {} for member_vnf_index={} vdu_id={}: '{}'"
1281 .format(initial_config_primitive["name"], vca_deployed["member-vnf-index"],
1282 vca_deployed["vdu_id"], primitive_detail))
1283
1284 # Deploy native charms
1285 step = "Looking for needed vnfd to configure with native charm"
1286 self.logger.debug(logging_text + step)
1287
1288 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1289 vnfd_id = c_vnf["vnfd-id-ref"]
1290 vnf_index = str(c_vnf["member-vnf-index"])
1291 vnfd = db_vnfds_ref[vnfd_id]
1292
1293 # Get additional parameters
1294 vnfr_params = {}
1295 if db_vnfrs[vnf_index].get("additionalParamsForVnf"):
1296 vnfr_params = db_vnfrs[vnf_index]["additionalParamsForVnf"].copy()
1297 for k, v in vnfr_params.items():
1298 if isinstance(v, str) and v.startswith("!!yaml "):
1299 vnfr_params[k] = yaml.safe_load(v[7:])
1300
1301 # Check if this VNF has a charm configuration
1302 vnf_config = vnfd.get("vnf-configuration")
1303 if vnf_config and vnf_config.get("juju"):
1304 native_charm = vnf_config["juju"].get("proxy") is False
1305 proxy_charm = vnf_config["juju"]["charm"]
1306 if native_charm and proxy_charm:
1307 if not vca_model_name:
1308 step = "creating VCA model name '{}'".format(nsr_id)
1309 self.logger.debug(logging_text + step)
1310 await self.n2vc.CreateNetworkService(nsr_id)
1311 vca_model_name = nsr_id
1312 db_nsr_update["_admin.deployed.VCA-model-name"] = nsr_id
1313 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1314 step = "deploying native charm for vnf_member_index={}".format(vnf_index)
1315 self.logger.debug(logging_text + step)
1316
1317 vnfr_params["rw_mgmt_ip"] = db_vnfrs[vnf_index]["ip-address"]
1318 charm_params = {
1319 "user_values": vnfr_params,
1320 "rw_mgmt_ip": db_vnfrs[vnf_index]["ip-address"],
1321 "initial-config-primitive": vnf_config.get('initial-config-primitive') or {},
1322 }
1323
1324 # get username
1325 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1326 # merged. Meanwhile let's get username from initial-config-primitive
1327 if vnf_config.get("initial-config-primitive"):
1328 for param in vnf_config["initial-config-primitive"][0].get("parameter", ()):
1329 if param["name"] == "ssh-username":
1330 charm_params["username"] = param["value"]
1331 if vnf_config.get("config-access") and vnf_config["config-access"].get("ssh-access"):
1332 if vnf_config["config-access"]["ssh-access"].get("required"):
1333 charm_params["username"] = vnf_config["config-access"]["ssh-access"].get("default-user")
1334
1335 # Login to the VCA. If there are multiple calls to login(),
1336 # subsequent calls will be a nop and return immediately.
1337 await self.n2vc.login()
1338
1339 deploy_charm(vnf_index, None, None, None, charm_params, n2vc_info, native_charm)
1340 number_to_configure += 1
1341
1342 # Deploy charms for each VDU that supports one.
1343 for vdu_index, vdu in enumerate(get_iterable(vnfd, 'vdu')):
1344 vdu_config = vdu.get('vdu-configuration')
1345 native_charm = False
1346
1347 if vdu_config and vdu_config.get("juju"):
1348 native_charm = vdu_config["juju"].get("proxy") is False
1349 proxy_charm = vdu_config["juju"]["charm"]
1350 if native_charm and proxy_charm:
1351 if not vca_model_name:
1352 step = "creating VCA model name"
1353 await self.n2vc.CreateNetworkService(nsr_id)
1354 vca_model_name = nsr_id
1355 db_nsr_update["_admin.deployed.VCA-model-name"] = nsr_id
1356 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1357 step = "deploying native charm for vnf_member_index={} vdu_id={}".format(vnf_index,
1358 vdu["id"])
1359
1360 self.logger.debug(logging_text + step)
1361
1362 vdur = db_vnfrs[vnf_index]["vdur"][vdu_index]
1363
1364 # TODO for the moment only first vdu_id contains a charm deployed
1365 if vdur["vdu-id-ref"] != vdu["id"]:
1366 raise LcmException("Mismatch vdur {}, vdu {} at index {} for vnf {}"
1367 .format(vdur["vdu-id-ref"], vdu["id"], vdu_index, vnf_index))
1368 vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1369 charm_params = {
1370 "user_values": vnfr_params,
1371 "rw_mgmt_ip": vdur["ip-address"],
1372 "initial-config-primitive": vdu_config.get('initial-config-primitive') or {}
1373 }
1374
1375 # get username
1376 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1377 # merged. Meanwhile let's get username from initial-config-primitive
1378 if vdu_config.get("initial-config-primitive"):
1379 for param in vdu_config["initial-config-primitive"][0].get("parameter", ()):
1380 if param["name"] == "ssh-username":
1381 charm_params["username"] = param["value"]
1382 if vdu_config.get("config-access") and vdu_config["config-access"].get("ssh-access"):
1383 if vdu_config["config-access"]["ssh-access"].get("required"):
1384 charm_params["username"] = vdu_config["config-access"]["ssh-access"].get(
1385 "default-user")
1386
1387 await self.n2vc.login()
1388
1389 deploy_charm(vnf_index, vdu["id"], vdur.get("name"), vdur["count-index"],
1390 charm_params, n2vc_info, native_charm)
1391 number_to_configure += 1
1392
1393 # Check if this NS has a charm configuration
1394
1395 ns_config = nsd.get("ns-configuration")
1396 if ns_config and ns_config.get("juju"):
1397 native_charm = ns_config["juju"].get("proxy") is False
1398 proxy_charm = ns_config["juju"]["charm"]
1399 if native_charm and proxy_charm:
1400 step = "deploying native charm to configure ns"
1401 # TODO is NS magmt IP address needed?
1402
1403 # Get additional parameters
1404 additional_params = {}
1405 if db_nsr.get("additionalParamsForNs"):
1406 additional_params = db_nsr["additionalParamsForNs"].copy()
1407 for k, v in additional_params.items():
1408 if isinstance(v, str) and v.startswith("!!yaml "):
1409 additional_params[k] = yaml.safe_load(v[7:])
1410
1411 # additional_params["rw_mgmt_ip"] = db_nsr["ip-address"]
1412 charm_params = {
1413 "user_values": additional_params,
1414 "rw_mgmt_ip": db_nsr.get("ip-address"),
1415 "initial-config-primitive": ns_config.get('initial-config-primitive') or {}
1416 }
1417
1418 # get username
1419 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1420 # merged. Meanwhile let's get username from initial-config-primitive
1421 if ns_config.get("initial-config-primitive"):
1422 for param in ns_config["initial-config-primitive"][0].get("parameter", ()):
1423 if param["name"] == "ssh-username":
1424 charm_params["username"] = param["value"]
1425 if ns_config.get("config-access") and ns_config["config-access"].get("ssh-access"):
1426 if ns_config["config-access"]["ssh-access"].get("required"):
1427 charm_params["username"] = ns_config["config-access"]["ssh-access"].get("default-user")
1428
1429 # Login to the VCA. If there are multiple calls to login(),
1430 # subsequent calls will be a nop and return immediately.
1431 await self.n2vc.login()
1432 deploy_charm(None, None, None, None, charm_params, n2vc_info, native_charm)
1433 number_to_configure += 1
1434
1435 # waiting all charms are ok
1436 configuration_failed = False
1437 if number_to_configure:
1438 step = "Waiting all charms are active"
1439 old_status = "configuring: init: {}".format(number_to_configure)
1440 db_nsr_update["config-status"] = old_status
1441 db_nsr_update["detailed-status"] = old_status
1442 db_nslcmop_update["detailed-status"] = old_status
1443
1444 # wait until all are configured.
1445 while time() <= start_deploy + self.total_deploy_timeout:
1446 if db_nsr_update:
1447 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1448 if db_nslcmop_update:
1449 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1450 # TODO add a fake task that set n2vc_event after some time
1451 await n2vc_info["n2vc_event"].wait()
1452 n2vc_info["n2vc_event"].clear()
1453 all_active = True
1454 status_map = {}
1455 n2vc_error_text = [] # contain text error list. If empty no one is in error status
1456 now = time()
1457 for vca_deployed in vca_deployed_list:
1458 vca_status = vca_deployed["operational-status"]
1459 if vca_status not in status_map:
1460 # Initialize it
1461 status_map[vca_status] = 0
1462 status_map[vca_status] += 1
1463
1464 if vca_status == "active":
1465 vca_deployed.pop("time_first_error", None)
1466 vca_deployed.pop("status_first_error", None)
1467 continue
1468
1469 all_active = False
1470 if vca_status in ("error", "blocked"):
1471 vca_deployed["detailed-status-error"] = vca_deployed["detailed-status"]
1472 # if not first time in this status error
1473 if not vca_deployed.get("time_first_error"):
1474 vca_deployed["time_first_error"] = now
1475 continue
1476 if vca_deployed.get("time_first_error") and \
1477 now <= vca_deployed["time_first_error"] + self.timeout_vca_on_error:
1478 n2vc_error_text.append("member_vnf_index={} vdu_id={} {}: {}"
1479 .format(vca_deployed["member-vnf-index"],
1480 vca_deployed["vdu_id"], vca_status,
1481 vca_deployed["detailed-status-error"]))
1482
1483 if all_active:
1484 break
1485 elif n2vc_error_text:
1486 db_nsr_update["config-status"] = "failed"
1487 error_text = "fail configuring " + ";".join(n2vc_error_text)
1488 db_nsr_update["detailed-status"] = error_text
1489 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1490 db_nslcmop_update["detailed-status"] = error_text
1491 db_nslcmop_update["statusEnteredTime"] = time()
1492 configuration_failed = True
1493 break
1494 else:
1495 cs = "configuring: "
1496 separator = ""
1497 for status, num in status_map.items():
1498 cs += separator + "{}: {}".format(status, num)
1499 separator = ", "
1500 if old_status != cs:
1501 db_nsr_update["config-status"] = cs
1502 db_nsr_update["detailed-status"] = cs
1503 db_nslcmop_update["detailed-status"] = cs
1504 old_status = cs
1505 else: # total_deploy_timeout
1506 raise LcmException("Timeout waiting ns to be configured")
1507
1508 if not configuration_failed:
1509 # all is done
1510 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1511 db_nslcmop_update["statusEnteredTime"] = time()
1512 db_nslcmop_update["detailed-status"] = "done"
1513 db_nsr_update["config-status"] = "configured"
1514 db_nsr_update["detailed-status"] = "done"
1515
1516 return
1517
1518 except (ROclient.ROClientException, DbException, LcmException) as e:
1519 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1520 exc = e
1521 except asyncio.CancelledError:
1522 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1523 exc = "Operation was cancelled"
1524 except Exception as e:
1525 exc = traceback.format_exc()
1526 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1527 exc_info=True)
1528 finally:
1529 if exc:
1530 if db_nsr:
1531 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1532 db_nsr_update["operational-status"] = "failed"
1533 if db_nslcmop:
1534 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1535 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1536 db_nslcmop_update["statusEnteredTime"] = time()
1537 try:
1538 if db_nsr:
1539 db_nsr_update["_admin.nslcmop"] = None
1540 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1541 if db_nslcmop_update:
1542 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1543 except DbException as e:
1544 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1545 if nslcmop_operation_state:
1546 try:
1547 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1548 "operationState": nslcmop_operation_state},
1549 loop=self.loop)
1550 except Exception as e:
1551 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1552
1553 self.logger.debug(logging_text + "Exit")
1554 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1555
1556 async def _destroy_charm(self, model, application):
1557 """
1558 Order N2VC destroy a charm
1559 :param model:
1560 :param application:
1561 :return: True if charm does not exist. False if it exist
1562 """
1563 if not await self.n2vc.HasApplication(model, application):
1564 return True # Already removed
1565 await self.n2vc.RemoveCharms(model, application)
1566 return False
1567
1568 async def _wait_charm_destroyed(self, model, application, timeout):
1569 """
1570 Wait until charm does not exist
1571 :param model:
1572 :param application:
1573 :param timeout:
1574 :return: True if not exist, False if timeout
1575 """
1576 while True:
1577 if not await self.n2vc.HasApplication(model, application):
1578 return True
1579 if timeout < 0:
1580 return False
1581 await asyncio.sleep(10)
1582 timeout -= 10
1583
1584 # Check if this VNFD has a configured terminate action
1585 def _has_terminate_config_primitive(self, vnfd):
1586 vnf_config = vnfd.get("vnf-configuration")
1587 if vnf_config and vnf_config.get("terminate-config-primitive"):
1588 return True
1589 else:
1590 return False
1591
1592 @staticmethod
1593 def _get_terminate_config_primitive_seq_list(vnfd):
1594 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
1595 # No need to check for existing primitive twice, already done before
1596 vnf_config = vnfd.get("vnf-configuration")
1597 seq_list = vnf_config.get("terminate-config-primitive")
1598 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
1599 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
1600 return seq_list_sorted
1601
1602 @staticmethod
1603 def _create_nslcmop(nsr_id, operation, params):
1604 """
1605 Creates a ns-lcm-opp content to be stored at database.
1606 :param nsr_id: internal id of the instance
1607 :param operation: instantiate, terminate, scale, action, ...
1608 :param params: user parameters for the operation
1609 :return: dictionary following SOL005 format
1610 """
1611 # Raise exception if invalid arguments
1612 if not (nsr_id and operation and params):
1613 raise LcmException(
1614 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
1615 now = time()
1616 _id = str(uuid4())
1617 nslcmop = {
1618 "id": _id,
1619 "_id": _id,
1620 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
1621 "operationState": "PROCESSING",
1622 "statusEnteredTime": now,
1623 "nsInstanceId": nsr_id,
1624 "lcmOperationType": operation,
1625 "startTime": now,
1626 "isAutomaticInvocation": False,
1627 "operationParams": params,
1628 "isCancelPending": False,
1629 "links": {
1630 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
1631 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
1632 }
1633 }
1634 return nslcmop
1635
1636 def _get_terminate_primitive_params(self, seq, vnf_index):
1637 primitive = seq.get('name')
1638 primitive_params = {}
1639 params = {
1640 "member_vnf_index": vnf_index,
1641 "primitive": primitive,
1642 "primitive_params": primitive_params,
1643 }
1644 desc_params = {}
1645 return self._map_primitive_params(seq, params, desc_params)
1646
1647 # sub-operations
1648
1649 def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
1650 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
1651 if (op.get('operationState') == 'COMPLETED'):
1652 # b. Skip sub-operation
1653 # _ns_execute_primitive() or RO.create_action() will NOT be executed
1654 return self.SUBOPERATION_STATUS_SKIP
1655 else:
1656 # c. Reintent executing sub-operation
1657 # The sub-operation exists, and operationState != 'COMPLETED'
1658 # Update operationState = 'PROCESSING' to indicate a reintent.
1659 operationState = 'PROCESSING'
1660 detailed_status = 'In progress'
1661 self._update_suboperation_status(
1662 db_nslcmop, op_index, operationState, detailed_status)
1663 # Return the sub-operation index
1664 # _ns_execute_primitive() or RO.create_action() will be called from scale()
1665 # with arguments extracted from the sub-operation
1666 return op_index
1667
1668 # Find a sub-operation where all keys in a matching dictionary must match
1669 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
1670 def _find_suboperation(self, db_nslcmop, match):
1671 if (db_nslcmop and match):
1672 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
1673 for i, op in enumerate(op_list):
1674 if all(op.get(k) == match[k] for k in match):
1675 return i
1676 return self.SUBOPERATION_STATUS_NOT_FOUND
1677
1678 # Update status for a sub-operation given its index
1679 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
1680 # Update DB for HA tasks
1681 q_filter = {'_id': db_nslcmop['_id']}
1682 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
1683 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
1684 self.db.set_one("nslcmops",
1685 q_filter=q_filter,
1686 update_dict=update_dict,
1687 fail_on_empty=False)
1688
1689 # Add sub-operation, return the index of the added sub-operation
1690 # Optionally, set operationState, detailed-status, and operationType
1691 # Status and type are currently set for 'scale' sub-operations:
1692 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
1693 # 'detailed-status' : status message
1694 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
1695 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
1696 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index,
1697 vdu_name, primitive, mapped_primitive_params,
1698 operationState=None, detailed_status=None, operationType=None,
1699 RO_nsr_id=None, RO_scaling_info=None):
1700 if not (db_nslcmop):
1701 return self.SUBOPERATION_STATUS_NOT_FOUND
1702 # Get the "_admin.operations" list, if it exists
1703 db_nslcmop_admin = db_nslcmop.get('_admin', {})
1704 op_list = db_nslcmop_admin.get('operations')
1705 # Create or append to the "_admin.operations" list
1706 new_op = {'member_vnf_index': vnf_index,
1707 'vdu_id': vdu_id,
1708 'vdu_count_index': vdu_count_index,
1709 'primitive': primitive,
1710 'primitive_params': mapped_primitive_params}
1711 if operationState:
1712 new_op['operationState'] = operationState
1713 if detailed_status:
1714 new_op['detailed-status'] = detailed_status
1715 if operationType:
1716 new_op['lcmOperationType'] = operationType
1717 if RO_nsr_id:
1718 new_op['RO_nsr_id'] = RO_nsr_id
1719 if RO_scaling_info:
1720 new_op['RO_scaling_info'] = RO_scaling_info
1721 if not op_list:
1722 # No existing operations, create key 'operations' with current operation as first list element
1723 db_nslcmop_admin.update({'operations': [new_op]})
1724 op_list = db_nslcmop_admin.get('operations')
1725 else:
1726 # Existing operations, append operation to list
1727 op_list.append(new_op)
1728
1729 db_nslcmop_update = {'_admin.operations': op_list}
1730 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
1731 op_index = len(op_list) - 1
1732 return op_index
1733
1734 # Helper methods for scale() sub-operations
1735
1736 # pre-scale/post-scale:
1737 # Check for 3 different cases:
1738 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
1739 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
1740 # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
1741 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index,
1742 vnf_config_primitive, primitive_params, operationType,
1743 RO_nsr_id=None, RO_scaling_info=None):
1744 # Find this sub-operation
1745 if (RO_nsr_id and RO_scaling_info):
1746 operationType = 'SCALE-RO'
1747 match = {
1748 'member_vnf_index': vnf_index,
1749 'RO_nsr_id': RO_nsr_id,
1750 'RO_scaling_info': RO_scaling_info,
1751 }
1752 else:
1753 match = {
1754 'member_vnf_index': vnf_index,
1755 'primitive': vnf_config_primitive,
1756 'primitive_params': primitive_params,
1757 'lcmOperationType': operationType
1758 }
1759 op_index = self._find_suboperation(db_nslcmop, match)
1760 if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
1761 # a. New sub-operation
1762 # The sub-operation does not exist, add it.
1763 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
1764 # The following parameters are set to None for all kind of scaling:
1765 vdu_id = None
1766 vdu_count_index = None
1767 vdu_name = None
1768 if (RO_nsr_id and RO_scaling_info):
1769 vnf_config_primitive = None
1770 primitive_params = None
1771 else:
1772 RO_nsr_id = None
1773 RO_scaling_info = None
1774 # Initial status for sub-operation
1775 operationState = 'PROCESSING'
1776 detailed_status = 'In progress'
1777 # Add sub-operation for pre/post-scaling (zero or more operations)
1778 self._add_suboperation(db_nslcmop,
1779 vnf_index,
1780 vdu_id,
1781 vdu_count_index,
1782 vdu_name,
1783 vnf_config_primitive,
1784 primitive_params,
1785 operationState,
1786 detailed_status,
1787 operationType,
1788 RO_nsr_id,
1789 RO_scaling_info)
1790 return self.SUBOPERATION_STATUS_NEW
1791 else:
1792 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
1793 # or op_index (operationState != 'COMPLETED')
1794 return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
1795
1796 # Helper methods for terminate()
1797
1798 async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
1799 """ Create a primitive with params from VNFD
1800 Called from terminate() before deleting instance
1801 Calls action() to execute the primitive """
1802 logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
1803 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1804 db_vnfds = {}
1805 # Loop over VNFRs
1806 for vnfr in db_vnfrs_list:
1807 vnfd_id = vnfr["vnfd-id"]
1808 vnf_index = vnfr["member-vnf-index-ref"]
1809 if vnfd_id not in db_vnfds:
1810 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_id)
1811 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1812 db_vnfds[vnfd_id] = vnfd
1813 vnfd = db_vnfds[vnfd_id]
1814 if not self._has_terminate_config_primitive(vnfd):
1815 continue
1816 # Get the primitive's sorted sequence list
1817 seq_list = self._get_terminate_config_primitive_seq_list(vnfd)
1818 for seq in seq_list:
1819 # For each sequence in list, get primitive and call _ns_execute_primitive()
1820 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
1821 vnf_index, seq.get("name"))
1822 self.logger.debug(logging_text + step)
1823 # Create the primitive for each sequence, i.e. "primitive": "touch"
1824 primitive = seq.get('name')
1825 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
1826 # The following 3 parameters are currently set to None for 'terminate':
1827 # vdu_id, vdu_count_index, vdu_name
1828 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
1829 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
1830 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
1831 # Add sub-operation
1832 self._add_suboperation(db_nslcmop,
1833 nslcmop_id,
1834 vnf_index,
1835 vdu_id,
1836 vdu_count_index,
1837 vdu_name,
1838 primitive,
1839 mapped_primitive_params)
1840 # Sub-operations: Call _ns_execute_primitive() instead of action()
1841 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1842 nsr_deployed = db_nsr["_admin"]["deployed"]
1843 result, result_detail = await self._ns_execute_primitive(
1844 nsr_deployed, vnf_index, vdu_id, vdu_name, vdu_count_index, primitive,
1845 mapped_primitive_params)
1846
1847 # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action(
1848 # nsr_id, nslcmop_terminate_action_id)
1849 # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED']
1850 result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
1851 if result not in result_ok:
1852 raise LcmException(
1853 "terminate_primitive_action for vnf_member_index={}",
1854 " primitive={} fails with error {}".format(
1855 vnf_index, seq.get("name"), result_detail))
1856
1857 async def terminate(self, nsr_id, nslcmop_id):
1858
1859 # Try to lock HA task here
1860 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1861 if not task_is_locked_by_me:
1862 return
1863
1864 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
1865 self.logger.debug(logging_text + "Enter")
1866 db_nsr = None
1867 db_nslcmop = None
1868 exc = None
1869 failed_detail = [] # annotates all failed error messages
1870 vca_time_destroy = None # time of where destroy charm order
1871 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1872 db_nslcmop_update = {}
1873 nslcmop_operation_state = None
1874 autoremove = False # autoremove after terminated
1875 try:
1876 # wait for any previous tasks in process
1877 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
1878
1879 step = "Getting nslcmop={} from db".format(nslcmop_id)
1880 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1881 step = "Getting nsr={} from db".format(nsr_id)
1882 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1883 # nsd = db_nsr["nsd"]
1884 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
1885 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
1886 return
1887 # #TODO check if VIM is creating and wait
1888 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
1889 # Call internal terminate action
1890 await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id)
1891
1892 db_nsr_update["operational-status"] = "terminating"
1893 db_nsr_update["config-status"] = "terminating"
1894
1895 if nsr_deployed and nsr_deployed.get("VCA-model-name"):
1896 vca_model_name = nsr_deployed["VCA-model-name"]
1897 step = "deleting VCA model name '{}' and all charms".format(vca_model_name)
1898 self.logger.debug(logging_text + step)
1899 try:
1900 await self.n2vc.DestroyNetworkService(vca_model_name)
1901 except NetworkServiceDoesNotExist:
1902 pass
1903 db_nsr_update["_admin.deployed.VCA-model-name"] = None
1904 if nsr_deployed.get("VCA"):
1905 for vca_index in range(0, len(nsr_deployed["VCA"])):
1906 db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None
1907 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1908 # for backward compatibility if charm have been created with "default" model name delete one by one
1909 elif nsr_deployed and nsr_deployed.get("VCA"):
1910 try:
1911 step = "Scheduling configuration charms removing"
1912 db_nsr_update["detailed-status"] = "Deleting charms"
1913 self.logger.debug(logging_text + step)
1914 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1915 # for backward compatibility
1916 if isinstance(nsr_deployed["VCA"], dict):
1917 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
1918 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
1919 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1920
1921 for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
1922 if vca_deployed:
1923 if await self._destroy_charm(vca_deployed['model'], vca_deployed["application"]):
1924 vca_deployed.clear()
1925 db_nsr["_admin.deployed.VCA.{}".format(vca_index)] = None
1926 else:
1927 vca_time_destroy = time()
1928 except Exception as e:
1929 self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
1930
1931 # remove from RO
1932 RO_fail = False
1933
1934 # Delete ns
1935 RO_nsr_id = RO_delete_action = None
1936 if nsr_deployed and nsr_deployed.get("RO"):
1937 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
1938 RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
1939 try:
1940 if RO_nsr_id:
1941 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
1942 "Deleting ns from VIM"
1943 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1944 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1945 self.logger.debug(logging_text + step)
1946 desc = await self.RO.delete("ns", RO_nsr_id)
1947 RO_delete_action = desc["action_id"]
1948 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
1949 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1950 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1951 if RO_delete_action:
1952 # wait until NS is deleted from VIM
1953 step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
1954 format(RO_nsr_id, RO_delete_action)
1955 detailed_status_old = None
1956 self.logger.debug(logging_text + step)
1957
1958 delete_timeout = 20 * 60 # 20 minutes
1959 while delete_timeout > 0:
1960 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
1961 extra_item_id=RO_delete_action)
1962 ns_status, ns_status_info = self.RO.check_action_status(desc)
1963 if ns_status == "ERROR":
1964 raise ROclient.ROClientException(ns_status_info)
1965 elif ns_status == "BUILD":
1966 detailed_status = step + "; {}".format(ns_status_info)
1967 elif ns_status == "ACTIVE":
1968 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1969 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1970 break
1971 else:
1972 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
1973 if detailed_status != detailed_status_old:
1974 detailed_status_old = db_nslcmop_update["detailed-status"] = \
1975 db_nsr_update["detailed-status"] = detailed_status
1976 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1977 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1978 await asyncio.sleep(5, loop=self.loop)
1979 delete_timeout -= 5
1980 else: # delete_timeout <= 0:
1981 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
1982
1983 except ROclient.ROClientException as e:
1984 if e.http_code == 404: # not found
1985 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1986 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1987 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1988 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
1989 elif e.http_code == 409: # conflict
1990 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
1991 self.logger.debug(logging_text + failed_detail[-1])
1992 RO_fail = True
1993 else:
1994 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
1995 self.logger.error(logging_text + failed_detail[-1])
1996 RO_fail = True
1997
1998 # Delete nsd
1999 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
2000 RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
2001 try:
2002 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2003 "Deleting nsd from RO"
2004 await self.RO.delete("nsd", RO_nsd_id)
2005 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
2006 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2007 except ROclient.ROClientException as e:
2008 if e.http_code == 404: # not found
2009 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2010 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
2011 elif e.http_code == 409: # conflict
2012 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
2013 self.logger.debug(logging_text + failed_detail[-1])
2014 RO_fail = True
2015 else:
2016 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
2017 self.logger.error(logging_text + failed_detail[-1])
2018 RO_fail = True
2019
2020 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
2021 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2022 if not vnf_deployed or not vnf_deployed["id"]:
2023 continue
2024 try:
2025 RO_vnfd_id = vnf_deployed["id"]
2026 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
2027 "Deleting member_vnf_index={} RO_vnfd_id={} from RO".format(
2028 vnf_deployed["member-vnf-index"], RO_vnfd_id)
2029 await self.RO.delete("vnfd", RO_vnfd_id)
2030 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
2031 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2032 except ROclient.ROClientException as e:
2033 if e.http_code == 404: # not found
2034 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2035 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
2036 elif e.http_code == 409: # conflict
2037 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
2038 self.logger.debug(logging_text + failed_detail[-1])
2039 else:
2040 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
2041 self.logger.error(logging_text + failed_detail[-1])
2042
2043 # wait until charm deleted
2044 if vca_time_destroy:
2045 db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = step = \
2046 "Waiting for deletion of configuration charms"
2047 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2048 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2049 for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
2050 if not vca_deployed:
2051 continue
2052 step = "Waiting for deletion of charm application_name={}".format(vca_deployed["application"])
2053 timeout = self.timeout_charm_delete - int(time() - vca_time_destroy)
2054 if not await self._wait_charm_destroyed(vca_deployed['model'], vca_deployed["application"],
2055 timeout):
2056 failed_detail.append("VCA[application_name={}] Deletion timeout".format(
2057 vca_deployed["application"]))
2058 else:
2059 db_nsr["_admin.deployed.VCA.{}".format(vca_index)] = None
2060
2061 if failed_detail:
2062 self.logger.error(logging_text + " ;".join(failed_detail))
2063 db_nsr_update["operational-status"] = "failed"
2064 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
2065 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
2066 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2067 db_nslcmop_update["statusEnteredTime"] = time()
2068 else:
2069 db_nsr_update["operational-status"] = "terminated"
2070 db_nsr_update["detailed-status"] = "Done"
2071 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
2072 db_nslcmop_update["detailed-status"] = "Done"
2073 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
2074 db_nslcmop_update["statusEnteredTime"] = time()
2075 if db_nslcmop["operationParams"].get("autoremove"):
2076 autoremove = True
2077
2078 except (ROclient.ROClientException, DbException, LcmException) as e:
2079 self.logger.error(logging_text + "Exit Exception {}".format(e))
2080 exc = e
2081 except asyncio.CancelledError:
2082 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2083 exc = "Operation was cancelled"
2084 except Exception as e:
2085 exc = traceback.format_exc()
2086 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
2087 finally:
2088 if exc and db_nslcmop:
2089 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
2090 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2091 db_nslcmop_update["statusEnteredTime"] = time()
2092 try:
2093 if db_nslcmop and db_nslcmop_update:
2094 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2095 if db_nsr:
2096 db_nsr_update["_admin.nslcmop"] = None
2097 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2098 except DbException as e:
2099 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2100 if nslcmop_operation_state:
2101 try:
2102 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2103 "operationState": nslcmop_operation_state,
2104 "autoremove": autoremove},
2105 loop=self.loop)
2106 except Exception as e:
2107 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2108 self.logger.debug(logging_text + "Exit")
2109 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
2110
2111 @staticmethod
2112 def _map_primitive_params(primitive_desc, params, instantiation_params):
2113 """
2114 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
2115 The default-value is used. If it is between < > it look for a value at instantiation_params
2116 :param primitive_desc: portion of VNFD/NSD that describes primitive
2117 :param params: Params provided by user
2118 :param instantiation_params: Instantiation params provided by user
2119 :return: a dictionary with the calculated params
2120 """
2121 calculated_params = {}
2122 for parameter in primitive_desc.get("parameter", ()):
2123 param_name = parameter["name"]
2124 if param_name in params:
2125 calculated_params[param_name] = params[param_name]
2126 elif "default-value" in parameter or "value" in parameter:
2127 if "value" in parameter:
2128 calculated_params[param_name] = parameter["value"]
2129 else:
2130 calculated_params[param_name] = parameter["default-value"]
2131 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
2132 and calculated_params[param_name].endswith(">"):
2133 if calculated_params[param_name][1:-1] in instantiation_params:
2134 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
2135 else:
2136 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2137 format(parameter["default-value"], primitive_desc["name"]))
2138 else:
2139 raise LcmException("Parameter {} needed to execute primitive {} not provided".
2140 format(param_name, primitive_desc["name"]))
2141
2142 if isinstance(calculated_params[param_name], (dict, list, tuple)):
2143 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
2144 width=256)
2145 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
2146 calculated_params[param_name] = calculated_params[param_name][7:]
2147 return calculated_params
2148
2149 async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
2150 primitive, primitive_params, retries=0, retries_interval=30):
2151 start_primitive_time = time()
2152 try:
2153 for vca_deployed in db_deployed["VCA"]:
2154 if not vca_deployed:
2155 continue
2156 if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
2157 continue
2158 if vdu_name and vdu_name != vca_deployed["vdu_name"]:
2159 continue
2160 if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
2161 continue
2162 break
2163 else:
2164 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
2165 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
2166 model_name = vca_deployed.get("model")
2167 application_name = vca_deployed.get("application")
2168 if not model_name or not application_name:
2169 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
2170 "model or application name" .format(member_vnf_index, vdu_id, vdu_name,
2171 vdu_count_index))
2172 # if vca_deployed["operational-status"] != "active":
2173 # raise LcmException("charm for member_vnf_index={} vdu_id={} operational_status={} not 'active'".format(
2174 # member_vnf_index, vdu_id, vca_deployed["operational-status"]))
2175 callback = None # self.n2vc_callback
2176 callback_args = () # [db_nsr, db_nslcmop, member_vnf_index, None]
2177 await self.n2vc.login()
2178 if primitive == "config":
2179 primitive_params = {"params": primitive_params}
2180 while retries >= 0:
2181 primitive_id = await self.n2vc.ExecutePrimitive(
2182 model_name,
2183 application_name,
2184 primitive,
2185 callback,
2186 *callback_args,
2187 **primitive_params
2188 )
2189 while time() - start_primitive_time < self.timeout_primitive:
2190 primitive_result_ = await self.n2vc.GetPrimitiveStatus(model_name, primitive_id)
2191 if primitive_result_ in ("completed", "failed"):
2192 primitive_result = "COMPLETED" if primitive_result_ == "completed" else "FAILED"
2193 detailed_result = await self.n2vc.GetPrimitiveOutput(model_name, primitive_id)
2194 break
2195 elif primitive_result_ is None and primitive == "config":
2196 primitive_result = "COMPLETED"
2197 detailed_result = None
2198 break
2199 else: # ("running", "pending", None):
2200 pass
2201 await asyncio.sleep(5)
2202 else:
2203 raise LcmException("timeout after {} seconds".format(self.timeout_primitive))
2204 if primitive_result == "COMPLETED":
2205 break
2206 retries -= 1
2207 if retries >= 0:
2208 await asyncio.sleep(retries_interval)
2209
2210 return primitive_result, detailed_result
2211 except (N2VCPrimitiveExecutionFailed, LcmException) as e:
2212 return "FAILED", str(e)
2213
2214 async def action(self, nsr_id, nslcmop_id):
2215
2216 # Try to lock HA task here
2217 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2218 if not task_is_locked_by_me:
2219 return
2220
2221 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
2222 self.logger.debug(logging_text + "Enter")
2223 # get all needed from database
2224 db_nsr = None
2225 db_nslcmop = None
2226 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
2227 db_nslcmop_update = {}
2228 nslcmop_operation_state = None
2229 nslcmop_operation_state_detail = None
2230 exc = None
2231 try:
2232 # wait for any previous tasks in process
2233 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2234
2235 step = "Getting information from database"
2236 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2237 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2238
2239 nsr_deployed = db_nsr["_admin"].get("deployed")
2240 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
2241 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
2242 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
2243 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
2244
2245 if vnf_index:
2246 step = "Getting vnfr from database"
2247 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2248 step = "Getting vnfd from database"
2249 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2250 else:
2251 if db_nsr.get("nsd"):
2252 db_nsd = db_nsr.get("nsd") # TODO this will be removed
2253 else:
2254 step = "Getting nsd from database"
2255 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2256
2257 # for backward compatibility
2258 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2259 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2260 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2261 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2262
2263 primitive = db_nslcmop["operationParams"]["primitive"]
2264 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
2265
2266 # look for primitive
2267 config_primitive_desc = None
2268 if vdu_id:
2269 for vdu in get_iterable(db_vnfd, "vdu"):
2270 if vdu_id == vdu["id"]:
2271 for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
2272 if config_primitive["name"] == primitive:
2273 config_primitive_desc = config_primitive
2274 break
2275 elif vnf_index:
2276 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2277 if config_primitive["name"] == primitive:
2278 config_primitive_desc = config_primitive
2279 break
2280 else:
2281 for config_primitive in db_nsd.get("ns-configuration", {}).get("config-primitive", ()):
2282 if config_primitive["name"] == primitive:
2283 config_primitive_desc = config_primitive
2284 break
2285
2286 if not config_primitive_desc:
2287 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
2288 format(primitive))
2289
2290 desc_params = {}
2291 if vnf_index:
2292 if db_vnfr.get("additionalParamsForVnf"):
2293 desc_params.update(db_vnfr["additionalParamsForVnf"])
2294 else:
2295 if db_nsr.get("additionalParamsForVnf"):
2296 desc_params.update(db_nsr["additionalParamsForNs"])
2297
2298 # TODO check if ns is in a proper status
2299 result, result_detail = await self._ns_execute_primitive(
2300 nsr_deployed, vnf_index, vdu_id, vdu_name, vdu_count_index, primitive,
2301 self._map_primitive_params(config_primitive_desc, primitive_params, desc_params))
2302 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = result_detail
2303 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
2304 db_nslcmop_update["statusEnteredTime"] = time()
2305 self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
2306 return # database update is called inside finally
2307
2308 except (DbException, LcmException) as e:
2309 self.logger.error(logging_text + "Exit Exception {}".format(e))
2310 exc = e
2311 except asyncio.CancelledError:
2312 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2313 exc = "Operation was cancelled"
2314 except Exception as e:
2315 exc = traceback.format_exc()
2316 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
2317 finally:
2318 if exc and db_nslcmop:
2319 db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = \
2320 "FAILED {}: {}".format(step, exc)
2321 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2322 db_nslcmop_update["statusEnteredTime"] = time()
2323 try:
2324 if db_nslcmop_update:
2325 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2326 if db_nsr:
2327 db_nsr_update["_admin.nslcmop"] = None
2328 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2329 except DbException as e:
2330 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2331 self.logger.debug(logging_text + "Exit")
2332 if nslcmop_operation_state:
2333 try:
2334 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2335 "operationState": nslcmop_operation_state},
2336 loop=self.loop)
2337 except Exception as e:
2338 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2339 self.logger.debug(logging_text + "Exit")
2340 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
2341 return nslcmop_operation_state, nslcmop_operation_state_detail
2342
2343 async def scale(self, nsr_id, nslcmop_id):
2344
2345 # Try to lock HA task here
2346 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2347 if not task_is_locked_by_me:
2348 return
2349
2350 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
2351 self.logger.debug(logging_text + "Enter")
2352 # get all needed from database
2353 db_nsr = None
2354 db_nslcmop = None
2355 db_nslcmop_update = {}
2356 nslcmop_operation_state = None
2357 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
2358 exc = None
2359 # in case of error, indicates what part of scale was failed to put nsr at error status
2360 scale_process = None
2361 old_operational_status = ""
2362 old_config_status = ""
2363 vnfr_scaled = False
2364 try:
2365 # wait for any previous tasks in process
2366 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2367
2368 step = "Getting nslcmop from database"
2369 self.logger.debug(step + " after having waited for previous tasks to be completed")
2370 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2371 step = "Getting nsr from database"
2372 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2373
2374 old_operational_status = db_nsr["operational-status"]
2375 old_config_status = db_nsr["config-status"]
2376 step = "Parsing scaling parameters"
2377 # self.logger.debug(step)
2378 db_nsr_update["operational-status"] = "scaling"
2379 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2380 nsr_deployed = db_nsr["_admin"].get("deployed")
2381 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
2382 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
2383 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
2384 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
2385 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
2386
2387 # for backward compatibility
2388 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
2389 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
2390 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
2391 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2392
2393 step = "Getting vnfr from database"
2394 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
2395 step = "Getting vnfd from database"
2396 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
2397
2398 step = "Getting scaling-group-descriptor"
2399 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
2400 if scaling_descriptor["name"] == scaling_group:
2401 break
2402 else:
2403 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
2404 "at vnfd:scaling-group-descriptor".format(scaling_group))
2405
2406 # cooldown_time = 0
2407 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
2408 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
2409 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
2410 # break
2411
2412 # TODO check if ns is in a proper status
2413 step = "Sending scale order to VIM"
2414 nb_scale_op = 0
2415 if not db_nsr["_admin"].get("scaling-group"):
2416 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
2417 admin_scale_index = 0
2418 else:
2419 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
2420 if admin_scale_info["name"] == scaling_group:
2421 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
2422 break
2423 else: # not found, set index one plus last element and add new entry with the name
2424 admin_scale_index += 1
2425 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
2426 RO_scaling_info = []
2427 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
2428 if scaling_type == "SCALE_OUT":
2429 # count if max-instance-count is reached
2430 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
2431 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
2432 if nb_scale_op >= max_instance_count:
2433 raise LcmException("reached the limit of {} (max-instance-count) "
2434 "scaling-out operations for the "
2435 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2436
2437 nb_scale_op += 1
2438 vdu_scaling_info["scaling_direction"] = "OUT"
2439 vdu_scaling_info["vdu-create"] = {}
2440 for vdu_scale_info in scaling_descriptor["vdu"]:
2441 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2442 "type": "create", "count": vdu_scale_info.get("count", 1)})
2443 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2444
2445 elif scaling_type == "SCALE_IN":
2446 # count if min-instance-count is reached
2447 min_instance_count = 0
2448 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
2449 min_instance_count = int(scaling_descriptor["min-instance-count"])
2450 if nb_scale_op <= min_instance_count:
2451 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
2452 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
2453 nb_scale_op -= 1
2454 vdu_scaling_info["scaling_direction"] = "IN"
2455 vdu_scaling_info["vdu-delete"] = {}
2456 for vdu_scale_info in scaling_descriptor["vdu"]:
2457 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
2458 "type": "delete", "count": vdu_scale_info.get("count", 1)})
2459 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
2460
2461 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
2462 vdu_create = vdu_scaling_info.get("vdu-create")
2463 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
2464 if vdu_scaling_info["scaling_direction"] == "IN":
2465 for vdur in reversed(db_vnfr["vdur"]):
2466 if vdu_delete.get(vdur["vdu-id-ref"]):
2467 vdu_delete[vdur["vdu-id-ref"]] -= 1
2468 vdu_scaling_info["vdu"].append({
2469 "name": vdur["name"],
2470 "vdu_id": vdur["vdu-id-ref"],
2471 "interface": []
2472 })
2473 for interface in vdur["interfaces"]:
2474 vdu_scaling_info["vdu"][-1]["interface"].append({
2475 "name": interface["name"],
2476 "ip_address": interface["ip-address"],
2477 "mac_address": interface.get("mac-address"),
2478 })
2479 vdu_delete = vdu_scaling_info.pop("vdu-delete")
2480
2481 # PRE-SCALE BEGIN
2482 step = "Executing pre-scale vnf-config-primitive"
2483 if scaling_descriptor.get("scaling-config-action"):
2484 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
2485 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
2486 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
2487 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
2488 step = db_nslcmop_update["detailed-status"] = \
2489 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
2490
2491 # look for primitive
2492 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2493 if config_primitive["name"] == vnf_config_primitive:
2494 break
2495 else:
2496 raise LcmException(
2497 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
2498 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
2499 "primitive".format(scaling_group, config_primitive))
2500
2501 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
2502 if db_vnfr.get("additionalParamsForVnf"):
2503 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
2504 scale_process = "VCA"
2505 db_nsr_update["config-status"] = "configuring pre-scaling"
2506 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
2507
2508 # Pre-scale reintent check: Check if this sub-operation has been executed before
2509 op_index = self._check_or_add_scale_suboperation(
2510 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
2511 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2512 # Skip sub-operation
2513 result = 'COMPLETED'
2514 result_detail = 'Done'
2515 self.logger.debug(logging_text +
2516 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
2517 vnf_config_primitive, result, result_detail))
2518 else:
2519 if (op_index == self.SUBOPERATION_STATUS_NEW):
2520 # New sub-operation: Get index of this sub-operation
2521 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2522 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
2523 format(vnf_config_primitive))
2524 else:
2525 # Reintent: Get registered params for this existing sub-operation
2526 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2527 vnf_index = op.get('member_vnf_index')
2528 vnf_config_primitive = op.get('primitive')
2529 primitive_params = op.get('primitive_params')
2530 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
2531 format(vnf_config_primitive))
2532 # Execute the primitive, either with new (first-time) or registered (reintent) args
2533 result, result_detail = await self._ns_execute_primitive(
2534 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
2535 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
2536 vnf_config_primitive, result, result_detail))
2537 # Update operationState = COMPLETED | FAILED
2538 self._update_suboperation_status(
2539 db_nslcmop, op_index, result, result_detail)
2540
2541 if result == "FAILED":
2542 raise LcmException(result_detail)
2543 db_nsr_update["config-status"] = old_config_status
2544 scale_process = None
2545 # PRE-SCALE END
2546
2547 # SCALE RO - BEGIN
2548 # Should this block be skipped if 'RO_nsr_id' == None ?
2549 # if (RO_nsr_id and RO_scaling_info):
2550 if RO_scaling_info:
2551 scale_process = "RO"
2552 # Scale RO reintent check: Check if this sub-operation has been executed before
2553 op_index = self._check_or_add_scale_suboperation(
2554 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
2555 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2556 # Skip sub-operation
2557 result = 'COMPLETED'
2558 result_detail = 'Done'
2559 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
2560 result, result_detail))
2561 else:
2562 if (op_index == self.SUBOPERATION_STATUS_NEW):
2563 # New sub-operation: Get index of this sub-operation
2564 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2565 self.logger.debug(logging_text + "New sub-operation RO")
2566 else:
2567 # Reintent: Get registered params for this existing sub-operation
2568 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2569 RO_nsr_id = op.get('RO_nsr_id')
2570 RO_scaling_info = op.get('RO_scaling_info')
2571 self.logger.debug(logging_text + "Sub-operation RO reintent".format(
2572 vnf_config_primitive))
2573
2574 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
2575 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
2576 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
2577 # wait until ready
2578 RO_nslcmop_id = RO_desc["instance_action_id"]
2579 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
2580
2581 RO_task_done = False
2582 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
2583 detailed_status_old = None
2584 self.logger.debug(logging_text + step)
2585
2586 deployment_timeout = 1 * 3600 # One hour
2587 while deployment_timeout > 0:
2588 if not RO_task_done:
2589 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
2590 extra_item_id=RO_nslcmop_id)
2591 ns_status, ns_status_info = self.RO.check_action_status(desc)
2592 if ns_status == "ERROR":
2593 raise ROclient.ROClientException(ns_status_info)
2594 elif ns_status == "BUILD":
2595 detailed_status = step + "; {}".format(ns_status_info)
2596 elif ns_status == "ACTIVE":
2597 RO_task_done = True
2598 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
2599 self.logger.debug(logging_text + step)
2600 else:
2601 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2602 else:
2603 if ns_status == "ERROR":
2604 raise ROclient.ROClientException(ns_status_info)
2605 elif ns_status == "BUILD":
2606 detailed_status = step + "; {}".format(ns_status_info)
2607 elif ns_status == "ACTIVE":
2608 step = detailed_status = \
2609 "Waiting for management IP address reported by the VIM. Updating VNFRs"
2610 if not vnfr_scaled:
2611 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
2612 vnfr_scaled = True
2613 try:
2614 desc = await self.RO.show("ns", RO_nsr_id)
2615 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
2616 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
2617 break
2618 except LcmExceptionNoMgmtIP:
2619 pass
2620 else:
2621 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
2622 if detailed_status != detailed_status_old:
2623 self._update_suboperation_status(
2624 db_nslcmop, op_index, 'COMPLETED', detailed_status)
2625 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
2626 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2627
2628 await asyncio.sleep(5, loop=self.loop)
2629 deployment_timeout -= 5
2630 if deployment_timeout <= 0:
2631 self._update_suboperation_status(
2632 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
2633 raise ROclient.ROClientException("Timeout waiting ns to be ready")
2634
2635 # update VDU_SCALING_INFO with the obtained ip_addresses
2636 if vdu_scaling_info["scaling_direction"] == "OUT":
2637 for vdur in reversed(db_vnfr["vdur"]):
2638 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
2639 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
2640 vdu_scaling_info["vdu"].append({
2641 "name": vdur["name"],
2642 "vdu_id": vdur["vdu-id-ref"],
2643 "interface": []
2644 })
2645 for interface in vdur["interfaces"]:
2646 vdu_scaling_info["vdu"][-1]["interface"].append({
2647 "name": interface["name"],
2648 "ip_address": interface["ip-address"],
2649 "mac_address": interface.get("mac-address"),
2650 })
2651 del vdu_scaling_info["vdu-create"]
2652
2653 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
2654 # SCALE RO - END
2655
2656 scale_process = None
2657 if db_nsr_update:
2658 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2659
2660 # POST-SCALE BEGIN
2661 # execute primitive service POST-SCALING
2662 step = "Executing post-scale vnf-config-primitive"
2663 if scaling_descriptor.get("scaling-config-action"):
2664 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
2665 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
2666 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
2667 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
2668 step = db_nslcmop_update["detailed-status"] = \
2669 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
2670
2671 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
2672 if db_vnfr.get("additionalParamsForVnf"):
2673 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
2674
2675 # look for primitive
2676 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
2677 if config_primitive["name"] == vnf_config_primitive:
2678 break
2679 else:
2680 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
2681 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
2682 "match any vnf-configuration:config-primitive".format(scaling_group,
2683 config_primitive))
2684 scale_process = "VCA"
2685 db_nsr_update["config-status"] = "configuring post-scaling"
2686 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
2687
2688 # Post-scale reintent check: Check if this sub-operation has been executed before
2689 op_index = self._check_or_add_scale_suboperation(
2690 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
2691 if (op_index == self.SUBOPERATION_STATUS_SKIP):
2692 # Skip sub-operation
2693 result = 'COMPLETED'
2694 result_detail = 'Done'
2695 self.logger.debug(logging_text +
2696 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
2697 format(vnf_config_primitive, result, result_detail))
2698 else:
2699 if (op_index == self.SUBOPERATION_STATUS_NEW):
2700 # New sub-operation: Get index of this sub-operation
2701 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
2702 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
2703 format(vnf_config_primitive))
2704 else:
2705 # Reintent: Get registered params for this existing sub-operation
2706 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
2707 vnf_index = op.get('member_vnf_index')
2708 vnf_config_primitive = op.get('primitive')
2709 primitive_params = op.get('primitive_params')
2710 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation reintent".
2711 format(vnf_config_primitive))
2712 # Execute the primitive, either with new (first-time) or registered (reintent) args
2713 result, result_detail = await self._ns_execute_primitive(
2714 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive, primitive_params)
2715 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
2716 vnf_config_primitive, result, result_detail))
2717 # Update operationState = COMPLETED | FAILED
2718 self._update_suboperation_status(
2719 db_nslcmop, op_index, result, result_detail)
2720
2721 if result == "FAILED":
2722 raise LcmException(result_detail)
2723 db_nsr_update["config-status"] = old_config_status
2724 scale_process = None
2725 # POST-SCALE END
2726
2727 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
2728 db_nslcmop_update["statusEnteredTime"] = time()
2729 db_nslcmop_update["detailed-status"] = "done"
2730 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
2731 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
2732 else old_operational_status
2733 db_nsr_update["config-status"] = old_config_status
2734 return
2735 except (ROclient.ROClientException, DbException, LcmException) as e:
2736 self.logger.error(logging_text + "Exit Exception {}".format(e))
2737 exc = e
2738 except asyncio.CancelledError:
2739 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
2740 exc = "Operation was cancelled"
2741 except Exception as e:
2742 exc = traceback.format_exc()
2743 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
2744 finally:
2745 if exc:
2746 if db_nslcmop:
2747 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
2748 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
2749 db_nslcmop_update["statusEnteredTime"] = time()
2750 if db_nsr:
2751 db_nsr_update["operational-status"] = old_operational_status
2752 db_nsr_update["config-status"] = old_config_status
2753 db_nsr_update["detailed-status"] = ""
2754 db_nsr_update["_admin.nslcmop"] = None
2755 if scale_process:
2756 if "VCA" in scale_process:
2757 db_nsr_update["config-status"] = "failed"
2758 if "RO" in scale_process:
2759 db_nsr_update["operational-status"] = "failed"
2760 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
2761 exc)
2762 try:
2763 if db_nslcmop and db_nslcmop_update:
2764 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
2765 if db_nsr:
2766 db_nsr_update["_admin.nslcmop"] = None
2767 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2768 except DbException as e:
2769 self.logger.error(logging_text + "Cannot update database: {}".format(e))
2770 if nslcmop_operation_state:
2771 try:
2772 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2773 "operationState": nslcmop_operation_state},
2774 loop=self.loop)
2775 # if cooldown_time:
2776 # await asyncio.sleep(cooldown_time)
2777 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
2778 except Exception as e:
2779 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2780 self.logger.debug(logging_text + "Exit")
2781 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")