bug 650 adding WIM account instantiation parameter.
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import functools
24 import traceback
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 import ROclient
28 from lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase
29
30 from osm_common.dbbase import DbException
31 from osm_common.fsbase import FsException
32 from n2vc.vnf import N2VC, N2VCPrimitiveExecutionFailed
33
34 from copy import copy, deepcopy
35 from http import HTTPStatus
36 from time import time
37 from uuid import uuid4
38
39 __author__ = "Alfonso Tierno"
40
41
def get_iterable(in_dict, in_key):
    """
    Similar to <dict>.get(), but any missing or falsy value (None, False, empty list, ...) is replaced by
    an empty tuple, so that the result can always be iterated
    :param in_dict: a dictionary
    :param in_key: the key to look for at in_dict
    :return: in_dict[in_key] or () if it is None, falsy or not present
    """
    found = in_dict.get(in_key)
    return found if found else ()
52
53
def populate_dict(target_dict, key_list, value):
    """
    Update target_dict creating nested dictionaries with the key_list. Last key_list item is assigned the value.
    Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
    An intermediate key that is present but holds None is handled as if it were missing, that is, it is
    replaced by a new dictionary instead of failing when descending into it.
    :param target_dict: dictionary to be changed
    :param key_list: non-empty list (or tuple) of keys to insert at target_dict
    :param value: value to assign at the innermost key
    :return: None
    """
    for key in key_list[:-1]:
        # a None placeholder cannot be descended into; replace it as if the key were absent
        if key not in target_dict or target_dict[key] is None:
            target_dict[key] = {}
        target_dict = target_dict[key]
    target_dict[key_list[-1]] = value
68
69
70 class NsLcm(LcmBase):
71 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
72 total_deploy_timeout = 30 * 60 # global timeout for deployment
73 timeout_charm_delete = 5 * 60
74 timeout_primitive = 5 * 60 # timeout for primitive execution
75
def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
    """
    Init. Stores the received handlers and creates the N2VC connector
    :param db: database handler, passed to the base class
    :param msg: messaging handler, passed to the base class
    :param fs: filesystem storage handler, passed to the base class
    :param lcm_tasks: object stored at self.lcm_tasks
    :param ro_config: stored at self.ro_config
    :param vca_config: dictionary that must contain 'host', 'port', 'user' and 'secret', used to create N2VC
    :param loop: stored at self.loop
    :return: None
    """
    # logging
    logger = logging.getLogger('lcm.ns')
    self.logger = logger
    self.loop = loop
    self.lcm_tasks = lcm_tasks

    super().__init__(db, msg, fs, logger)

    self.ro_config = ro_config

    n2vc_settings = {
        "log": logger,
        "server": vca_config['host'],
        "port": vca_config['port'],
        "user": vca_config['user'],
        "secret": vca_config['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=vca_config[''],
        "artifacts": None,
    }
    self.n2vc = N2VC(**n2vc_settings)
103
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd. The cloud-init (or the content of the
    cloud-init-file) of each vdu is rendered with Jinja2 using the provided instantiation parameters
    :param vnfd: input vnfd. It is not modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided
    :param nsrId: Id of the NSR (not used in this method)
    :return: copy of vnfd, adapted for RO. LcmException is raised on errors
    """
    try:
        vnfd_RO = deepcopy(vnfd)
        # remove unused by RO configuration, monitoring, scaling and internal keys
        for unused_key in ("_id", "_admin", "vnf-configuration", "monitoring-param", "scaling-group-descriptor"):
            vnfd_RO.pop(unused_key, None)
        if new_id:
            vnfd_RO["id"] = new_id

        # parse cloud-init or cloud-init-file with the provided variables using Jinja2
        for vdu in vnfd_RO.get("vdu") or ():
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                storage = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    cloud_init_content = ci_file.read()
                    vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                cloud_init_content = vdu["cloud-init"]
            else:
                # nothing to render for this vdu
                continue

            # every variable used at the template must come in the instantiation parameters
            parsed_template = Environment().parse(cloud_init_content)
            for var in meta.find_undeclared_variables(parsed_template):
                if not additionalParams or var not in additionalParams.keys():
                    raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                       "file, must be provided in the instantiation parameters inside the "
                                       "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
            vdu["cloud-init"] = Template(cloud_init_content).render(additionalParams or {})

        return vnfd_RO
    except FsException as e:
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e))
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e))
159
def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None):
    """
    Callback both for charm status change and task completion
    :param model_name: Charm model name
    :param application_name: Charm application name
    :param status: Can be
        - blocked: The unit needs manual intervention
        - maintenance: The unit is actively deploying/configuring
        - waiting: The unit is waiting for another charm to be ready
        - active: The unit is deployed, configured, and ready
        - error: The charm has failed and needs attention.
        - terminated: The charm has been destroyed
        - removing,
        - removed
    :param message: detailed message error
    :param n2vc_info: dictionary with information shared with instantiate task. It contains:
        nsr_id:
        nslcmop_id:
        lcmOperationType: currently "instantiate"
        deployed: list of VCA entries, each one a dictionary with "model", "application",
            "operational-status" and "detailed-status" (entries can be None)
        db_update: dictionary to be filled with the changes to be written to database with format key.key.key: value
        n2vc_event: event used to notify instantiation task that some change has been produced
    :param task: None for charm status change, or task for completion task callback
    :return: None. Exceptions are never propagated, they are logged as critical
    """
    # fallback text, so that the except clause can always log, even if n2vc_info is malformed
    logging_text = "Task [n2vc_callback] application={}".format(application_name)
    try:
        nsr_id = n2vc_info["nsr_id"]
        deployed = n2vc_info["deployed"]
        db_nsr_update = n2vc_info["db_update"]
        nslcmop_id = n2vc_info["nslcmop_id"]
        ns_operation = n2vc_info["lcmOperationType"]
        n2vc_event = n2vc_info["n2vc_event"]
        logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id,
                                                                                application_name)
        # look for the entry of this application
        for vca_index, vca_deployed in enumerate(deployed):
            if not vca_deployed:
                continue
            if model_name == vca_deployed["model"] and application_name == vca_deployed["application"]:
                break
        else:
            self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA")
            return

        # new_state is (operational-status, detailed-status) or None when there is nothing to store
        new_state = None
        if task:
            if task.cancelled():
                self.logger.debug(logging_text + " task Cancelled")
                new_state = ("error", "Task Cancelled")
            elif task.done():
                exc = task.exception()
                if not exc:
                    self.logger.debug(logging_text + " task Done")
                    # task is Done, but callback is still ongoing. So ignore
                    return
                self.logger.error(logging_text + " task Exception={}".format(exc))
                new_state = ("error", str(exc))
        elif status:
            self.logger.debug(logging_text + " Enter status={} message={}".format(status, message))
            if vca_deployed['operational-status'] == status:
                return  # same status, ignore
            new_state = (status, str(message))
        else:
            # no exception is being handled here, so no traceback is requested
            self.logger.critical(logging_text + " Enter with bad parameters")
            return

        if new_state:
            operational_status, detailed_status = new_state
            vca_deployed['operational-status'] = operational_status
            db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = operational_status
            vca_deployed['detailed-status'] = detailed_status
            db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = detailed_status
        # wake up instantiate task
        n2vc_event.set()
    except Exception as e:
        self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True)
237
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params
    :param nsd: ns descriptor. Its "constituent-vnfd" list is used to map each member-vnf-index to its vnfd
    :param vnfd_dict: dictionary with the vnfd contents, indexed by the vnfd id referenced at
        nsd:constituent-vnfd:vnfd-id-ref
    :param n2vc_key_list: list of keys to insert as "mgmt_keys" at the VDUs that need to be accessed for
        configuration. Nothing is inserted if it is empty or None
    :return: The RO ns descriptor, or None if ns_params is empty. LcmException is raised if some parameter
        references an element that is not present at the descriptors or a VIM/WIM is not ENABLED
    """
    vim_2_RO = {}  # cache of vim_account id to RO id, to avoid repeated database queries
    wim_2_RO = {}  # cache of wim_account id to RO id
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def wim_account_2_RO(wim_account):
        # only string values are looked up at database; any other value (None, boolean) is passed as it is
        if not isinstance(wim_account, str):
            return wim_account
        if wim_account in wim_2_RO:
            return wim_2_RO[wim_account]

        db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
        if db_wim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("WIM={} is not available. operationalState={}".format(
                wim_account, db_wim["_admin"]["operationalState"]))
        RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
        wim_2_RO[wim_account] = RO_wim_id
        return RO_wim_id

    def ip_profile_2_RO(ip_profile):
        RO_ip_profile = deepcopy(ip_profile)
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
        # "scenario": ns_params["nsdId"],
    }
    if n2vc_key_list:
        for vnfd_ref, vnfd in vnfd_dict.items():
            vdu_needed_access = []
            mgmt_cp = None
            if vnfd.get("vnf-configuration"):
                if vnfd.get("mgmt-interface"):
                    if vnfd["mgmt-interface"].get("vdu-id"):
                        vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                    elif vnfd["mgmt-interface"].get("cp"):
                        mgmt_cp = vnfd["mgmt-interface"]["cp"]

            for vdu in vnfd.get("vdu", ()):
                if vdu.get("vdu-configuration"):
                    vdu_needed_access.append(vdu["id"])
                elif mgmt_cp:
                    # a vdu without interfaces is tolerated; it simply cannot hold the management cp
                    for vdu_interface in get_iterable(vdu, "interface"):
                        if vdu_interface.get("external-connection-point-ref") == mgmt_cp:
                            vdu_needed_access.append(vdu["id"])
                            mgmt_cp = None
                            break

            if vdu_needed_access:
                for vnf_member in get_iterable(nsd, "constituent-vnfd"):
                    if vnf_member["vnfd-id-ref"] != vnfd_ref:
                        continue
                    for vdu in vdu_needed_access:
                        populate_dict(RO_ns_params,
                                      ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                      n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    for vnf_params in get_iterable(ns_params, "vnf"):
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    # no vdu of the vnfd contains an interface attached to this internal connection point
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))

        if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
                          wim_account_2_RO(vld_params["wimAccountId"]))

        # both parameters fill the same "sites" entry. vim-network-id is processed last, so it is the one
        # that remains when both are provided
        for vim_net_key in ("vim-network-name", "vim-network-id"):
            vim_net_param = vld_params.get(vim_net_key)
            if not vim_net_param:
                continue
            RO_vld_sites = []
            if isinstance(vim_net_param, dict):
                for vim_account, vim_net in vim_net_param.items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vim_net_param})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)

        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # vdu_descriptor and interface_descriptor keep the matching elements found above
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
488
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr, and stores the resulting "vdur" list at database
    :param db_vnfr: content of the vnfr. Its "vdur" list is modified
    :param vdu_create: dictionary with vdu-id-ref: number of vdur to add. It is not modified. Can be None
    :param vdu_delete: dictionary with vdu-id-ref: number of vdur to remove. It is not modified. Can be None
    :return: None. LcmException is raised if a requested vdu-id-ref cannot be found among the existing vdur
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # go backwards, so that insertions and deletions do not alter the position of the pending entries
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            # every new vdur is a copy of the previous one with the next count-index
            new_vdur = vdur
            for index in range(0, vdu_create[vdu_id_ref]):
                new_vdur = deepcopy(new_vdur)
                new_vdur["_id"] = str(uuid4())
                new_vdur["count-index"] += 1
                vdurs.insert(vdu_index + 1 + index, new_vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Each pending dictionary is reported with its own error
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
527
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Fills every vld of the nsr with the information that RO reports for its network
    :param ns_update_nsr: dictionary to be filled with the updated info, one "vld.<index>" key per vld
    :param db_nsr: content of db_nsr. Its vld entries are also modified
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised if a vld is not found among the RO nets
    """
    for vld_index, vld in enumerate(db_nsr.get("vld") or ()):
        # the RO net belonging to this vld is the one whose ns_net_osm_id is the vld id
        net_RO = next((ro_net for ro_net in (nsr_desc_RO.get("nets") or ())
                       if ro_net.get("ns_net_osm_id") == vld["id"]), None)
        if net_RO is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
        vld.update({
            "vim-id": net_RO.get("vim_net_id"),
            "name": net_RO.get("vim_name"),
            "status": net_RO.get("status"),
            "status-detailed": net_RO.get("error_msg"),
        })
        ns_update_nsr["vld.{}".format(vld_index)] = vld
549
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Copies into every vnfr the information reported by RO (management ip address, and vim id, name, status
    and addresses of each vdur, interface and vld) and stores the changes at database
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content. The vnfr contents are also updated
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors (LcmExceptionNoMgmtIP if a vnf has no ip address)
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        vnf_RO = next((ro_vnf for ro_vnf in nsr_desc_RO["vnfs"] if ro_vnf["member_vnf_index"] == vnf_index),
                      None)
        if vnf_RO is None:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(vnf_index))

        vnfr_update = {}
        if vnf_RO.get("ip_address"):
            mgmt_ip = vnf_RO["ip_address"]
            db_vnfr["ip-address"] = mgmt_ip
            vnfr_update["ip-address"] = mgmt_ip
        elif not db_vnfr.get("ip-address"):
            raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

        for vdu_index, vdur in enumerate(db_vnfr.get("vdur") or ()):
            if vdur.get("pdu-type"):
                continue
            # number of RO vms of this same vdu skipped so far; the wanted one is at position count-index
            same_vdu_skipped = 0
            for vdur_RO in vnf_RO.get("vms") or ():
                if vdur_RO["vdu_osm_id"] != vdur["vdu-id-ref"]:
                    continue
                if same_vdu_skipped != vdur["count-index"]:
                    same_vdu_skipped += 1
                    continue
                vdur.update({
                    "vim-id": vdur_RO.get("vim_vm_id"),
                    "ip-address": vdur_RO.get("ip_address"),
                    "vdu-id-ref": vdur_RO.get("vdu_osm_id"),
                    "name": vdur_RO.get("vim_name"),
                    "status": vdur_RO.get("status"),
                    "status-detailed": vdur_RO.get("error_msg"),
                })
                for ifacer in vdur.get("interfaces") or ():
                    interface_RO = next((ro_iface for ro_iface in (vdur_RO.get("interfaces") or ())
                                         if ifacer["name"] == ro_iface.get("internal_name")), None)
                    if interface_RO is None:
                        raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                           "at RO info".format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    ifacer["ip-address"] = interface_RO.get("ip_address")
                    ifacer["mac-address"] = interface_RO.get("mac_address")
                vnfr_update["vdur.{}".format(vdu_index)] = vdur
                break
            else:
                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} at "
                                   "RO info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

        for vld_index, vld in enumerate(db_vnfr.get("vld") or ()):
            net_RO = next((ro_net for ro_net in (nsr_desc_RO.get("nets") or ())
                           if ro_net.get("vnf_net_osm_id") == vld["id"]), None)
            if net_RO is None:
                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} at RO info".format(
                    vnf_index, vld["id"]))
            vld.update({
                "vim-id": net_RO.get("vim_net_id"),
                "name": net_RO.get("vim_name"),
                "status": net_RO.get("status"),
                "status-detailed": net_RO.get("error_msg"),
            })
            vnfr_update["vld.{}".format(vld_index)] = vld

        self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
617
618 async def instantiate(self, nsr_id, nslcmop_id):
619 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
620 self.logger.debug(logging_text + "Enter")
621 # get all needed from database
622 start_deploy = time()
623 db_nsr = None
624 db_nslcmop = None
625 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
626 db_nslcmop_update = {}
627 nslcmop_operation_state = None
628 db_vnfrs = {}
629 RO_descriptor_number = 0 # number of descriptors created at RO
630 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
631 n2vc_info = {}
632 exc = None
633 try:
634 step = "Getting nslcmop={} from db".format(nslcmop_id)
635 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
636 step = "Getting nsr={} from db".format(nsr_id)
637 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
638 ns_params = db_nslcmop.get("operationParams")
639 nsd = db_nsr["nsd"]
640 nsr_name = db_nsr["name"] # TODO short-name??
641
642 # look if previous tasks in process
643 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
644 if task_dependency:
645 step = db_nslcmop_update["detailed-status"] = \
646 "Waiting for related tasks to be completed: {}".format(task_name)
647 self.logger.debug(logging_text + step)
648 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
649 _, pending = await asyncio.wait(task_dependency, timeout=3600)
650 if pending:
651 raise LcmException("Timeout waiting related tasks to be completed")
652
653 step = "Getting vnfrs from db"
654 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
655 db_vnfds_ref = {}
656 db_vnfds = {}
657 for vnfr in db_vnfrs_list:
658 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
659 vnfd_id = vnfr["vnfd-id"]
660 vnfd_ref = vnfr["vnfd-ref"]
661 if vnfd_id not in db_vnfds:
662 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
663 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
664 db_vnfds_ref[vnfd_ref] = vnfd
665 db_vnfds[vnfd_id] = vnfd
666
667 # Get or generates the _admin.deployed,VCA list
668 vca_deployed_list = None
669 if db_nsr["_admin"].get("deployed"):
670 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
671 if vca_deployed_list is None:
672 vca_deployed_list = []
673 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
674 elif isinstance(vca_deployed_list, dict):
675 # maintain backward compatibility. Change a dict to list at database
676 vca_deployed_list = list(vca_deployed_list.values())
677 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
678
679 db_nsr_update["detailed-status"] = "creating"
680 db_nsr_update["operational-status"] = "init"
681 if not db_nsr["_admin"].get("deployed") or not db_nsr["_admin"]["deployed"].get("RO") or \
682 not db_nsr["_admin"]["deployed"]["RO"].get("vnfd"):
683 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
684 db_nsr_update["_admin.deployed.RO.vnfd"] = []
685
686 RO = ROclient.ROClient(self.loop, **self.ro_config)
687
688 # set state to INSTANTIATED. When instantiated NBI will not delete directly
689 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
690 self.update_db_2("nsrs", nsr_id, db_nsr_update)
691
692 # get vnfds, instantiate at RO
693 for c_vnf in nsd.get("constituent-vnfd", ()):
694 member_vnf_index = c_vnf["member-vnf-index"]
695 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
696 vnfd_ref = vnfd["id"]
697 step = db_nsr_update["detailed-status"] = "Creating vnfd='{}' member-vnf-index='{}' at RO".format(
698 vnfd_ref, member_vnf_index)
699 # self.logger.debug(logging_text + step)
700 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
701 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
702 RO_descriptor_number += 1
703
704 # look position at deployed.RO.vnfd if not present it will be appended at the end
705 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
706 if vnf_deployed["member-vnf-index"] == member_vnf_index:
707 break
708 else:
709 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
710 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
711
712 # look if present
713 RO_update = {"member-vnf-index": member_vnf_index}
714 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
715 if vnfd_list:
716 RO_update["id"] = vnfd_list[0]["uuid"]
717 self.logger.debug(logging_text + "vnfd='{}' member-vnf-index='{}' exists at RO. Using RO_id={}".
718 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
719 else:
720 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
721 get("additionalParamsForVnf"), nsr_id)
722 desc = await RO.create("vnfd", descriptor=vnfd_RO)
723 RO_update["id"] = desc["uuid"]
724 self.logger.debug(logging_text + "vnfd='{}' member-vnf-index='{}' created at RO. RO_id={}".format(
725 vnfd_ref, member_vnf_index, desc["uuid"]))
726 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
727 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
728 self.update_db_2("nsrs", nsr_id, db_nsr_update)
729
730 # create nsd at RO
731 nsd_ref = nsd["id"]
732 step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
733 # self.logger.debug(logging_text + step)
734
735 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
736 RO_descriptor_number += 1
737 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
738 if nsd_list:
739 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
740 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
741 nsd_ref, RO_nsd_uuid))
742 else:
743 nsd_RO = deepcopy(nsd)
744 nsd_RO["id"] = RO_osm_nsd_id
745 nsd_RO.pop("_id", None)
746 nsd_RO.pop("_admin", None)
747 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
748 member_vnf_index = c_vnf["member-vnf-index"]
749 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
750 for c_vld in nsd_RO.get("vld", ()):
751 for cp in c_vld.get("vnfd-connection-point-ref", ()):
752 member_vnf_index = cp["member-vnf-index-ref"]
753 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
754
755 desc = await RO.create("nsd", descriptor=nsd_RO)
756 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
757 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
758 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
759 self.update_db_2("nsrs", nsr_id, db_nsr_update)
760
761 # Crate ns at RO
762 # if present use it unless in error status
763 RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
764 if RO_nsr_id:
765 try:
766 step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
767 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
768 desc = await RO.show("ns", RO_nsr_id)
769 except ROclient.ROClientException as e:
770 if e.http_code != HTTPStatus.NOT_FOUND:
771 raise
772 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
773 if RO_nsr_id:
774 ns_status, ns_status_info = RO.check_ns_status(desc)
775 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
776 if ns_status == "ERROR":
777 step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
778 self.logger.debug(logging_text + step)
779 await RO.delete("ns", RO_nsr_id)
780 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
781 if not RO_nsr_id:
782 step = db_nsr_update["detailed-status"] = "Checking dependencies"
783 # self.logger.debug(logging_text + step)
784
785 # check if there are tasks in progress related to the VIM account (e.g. it is being created) and wait for them
786 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
787 if task_dependency:
788 step = "Waiting for related tasks to be completed: {}".format(task_name)
789 self.logger.debug(logging_text + step)
790 await asyncio.wait(task_dependency, timeout=3600)
791 if ns_params.get("vnf"):
792 for vnf in ns_params["vnf"]:
793 if "vimAccountId" in vnf:
794 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
795 vnf["vimAccountId"])
796 if task_dependency:
797 step = "Waiting for related tasks to be completed: {}".format(task_name)
798 self.logger.debug(logging_text + step)
799 await asyncio.wait(task_dependency, timeout=3600)
800
801 step = db_nsr_update["detailed-status"] = "Checking instantiation parameters"
802
803 # feature 1429. Add n2vc public key to needed VMs
804 n2vc_key = await self.n2vc.GetPublicKey()
805 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, [n2vc_key])
806
807 step = db_nsr_update["detailed-status"] = "Creating ns at RO"
808 desc = await RO.create("ns", descriptor=RO_ns_params,
809 name=db_nsr["name"],
810 scenario=RO_nsd_uuid)
811 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
812 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
813 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
814 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
815 self.update_db_2("nsrs", nsr_id, db_nsr_update)
816
817 # wait until NS is ready
818 step = ns_status_detailed = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
819 detailed_status_old = None
820 self.logger.debug(logging_text + step)
821
822 while time() <= start_deploy + self.total_deploy_timeout:
823 desc = await RO.show("ns", RO_nsr_id)
824 ns_status, ns_status_info = RO.check_ns_status(desc)
825 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
826 if ns_status == "ERROR":
827 raise ROclient.ROClientException(ns_status_info)
828 elif ns_status == "BUILD":
829 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
830 elif ns_status == "ACTIVE":
831 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
832 try:
833 self.ns_update_vnfr(db_vnfrs, desc)
834 break
835 except LcmExceptionNoMgmtIP:
836 pass
837 else:
838 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
839 if detailed_status != detailed_status_old:
840 detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
841 self.update_db_2("nsrs", nsr_id, db_nsr_update)
842 await asyncio.sleep(5, loop=self.loop)
843 else: # total_deploy_timeout
844 raise ROclient.ROClientException("Timeout waiting ns to be ready")
845
846 step = "Updating NSR"
847 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
848
849 db_nsr["detailed-status"] = "Configuring vnfr"
850 self.update_db_2("nsrs", nsr_id, db_nsr_update)
851
852 # The parameters we'll need to deploy a charm
853 number_to_configure = 0
854
def deploy_charm(vnf_index, vdu_id, vdu_name, vdu_count_index, charm_params, n2vc_info):
    """
    Inner function that schedules the deployment of the proxy charm of a vnf or of one of its vdus.
    It relies on names of the enclosing scope (vnfd, vnfd_id, proxy_charm, nsr_name, vca_deployed_list,
    db_nsr_update, nsr_id, nslcmop_id, self).
    :param vnf_index: member vnf index. Mandatory
    :param vdu_id: None for a vnf configuration, or the vdu id for a vdu configuration
    :param vdu_name: vdu name annotated at the VCA entry
    :param vdu_count_index: vdu count index annotated at the VCA entry
    :param charm_params: runtime parameters for the charm. "rw_mgmt_ip" must contain a non empty value
    :param n2vc_info: dictionary used as callback parameter. It is filled in here when it comes empty
    :return: None. The deployment task is registered at self.lcm_tasks
    """
    if not charm_params["rw_mgmt_ip"]:
        raise LcmException("vnfd/vdu has not management ip address to configure it")

    # Note: the charm needs to exist on disk at the location specified by charm_path
    package_storage = vnfd["_admin"]["storage"]
    fs_params = self.fs.get_params()
    charm_path = "{}{}/{}/charms/{}".format(fs_params["path"], package_storage["folder"],
                                            package_storage["pkg-dir"], proxy_charm)

    # ns_name will be ignored in the current version of N2VC
    # but will be implemented for the next point release.
    model_name = "default"  # TODO bug 585 nsr_id
    vdu_id_text = vdu_id + "-" if vdu_id else "-"
    formatted_name = self.n2vc.FormatApplicationName(nsr_name, vnf_index, vdu_id_text)

    # truncate the name and add a two-letter index at the end to ensure that it is unique. It is assumed no more
    # than 26*26 charms in the same NS
    vca_index = len(vca_deployed_list)
    unique_suffix = chr(97 + vca_index // 26) + chr(97 + vca_index % 26)
    application_name = formatted_name[0:48] + unique_suffix

    vca_entry = {
        "member-vnf-index": vnf_index,
        "vdu_id": vdu_id,
        "model": model_name,
        "application": application_name,
        "operational-status": "init",
        "detailed-status": "",
        "vnfd_id": vnfd_id,
        "vdu_name": vdu_name,
        "vdu_count_index": vdu_count_index,
    }
    vca_deployed_list.append(vca_entry)
    db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = vca_entry
    self.update_db_2("nsrs", nsr_id, db_nsr_update)

    self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
                                                                                  proxy_charm))
    if not n2vc_info:
        # first charm of this operation: prepare the information shared with the n2vc callback
        n2vc_info.update({
            "nsr_id": nsr_id,
            "nslcmop_id": nslcmop_id,
            "n2vc_event": asyncio.Event(loop=self.loop),
            "lcmOperationType": "instantiate",
            "deployed": vca_deployed_list,
            "db_update": db_nsr_update,
        })

    deploy_coroutine = self.n2vc.DeployCharms(
        model_name,           # The network service name
        application_name,     # The application name
        vnfd,                 # The vnf descriptor
        charm_path,           # Path to charm
        charm_params,         # Runtime params, like mgmt ip
        {},                   # for native charms only
        self.n2vc_callback,   # Callback for status changes
        n2vc_info,            # Callback parameter
        None,                 # Callback parameter (task)
    )
    deploy_task = asyncio.ensure_future(deploy_coroutine)
    on_done = functools.partial(self.n2vc_callback, model_name, application_name, None, None, n2vc_info)
    deploy_task.add_done_callback(on_done)
    self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + application_name, deploy_task)
935
936 step = "Looking for needed vnfd to configure"
937 self.logger.debug(logging_text + step)
938
939 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
940 vnfd_id = c_vnf["vnfd-id-ref"]
941 vnf_index = str(c_vnf["member-vnf-index"])
942 vnfd = db_vnfds_ref[vnfd_id]
943
944 # Get additional parameters
945 vnfr_params = {}
946 if db_vnfrs[vnf_index].get("additionalParamsForVnf"):
947 vnfr_params = db_vnfrs[vnf_index]["additionalParamsForVnf"].copy()
948 for k, v in vnfr_params.items():
949 if isinstance(v, str) and v.startswith("!!yaml "):
950 vnfr_params[k] = yaml.safe_load(v[7:])
951
952 # Check if this VNF has a charm configuration
953 vnf_config = vnfd.get("vnf-configuration")
954 if vnf_config and vnf_config.get("juju"):
955 proxy_charm = vnf_config["juju"]["charm"]
956
957 if proxy_charm:
958 step = "connecting to N2VC to configure vnf {}".format(vnf_index)
959 vnfr_params["rw_mgmt_ip"] = db_vnfrs[vnf_index]["ip-address"]
960 charm_params = {
961 "user_values": vnfr_params,
962 "rw_mgmt_ip": db_vnfrs[vnf_index]["ip-address"],
963 "initial-config-primitive": vnf_config.get('initial-config-primitive') or {}
964 }
965
966 # Login to the VCA. If there are multiple calls to login(),
967 # subsequent calls will be a nop and return immediately.
968 await self.n2vc.login()
969 deploy_charm(vnf_index, None, None, None, charm_params, n2vc_info)
970 number_to_configure += 1
971
972 # Deploy charms for each VDU that supports one.
973 for vdu_index, vdu in enumerate(get_iterable(vnfd, 'vdu')):
974 vdu_config = vdu.get('vdu-configuration')
975 proxy_charm = None
976
977 if vdu_config and vdu_config.get("juju"):
978 proxy_charm = vdu_config["juju"]["charm"]
979
980 if proxy_charm:
981 step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index)
982 await self.n2vc.login()
983 vdur = db_vnfrs[vnf_index]["vdur"][vdu_index]
984 # TODO for the moment only first vdu_id contains a charm deployed
985 if vdur["vdu-id-ref"] != vdu["id"]:
986 raise LcmException("Mismatch vdur {}, vdu {} at index {} for vnf {}"
987 .format(vdur["vdu-id-ref"], vdu["id"], vdu_index, vnf_index))
988 vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
989 charm_params = {
990 "user_values": vnfr_params,
991 "rw_mgmt_ip": vdur["ip-address"],
992 "initial-config-primitive": vdu_config.get('initial-config-primitive') or {}
993 }
994 deploy_charm(vnf_index, vdu["id"], vdur.get("name"), vdur["count-index"],
995 charm_params, n2vc_info)
996 number_to_configure += 1
997
998 db_nsr_update["operational-status"] = "running"
999 configuration_failed = False
1000 if number_to_configure:
1001 old_status = "configuring: init: {}".format(number_to_configure)
1002 db_nsr_update["config-status"] = old_status
1003 db_nsr_update["detailed-status"] = old_status
1004 db_nslcmop_update["detailed-status"] = old_status
1005
1006 # wait until all are configured.
1007 while time() <= start_deploy + self.total_deploy_timeout:
1008 if db_nsr_update:
1009 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1010 if db_nslcmop_update:
1011 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1012 # TODO add a fake task that set n2vc_event after some time
1013 await n2vc_info["n2vc_event"].wait()
1014 n2vc_info["n2vc_event"].clear()
1015 all_active = True
1016 status_map = {}
1017 n2vc_error_text = [] # contain text error list. If empty no one is in error status
1018 now = time()
1019 for vca_deployed in vca_deployed_list:
1020 vca_status = vca_deployed["operational-status"]
1021 if vca_status not in status_map:
1022 # Initialize it
1023 status_map[vca_status] = 0
1024 status_map[vca_status] += 1
1025
1026 if vca_status == "active":
1027 vca_deployed.pop("time_first_error", None)
1028 vca_deployed.pop("status_first_error", None)
1029 continue
1030
1031 all_active = False
1032 if vca_status in ("error", "blocked"):
1033 vca_deployed["detailed-status-error"] = vca_deployed["detailed-status"]
1034 # if not first time in this status error
1035 if not vca_deployed.get("time_first_error"):
1036 vca_deployed["time_first_error"] = now
1037 continue
1038 if vca_deployed.get("time_first_error") and \
1039 now <= vca_deployed["time_first_error"] + self.timeout_vca_on_error:
1040 n2vc_error_text.append("member_vnf_index={} vdu_id={} {}: {}"
1041 .format(vca_deployed["member-vnf-index"],
1042 vca_deployed["vdu_id"], vca_status,
1043 vca_deployed["detailed-status-error"]))
1044
1045 if all_active:
1046 break
1047 elif n2vc_error_text:
1048 db_nsr_update["config-status"] = "failed"
1049 error_text = "fail configuring " + ";".join(n2vc_error_text)
1050 db_nsr_update["detailed-status"] = error_text
1051 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1052 db_nslcmop_update["detailed-status"] = error_text
1053 db_nslcmop_update["statusEnteredTime"] = time()
1054 configuration_failed = True
1055 break
1056 else:
1057 cs = "configuring: "
1058 separator = ""
1059 for status, num in status_map.items():
1060 cs += separator + "{}: {}".format(status, num)
1061 separator = ", "
1062 if old_status != cs:
1063 db_nsr_update["config-status"] = cs
1064 db_nsr_update["detailed-status"] = cs
1065 db_nslcmop_update["detailed-status"] = cs
1066 old_status = cs
1067 else: # total_deploy_timeout
1068 raise LcmException("Timeout waiting ns to be configured")
1069
1070 if not configuration_failed:
1071 # all is done
1072 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1073 db_nslcmop_update["statusEnteredTime"] = time()
1074 db_nslcmop_update["detailed-status"] = "done"
1075 db_nsr_update["config-status"] = "configured"
1076 db_nsr_update["detailed-status"] = "done"
1077
1078 return
1079
1080 except (ROclient.ROClientException, DbException, LcmException) as e:
1081 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1082 exc = e
1083 except asyncio.CancelledError:
1084 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1085 exc = "Operation was cancelled"
1086 except Exception as e:
1087 exc = traceback.format_exc()
1088 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1089 exc_info=True)
1090 finally:
1091 if exc:
1092 if db_nsr:
1093 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1094 db_nsr_update["operational-status"] = "failed"
1095 if db_nslcmop:
1096 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1097 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1098 db_nslcmop_update["statusEnteredTime"] = time()
1099 try:
1100 if db_nsr:
1101 db_nsr_update["_admin.nslcmop"] = None
1102 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1103 if db_nslcmop_update:
1104 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1105 except DbException as e:
1106 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1107 if nslcmop_operation_state:
1108 try:
1109 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1110 "operationState": nslcmop_operation_state},
1111 loop=self.loop)
1112 except Exception as e:
1113 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1114
1115 self.logger.debug(logging_text + "Exit")
1116 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1117
async def _destroy_charm(self, model, application):
    """
    Ask N2VC to remove a charm application, unless it does not exist any more
    :param model: model name where the application is deployed
    :param application: application (charm) name to be removed
    :return: True if the application does not exist (nothing to remove). False if it exists and its removal has
        been ordered
    """
    application_exists = await self.n2vc.HasApplication(model, application)
    if application_exists:
        await self.n2vc.RemoveCharms(model, application)
        return False
    return True  # Already removed
1129
async def _wait_charm_destroyed(self, model, application, timeout):
    """
    Poll N2VC until the charm application does not exist any more
    :param model: model name where the application is deployed
    :param application: application (charm) name
    :param timeout: seconds to wait. The existence is checked every 10 seconds
    :return: True if the application does not exist, False if the timeout expires and it still exists
    """
    remaining = timeout
    while await self.n2vc.HasApplication(model, application):
        if remaining < 0:
            return False
        await asyncio.sleep(10)
        remaining -= 10
    return True
1145
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a network service: order the removal of its configuration charms, delete the ns, nsd and vnfds
    at RO, wait until charms are removed, and annotate the result at the "nsrs" and "nslcmops" database records.
    When an operation state is obtained, a "terminated" message is written on topic "ns".
    :param nsr_id: _id of the network service record (collection "nsrs")
    :param nslcmop_id: _id of the lifecycle operation (collection "nslcmops")
    :return: None. Errors are not propagated, they are annotated at database
    """
    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    exc = None
    failed_detail = []   # annotates all failed error messages
    vca_time_destroy = None   # time when the charm destruction was ordered
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    autoremove = False  # autoremove after terminated
    try:
        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # nsd = db_nsr["nsd"]
        # work over a copy, because the entries of removed charms are cleared below
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return
        # #TODO check if VIM is creating and wait
        # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"

        if nsr_deployed and nsr_deployed.get("VCA"):
            try:
                step = "Scheduling configuration charms removing"
                db_nsr_update["detailed-status"] = "Deleting charms"
                self.logger.debug(logging_text + step)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # for backward compatibility
                if isinstance(nsr_deployed["VCA"], dict):
                    nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
                    db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)

                for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
                    if vca_deployed:
                        if await self._destroy_charm(vca_deployed['model'], vca_deployed["application"]):
                            # charm does not exist. Clear the entry so that it is skipped when waiting below
                            vca_deployed.clear()
                            # annotate the removal at db_nsr_update, which is what is passed to update_db_2.
                            # (Writing it at db_nsr, the record read from database, was never persisted)
                            db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None
                        else:
                            vca_time_destroy = time()
            except Exception as e:
                # best effort: a failure removing charms does not prevent the deletion at RO
                self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO_fail = False
        RO = ROclient.ROClient(self.loop, **self.ro_config)

        # Delete ns
        RO_nsr_id = RO_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
            RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if RO_nsr_id:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = "Deleting ns at RO"
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self.logger.debug(logging_text + step)
                desc = await RO.delete("ns", RO_nsr_id)
                RO_delete_action = desc["action_id"]
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if RO_delete_action:
                # wait until NS is deleted from VIM
                step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
                    format(RO_nsr_id, RO_delete_action)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                delete_timeout = 20 * 60   # 20 minutes
                while delete_timeout > 0:
                    desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                         extra_item_id=RO_delete_action)
                    ns_status, ns_status_info = RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    # update database only when the reported status changes
                    if detailed_status != detailed_status_old:
                        detailed_status_old = db_nslcmop_update["detailed-status"] = \
                            db_nsr_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

        except ROclient.ROClientException as e:
            if e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
            elif e.http_code == 409:   # conflict
                failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
                RO_fail = True
            else:
                failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                self.logger.error(logging_text + failed_detail[-1])
                RO_fail = True

        # Delete nsd
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
            RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                    "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                    RO_fail = True
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])
                    RO_fail = True

        # Delete vnfds
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    RO_vnfd_id = vnf_deployed["id"]
                    step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                        "Deleting member-vnf-index={} RO_vnfd_id={} from RO".format(
                            vnf_deployed["member-vnf-index"], RO_vnfd_id)
                    await RO.delete("vnfd", RO_vnfd_id)
                    self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                        self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                    elif e.http_code == 409:   # conflict
                        failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        # wait until charm deleted
        if vca_time_destroy:
            db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = step = \
                "Waiting for deletion of configuration charms"
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
                if not vca_deployed:
                    continue
                step = "Waiting for deletion of charm application_name={}".format(vca_deployed["application"])
                # discount the time already elapsed since the last destruction order
                timeout = self.timeout_charm_delete - int(time() - vca_time_destroy)
                if not await self._wait_charm_destroyed(vca_deployed['model'], vca_deployed["application"],
                                                        timeout):
                    failed_detail.append("VCA[application_name={}] Deletion timeout".format(
                        vca_deployed["application"]))
                else:
                    # annotate at db_nsr_update (not at db_nsr) so that the removal reaches the database
                    db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None

        if failed_detail:
            self.logger.error(logging_text + " ;".join(failed_detail))
            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
            db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        else:
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()
            if db_nslcmop["operationParams"].get("autoremove"):
                autoremove = True

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
1374
@staticmethod
def _map_primitive_params(primitive_desc, params, instantiation_params):
    """
    Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
    the default-value is used. If the default-value is between < > its value is looked for at instantiation_params
    :param primitive_desc: portion of VNFD/NSD that describes primitive
    :param params: Params provided by user. None is treated as no params
    :param instantiation_params: Instantiation params provided by user. None is treated as no params
    :return: a dictionary with the calculated params. Values that are dict, list or tuple are converted to a yaml
        text; text values starting with "!!yaml " are provided without this prefix
    :raise LcmException: if a parameter is not provided and has no default-value, or its default-value references
        an instantiation param that is not provided
    """
    # tolerate None (e.g. an operation launched without params) instead of failing with TypeError
    params = params or {}
    instantiation_params = instantiation_params or {}
    calculated_params = {}
    for parameter in primitive_desc.get("parameter") or ():
        param_name = parameter["name"]
        if param_name in params:
            value = params[param_name]
        elif "default-value" in parameter:
            value = parameter["default-value"]
            if isinstance(value, str) and value.startswith("<") and value.endswith(">"):
                # default-value is a reference "<name>" to an instantiation parameter
                referenced_name = value[1:-1]
                if referenced_name not in instantiation_params:
                    raise LcmException("Parameter {} needed to execute primitive {} not provided".
                                       format(value, primitive_desc["name"]))
                value = instantiation_params[referenced_name]
        else:
            raise LcmException("Parameter {} needed to execute primitive {} not provided".
                               format(param_name, primitive_desc["name"]))

        if isinstance(value, (dict, list, tuple)):
            # structured values are provided as a yaml text in a single line
            value = yaml.safe_dump(value, default_flow_style=True, width=256)
        elif isinstance(value, str) and value.startswith("!!yaml "):
            # text already in yaml format: only the mark is removed
            value = value[7:]
        calculated_params[param_name] = value
    return calculated_params
1409
async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
                                primitive, primitive_params):
    """
    Execute a primitive over the deployed charm that matches the provided vnf/vdu and wait for its result
    :param db_deployed: content of the nsr "_admin.deployed". The charm is looked for at its "VCA" list
    :param member_vnf_index: member vnf index of the charm
    :param vdu_id: vdu id of the charm, or None for a vnf level charm
    :param vdu_name: if provided, the charm must match this vdu name
    :param vdu_count_index: if provided (not None), the charm must match this vdu count index. 0 is a valid index
    :param primitive: name of the primitive to execute
    :param primitive_params: dictionary with the params, provided as keyword arguments to N2VC
    :return: a tuple (result, detailed_result). result is "COMPLETED" or "FAILED"
    """
    start_primitive_time = time()
    try:
        # a missing "deployed" or "VCA" is handled as charm not deployed
        vca_list = db_deployed.get("VCA") if db_deployed else None
        for vca_deployed in vca_list or ():
            if not vca_deployed:
                continue
            if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
                continue
            if vdu_name and vdu_name != vca_deployed["vdu_name"]:
                continue
            # compare against None, because a truth test would ignore a requested count index 0
            if vdu_count_index is not None and vdu_count_index != vca_deployed["vdu_count_index"]:
                continue
            break
        else:
            raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
                               "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
        model_name = vca_deployed.get("model")
        application_name = vca_deployed.get("application")
        if not model_name or not application_name:
            raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
                               "model or application name" .format(member_vnf_index, vdu_id, vdu_name,
                                                                   vdu_count_index))
        if vca_deployed["operational-status"] != "active":
            raise LcmException("charm for member_vnf_index={} vdu_id={} operational_status={} not 'active'".format(
                member_vnf_index, vdu_id, vca_deployed["operational-status"]))
        callback = None  # self.n2vc_callback
        callback_args = ()  # [db_nsr, db_nslcmop, member_vnf_index, None]
        await self.n2vc.login()
        primitive_id = await self.n2vc.ExecutePrimitive(
            model_name,
            application_name,
            primitive,
            callback,
            *callback_args,
            **primitive_params
        )
        # poll the primitive status every 5 seconds until it finishes or self.timeout_primitive expires
        while time() - start_primitive_time < self.timeout_primitive:
            primitive_result_ = await self.n2vc.GetPrimitiveStatus(model_name, primitive_id)
            if primitive_result_ == "running":
                pass
            elif primitive_result_ in ("completed", "failed"):
                primitive_result = "COMPLETED" if primitive_result_ == "completed" else "FAILED"
                detailed_result = await self.n2vc.GetPrimitiveOutput(model_name, primitive_id)
                break
            else:
                detailed_result = "Invalid N2VC.GetPrimitiveStatus = {} obtained".format(primitive_result_)
                primitive_result = "FAILED"
                break
            await asyncio.sleep(5)
        else:
            raise LcmException("timeout after {} seconds".format(self.timeout_primitive))
        return primitive_result, detailed_result
    except (N2VCPrimitiveExecutionFailed, LcmException) as e:
        return "FAILED", str(e)
1465
async def action(self, nsr_id, nslcmop_id):
    """
    Execute a configuration primitive (nslcmop "action") over a VNF or over one of its VDUs.

    Reads the "nslcmops", "nsrs", "vnfrs" and "vnfds" records, locates the descriptor of the requested
    primitive at the vnfd, runs it with self._ns_execute_primitive and stores the outcome at the "nslcmops"
    record. Errors are not propagated to the caller: they are logged and, when the nslcmop record could be
    read, stored at the database as a "FAILED" operation. When a final operation state is obtained, a
    "ns"/"actioned" message is written with self.msg.aiowrite. The task is always removed from
    self.lcm_tasks at the end.
    :param nsr_id: _id of the "nsrs" database record
    :param nslcmop_id: _id of the "nslcmops" database record that describes the operation
    :return: None
    """
    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    exc = None
    try:
        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        operation_params = db_nslcmop["operationParams"]
        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = operation_params["member_vnf_index"]
        vdu_id = operation_params.get("vdu_id")
        vdu_count_index = operation_params.get("vdu_count_index")
        vdu_name = operation_params.get("vdu_name")

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

        # look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
        if task_dependency:
            step = db_nslcmop_update["detailed-status"] = \
                "Waiting for related tasks to be completed: {}".format(task_name)
            self.logger.debug(logging_text + step)
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            _, pending = await asyncio.wait(task_dependency, timeout=3600)
            if pending:
                raise LcmException("Timeout waiting related tasks to be completed")

        # for backward compatibility: "VCA" used to be stored as a dict, now it is a list
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        primitive = operation_params["primitive"]
        primitive_params = operation_params["primitive_params"]

        # look for primitive. When a vdu_id is provided, the descriptor declared at that vdu takes
        # precedence; the vnf-configuration is used only as a fallback, so that a vnf level primitive with
        # the same name cannot overwrite the vdu level one
        config_primitive_desc = None
        if vdu_id:
            for vdu in db_vnfd.get("vdu") or ():
                if vdu_id != vdu["id"]:
                    continue
                for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
                    if config_primitive["name"] == primitive:
                        config_primitive_desc = config_primitive
                        break
                if config_primitive_desc:
                    break
        if not config_primitive_desc:
            for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break
        if not config_primitive_desc:
            raise LcmException("Primitive {} not found at vnf-configuration:config-primitive or vdu:"
                               "vdu-configuration:config-primitive".format(primitive))

        vnfr_params = {}
        if db_vnfr.get("additionalParamsForVnf"):
            vnfr_params.update(db_vnfr["additionalParamsForVnf"])

        # TODO check if ns is in a proper status
        result, result_detail = await self._ns_execute_primitive(
            nsr_deployed, vnf_index, vdu_id, vdu_name, vdu_count_index, primitive,
            self._map_primitive_params(config_primitive_desc, primitive_params, vnfr_params))
        db_nslcmop_update["detailed-status"] = result_detail
        db_nslcmop_update["operationState"] = nslcmop_operation_state = result
        db_nslcmop_update["statusEnteredTime"] = time()
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        # the cancellation is recorded as a failure of the operation; it is not re-raised
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                # the operation is over: the nsr no longer points to this nslcmop
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
1575
async def scale(self, nsr_id, nslcmop_id):
    """
    Execute a scaling operation (nslcmop "scale") over the vdus of a scaling-group-descriptor of a VNF.

    Steps, as implemented below: read the "nslcmops", "nsrs", "vnfrs" and "vnfds" records; compute the vdus
    to create or delete from operationParams:scaleVnfData; run the "pre-scale-in" scaling-config-actions;
    send the "vdu-scaling" action to RO and poll until it finishes; update the vnfr; run the
    "post-scale-out" scaling-config-actions. Errors are not propagated to the caller: they are logged and
    stored at the database. When a final operation state is obtained, a "ns"/"scaled" message is written
    with self.msg.aiowrite. The task is always removed from self.lcm_tasks at the end.
    :param nsr_id: _id of the "nsrs" database record
    :param nslcmop_id: _id of the "nslcmops" database record that describes the operation
    :return: None
    """

    def _vdur_scale_entry(vdur):
        # builds the entry added to vdu_scaling_info["vdu"] for a created/deleted vdur
        return {
            "name": vdur["name"],
            "vdu_id": vdur["vdu-id-ref"],
            "interface": [
                {
                    "name": interface["name"],
                    "ip_address": interface["ip-address"],
                    "mac_address": interface.get("mac-address"),
                }
                for interface in vdur["interfaces"]
            ],
        }

    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    vnfr_scaled = False
    try:
        step = "Getting nslcmop from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]

        # look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
        if task_dependency:
            step = db_nslcmop_update["detailed-status"] = \
                "Waiting for related tasks to be completed: {}".format(task_name)
            self.logger.debug(logging_text + step)
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            _, pending = await asyncio.wait(task_dependency, timeout=3600)
            if pending:
                raise LcmException("Timeout waiting related tasks to be completed")

        step = "Parsing scaling parameters"
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")
        RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
        scale_vnf_data = db_nslcmop["operationParams"]["scaleVnfData"]
        vnf_index = scale_vnf_data["scaleByStepData"]["member-vnf-index"]
        scaling_group = scale_vnf_data["scaleByStepData"]["scaling-group-descriptor"]
        scaling_type = scale_vnf_data["scaleVnfType"]
        # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")

        # for backward compatibility: "VCA" used to be stored as a dict, now it is a list
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        step = "Getting scaling-group-descriptor"
        for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
            if scaling_descriptor["name"] == scaling_group:
                break
        else:
            raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                               "at vnfd:scaling-group-descriptor".format(scaling_group))
        # cooldown_time = 0
        # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
        #     cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
        #     if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
        #         break

        # TODO check if ns is in a proper status
        step = "Sending scale order to RO"
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
        RO_scaling_info = []
        vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
        if scaling_type == "SCALE_OUT":
            # count if max-instance-count is reached
            if "max-instance-count" in scaling_descriptor and scaling_descriptor["max-instance-count"] is not None:
                max_instance_count = int(scaling_descriptor["max-instance-count"])
                if nb_scale_op >= max_instance_count:
                    raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
                                       " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op = nb_scale_op + 1
            vdu_scaling_info["scaling_direction"] = "OUT"
            vdu_scaling_info["vdu-create"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "create", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
        elif scaling_type == "SCALE_IN":
            # count if min-instance-count is reached
            min_instance_count = 0
            if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
                min_instance_count = int(scaling_descriptor["min-instance-count"])
            if nb_scale_op <= min_instance_count:
                raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op = nb_scale_op - 1
            vdu_scaling_info["scaling_direction"] = "IN"
            vdu_scaling_info["vdu-delete"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "delete", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
        else:
            # without this check an unknown type ends in a KeyError at "scaling_direction" below
            raise LcmException("Invalid input parameter 'scaleVnfData':'scaleVnfType':'{}'. Expected 'SCALE_OUT' "
                               "or 'SCALE_IN'".format(scaling_type))

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_create = vdu_scaling_info.get("vdu-create")
        # work over a copy: the counters are decremented while the vdurs to delete are selected, but
        # the original counters are the ones later provided to self.scale_vnfr
        vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
        if vdu_scaling_info["scaling_direction"] == "IN":
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    vdu_scaling_info["vdu"].append(_vdur_scale_entry(vdur))
            vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # execute primitive service PRE-SCALING
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "pre-scale-in" \
                        and scaling_type == "SCALE_IN":
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the name looked for; the loop variable is unrelated (or even unbound)
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive))

                    vnfr_params = {"<VDU_SCALE_INFO>": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    result, result_detail = await self._ns_execute_primitive(
                        nsr_deployed, vnf_index, None, None, None, vnf_config_primitive,
                        self._map_primitive_params(config_primitive, {}, vnfr_params))
                    self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                        vnf_config_primitive, result, result_detail))
                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None

        if RO_scaling_info:
            scale_process = "RO"
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
            db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
            db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
            # wait until ready
            RO_nslcmop_id = RO_desc["instance_action_id"]
            db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id

            RO_task_done = False
            step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
            detailed_status_old = None
            self.logger.debug(logging_text + step)

            deployment_timeout = 1 * 3600   # One hour
            while deployment_timeout > 0:
                if not RO_task_done:
                    # first stage: wait for the RO action to finish
                    desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                         extra_item_id=RO_nslcmop_id)
                    ns_status, ns_status_info = RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        RO_task_done = True
                        step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
                        self.logger.debug(logging_text + step)
                    else:
                        raise LcmException("ROclient.check_action_status returns unknown {}".format(ns_status))
                else:
                    # second stage: wait for the ns to be ready at RO and update the vnfr
                    desc = await RO.show("ns", RO_nsr_id)
                    ns_status, ns_status_info = RO.check_ns_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        step = detailed_status = \
                            "Waiting for management IP address reported by the VIM. Updating VNFRs"
                        if not vnfr_scaled:
                            self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
                            vnfr_scaled = True
                        try:
                            desc = await RO.show("ns", RO_nsr_id)
                            # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
                            self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
                            break
                        except LcmExceptionNoMgmtIP:
                            # management IP not available yet; try again at next iteration
                            pass
                    else:
                        raise LcmException("ROclient.check_ns_status returns unknown {}".format(ns_status))
                if detailed_status != detailed_status_old:
                    detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                    self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)

                # 'loop' argument is not provided: it was removed from asyncio.sleep at python 3.10
                await asyncio.sleep(5)
                deployment_timeout -= 5
            if deployment_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns to be ready")

            # update VDU_SCALING_INFO with the obtained ip_addresses
            if vdu_scaling_info["scaling_direction"] == "OUT":
                for vdur in reversed(db_vnfr["vdur"]):
                    if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
                        vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
                        vdu_scaling_info["vdu"].append(_vdur_scale_entry(vdur))
                del vdu_scaling_info["vdu-create"]

        scale_process = None
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "post-scale-out" \
                        and scaling_type == "SCALE_OUT":
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    vnfr_params = {"<VDU_SCALE_INFO>": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the name looked for; the loop variable is unrelated (or even unbound)
                        raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
                                           "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
                                           "match any vnf-configuration:config-primitive".format(
                                               scaling_group, vnf_config_primitive))
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"

                    result, result_detail = await self._ns_execute_primitive(
                        nsr_deployed, vnf_index, None, None, None, vnf_config_primitive,
                        self._map_primitive_params(config_primitive, {}, vnfr_params))
                    self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                        vnf_config_primitive, result, result_detail))
                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None

        db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
        db_nslcmop_update["statusEnteredTime"] = time()
        db_nslcmop_update["detailed-status"] = "done"
        db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = old_operational_status
        db_nsr_update["config-status"] = old_config_status
        return  # database update is called inside finally
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        # the cancellation is recorded as a failure of the operation; it is not re-raised
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc:
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
            if db_nsr:
                # restore the previous status, unless the failure happened in the middle of a RO or VCA stage
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                if scale_process:
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                 exc)
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                # the operation is over: the nsr no longer points to this nslcmop
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                         "operationState": nslcmop_operation_state},
                                        loop=self.loop)
                # if cooldown_time:
                #     await asyncio.sleep(cooldown_time)
                # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")