fix bug having different vnfd at nsd
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import functools
24 import traceback
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 import ROclient
28 from lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase
29
30 from osm_common.dbbase import DbException
31 from osm_common.fsbase import FsException
32 from n2vc.vnf import N2VC, N2VCPrimitiveExecutionFailed
33
34 from copy import copy, deepcopy
35 from http import HTTPStatus
36 from time import time
37 from uuid import uuid4
38
39 __author__ = "Alfonso Tierno"
40
41
def get_iterable(in_dict, in_key):
    """
    Looks up in_key at in_dict, replacing a missing or falsy value (None, False, 0, empty container, ...) by an
    empty tuple, so that the result can always be used in a 'for' loop
    :param in_dict: a dictionary
    :param in_key: the key to look for at in_dict
    :return: in_dict[in_key], or () if the key is not present or its value is falsy
    """
    return in_dict.get(in_key) or ()
52
53
def populate_dict(target_dict, key_list, value):
    """
    Stores value at target_dict under the path of nested keys given by key_list. Intermediate dictionaries are
    created when they do not exist; the existing ones are reused.
    Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
    :param target_dict: dictionary to be changed
    :param key_list: sequence of keys to walk at target_dict. The last one is the key assigned to value
    :param value: content to store at the end of the path
    :return: None
    """
    node = target_dict
    for parent_key in key_list[:-1]:
        # go one level down, creating the level if it is missing
        node = node.setdefault(parent_key, {})
    node[key_list[-1]] = value
68
69
70 class NsLcm(LcmBase):
71 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
72 total_deploy_timeout = 2 * 3600 # global timeout for deployment
73 timeout_charm_delete = 10 * 60
74 timeout_primitive = 10 * 60 # timeout for primitive execution
75
def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
    """
    Init. Stores the references needed by the ns operations and creates the N2VC connector
    :param db: database object, handed over to the base class
    :param msg: messaging object, handed over to the base class
    :param fs: filesystem storage object, handed over to the base class
    :param lcm_tasks: registry of tasks, kept at self.lcm_tasks
    :param ro_config: RO client configuration, kept at self.ro_config
    :param vca_config: dictionary with the VCA connection data. Keys 'host', 'port', 'user' and 'secret' are read
    :param loop: asyncio event loop, kept at self.loop
    :return: None
    """
    # the logger is created first because it is handed over to the base class
    self.logger = logging.getLogger('lcm.ns')
    self.loop = loop
    self.lcm_tasks = lcm_tasks

    super().__init__(db, msg, fs, self.logger)

    self.ro_config = ro_config

    n2vc_arguments = {
        "log": self.logger,
        "server": vca_config['host'],
        "port": vca_config['port'],
        "user": vca_config['user'],
        "secret": vca_config['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=vca_config[''],
        "artifacts": None,
    }
    self.n2vc = N2VC(**n2vc_arguments)
103
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Builds the vnfd descriptor for RO from the OSM IM vnfd. The input vnfd is not modified
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for the VNF. Used as the Jinja2 variables of cloud-init
    :param nsrId: Id of the NSR. Not used by this method
    :return: deep copy of vnfd, without the keys not used by RO and with the cloud-init of every vdu rendered.
        LcmException is raised if the cloud-init file cannot be read, the template is wrong or it uses a
        variable that is not present at additionalParams
    """
    try:
        vnfd_RO = deepcopy(vnfd)
        # RO does not use configuration, monitoring, scaling nor internal keys
        for unused_key in ("_id", "_admin", "vnf-configuration", "monitoring-param", "scaling-group-descriptor"):
            vnfd_RO.pop(unused_key, None)
        if new_id:
            vnfd_RO["id"] = new_id

        # parse cloud-init or cloud-init-file with the provided variables using Jinja2
        for vdu in get_iterable(vnfd_RO, "vdu"):
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                # the file content replaces the file reference
                storage = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    cloud_init_text = ci_file.read()
                vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                cloud_init_text = vdu["cloud-init"]
            else:
                continue

            # every variable used by the template must be provided at instantiation time
            parsed_template = Environment().parse(cloud_init_text)
            for var in meta.find_undeclared_variables(parsed_template):
                if not (additionalParams and var in additionalParams.keys()):
                    raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                       "file, must be provided in the instantiation parameters inside the "
                                       "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
            vdu["cloud-init"] = Template(cloud_init_text).render(additionalParams or {})

        return vnfd_RO
    except FsException as e:
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e))
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e))
159
def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None):
    """
    Callback both for charm status change and task completion
    :param model_name: Charm model name
    :param application_name: Charm application name
    :param status: Can be
        - blocked: The unit needs manual intervention
        - maintenance: The unit is actively deploying/configuring
        - waiting: The unit is waiting for another charm to be ready
        - active: The unit is deployed, configured, and ready
        - error: The charm has failed and needs attention.
        - terminated: The charm has been destroyed
        - removing,
        - removed
    :param message: detailed message error
    :param n2vc_info: dictionary with information shared with instantiate task. It contains:
        nsr_id:
        nslcmop_id:
        lcmOperationType: currently "instantiate"
        deployed: list of the deployed VCA. Each item is None or a dictionary with, at least, the keys
            model, application, operational-status and detailed-status
        db_update: dictionary to be filled with the changes to be wrote to database with format key.key.key: value
        n2vc_event: event used to notify instantiation task that some change has been produced
    :param task: None for charm status change, or task for completion task callback
    :return: None. Any exception is logged and never propagated to the caller
    """
    # Provisional prefix. It guarantees that the exception handler at the end can always log, even when
    # n2vc_info is malformed and the complete prefix cannot be composed
    logging_text = "Task [n2vc_callback] application={}".format(application_name)
    try:
        nsr_id = n2vc_info["nsr_id"]
        deployed = n2vc_info["deployed"]
        db_nsr_update = n2vc_info["db_update"]
        nslcmop_id = n2vc_info["nslcmop_id"]
        ns_operation = n2vc_info["lcmOperationType"]
        n2vc_event = n2vc_info["n2vc_event"]
        logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id,
                                                                                application_name)
        # look for the deployed VCA entry this callback refers to
        for vca_index, vca_deployed in enumerate(deployed):
            if not vca_deployed:
                continue
            if model_name == vca_deployed["model"] and application_name == vca_deployed["application"]:
                break
        else:
            self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA")
            return

        def set_vca_status(operational_status, detailed_status):
            # keeps the shared in-memory entry and the pending database changes aligned
            vca_deployed['operational-status'] = operational_status
            db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = operational_status
            vca_deployed['detailed-status'] = detailed_status
            db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = detailed_status

        if task:
            if task.cancelled():
                self.logger.debug(logging_text + " task Cancelled")
                set_vca_status("error", "Task Cancelled")

            elif task.done():
                exc = task.exception()
                if exc:
                    self.logger.error(logging_text + " task Exception={}".format(exc))
                    set_vca_status("error", str(exc))
                else:
                    self.logger.debug(logging_text + " task Done")
                    # task is Done, but callback is still ongoing. So ignore
                    return
        elif status:
            self.logger.debug(logging_text + " Enter status={} message={}".format(status, message))
            if vca_deployed['operational-status'] == status:
                return  # same status, ignore
            set_vca_status(status, str(message))
        else:
            # there is no active exception here, so no traceback is requested
            self.logger.critical(logging_text + " Enter with bad parameters")
            return
        # wake up instantiate task
        n2vc_event.set()
    except Exception as e:
        self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True)
237
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params
    :param nsd: network service descriptor. Its "constituent-vnfd" list relates each member-vnf-index with a
        vnfd-id-ref
    :param vnfd_dict: dictionary with the vnfd descriptors, indexed by the vnfd-id-ref used at nsd:constituent-vnfd
    :param n2vc_key_list: list of keys to inject (as "mgmt_keys") at the vdus that need to be accessed for
        configuration. Nothing is injected if it is empty or None
    :return: The RO ns descriptor, or None if ns_params is empty. LcmException is raised if a VIM is not enabled
        or if the instantiation parameters reference something not present at the descriptors
    """
    vim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # translates the OSM vim account id into the RO one; results are cached at vim_2_RO
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def ip_profile_2_RO(ip_profile):
        # returns a copy of the ip-profile with the keys and values renamed to the ones used by RO
        RO_ip_profile = deepcopy((ip_profile))
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        # "scenario": ns_params["nsdId"],
    }
    if n2vc_key_list:
        # inject the keys at every vdu that needs to be accessed for configuration
        for vnfd_ref, vnfd in vnfd_dict.items():
            vdu_needed_access = []
            mgmt_cp = None
            if vnfd.get("vnf-configuration"):
                if vnfd.get("mgmt-interface"):
                    if vnfd["mgmt-interface"].get("vdu-id"):
                        vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                    elif vnfd["mgmt-interface"].get("cp"):
                        mgmt_cp = vnfd["mgmt-interface"]["cp"]

            # get_iterable: a vnfd without vdu, or a vdu without interface, must not break the loop
            for vdu in get_iterable(vnfd, "vdu"):
                if vdu.get("vdu-configuration"):
                    vdu_needed_access.append(vdu["id"])
                elif mgmt_cp:
                    for vdu_interface in get_iterable(vdu, "interface"):
                        if vdu_interface.get("external-connection-point-ref") and \
                                vdu_interface["external-connection-point-ref"] == mgmt_cp:
                            vdu_needed_access.append(vdu["id"])
                            # the vdu owning the management connection point is found; stop looking for it
                            mgmt_cp = None
                            break

            if vdu_needed_access:
                for vnf_member in get_iterable(nsd, "constituent-vnfd"):
                    if vnf_member["vnfd-id-ref"] != vnfd_ref:
                        continue
                    for vdu in vdu_needed_access:
                        populate_dict(RO_ns_params,
                                      ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                      n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    for vnf_params in get_iterable(ns_params, "vnf"):
        # the vnfd of this member-vnf-index is needed to validate the internal connection points
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    # all the vdus have been inspected without finding the connection point
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))
        if vld_params.get("vim-network-name"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if vld_params.get("vim-network-id"):
            # NOTE: this is stored at the same "sites" key as vim-network-name, so it replaces it if both come
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-id"], dict):
                for vim_account, vim_net in vld_params["vim-network-id"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # vdu_descriptor and interface_descriptor hold the matching vdu and interface found above
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
467
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr, both at db_vnfr and at the database
    :param db_vnfr: vnfr content. Its "vdur" list is updated
    :param vdu_create: dictionary with {vdu-id-ref: number of vdur to add}. The new entries are deep copies of the
        last existing vdur of that vdu-id-ref, with a new _id and consecutive count-index. It is not modified
    :param vdu_delete: dictionary with {vdu-id-ref: number of vdur to remove}. The last entries of that vdu-id-ref
        are the ones removed. It is not modified
    :return: None. LcmException is raised if there is not any existing vdur to complete a requested operation
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # walk backwards, so that insertions and deletions do not move the entries still to be visited
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            for index in range(0, vdu_create[vdu_id_ref]):
                # each new entry is cloned from the previous one, so count-index keeps growing
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Each pending operation is reported with its own message
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
505 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
506
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies to every vld of the nsr the deployment info that RO reports for its net
    :param ns_update_nsr: dictionary where the changes are added, with the format "vld.<index>": vld-content
    :param db_nsr: content of db_nsr. Its vld items are modified too
    :param nsr_desc_RO: nsr descriptor from RO. Its "nets" are matched to the vld by "ns_net_osm_id"
    :return: Nothing, LcmException is raised if some vld is not found at the RO info
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net declared for this vld, if any
        matching_net = next((net for net in get_iterable(nsr_desc_RO, "nets")
                             if vld["id"] == net.get("ns_net_osm_id")), None)
        if matching_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))

        vld["vim-id"] = matching_net.get("vim_net_id")
        vld["name"] = matching_net.get("vim_name")
        vld["status"] = matching_net.get("status")
        vld["status-detailed"] = matching_net.get("error_msg")
        ns_update_nsr["vld.{}".format(vld_index)] = vld
528
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    For each vnfr, the RO vnf with the same member_vnf_index is looked for. Its management ip-address, the info
    of its vms (copied to the vdur entries that are not PDU, interfaces included) and the info of its nets
    (copied to the vld entries) are stored at the vnfr with a single database update.
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors. LcmExceptionNoMgmtIP is raised if RO does not provide an
        ip address for the vnf and the vnfr has not one already
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            # changes of this vnfr to be written to database
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"]
            elif not db_vnfr.get("ip-address"):
                raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # number of RO vms of this same vdu that have been skipped so far
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    # the vdur with count-index N corresponds to the RO vm number N (from 0) of this vdu
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    vdur["ip-address"] = vdur_RO.get("ip_address")
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    # interfaces are matched by name against the RO internal_name
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "at RO info".format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    # RO reports no vm for this vdu with that count-index
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} at "
                                       "RO info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            # vnf internal networks are matched by id against the RO vnf_net_osm_id
            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} at RO info".format(
                        vnf_index, vld["id"]))

            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            # no RO vnf has this member_vnf_index
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(vnf_index))
596
597 async def instantiate(self, nsr_id, nslcmop_id):
598 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
599 self.logger.debug(logging_text + "Enter")
600 # get all needed from database
601 start_deploy = time()
602 db_nsr = None
603 db_nslcmop = None
604 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
605 db_nslcmop_update = {}
606 nslcmop_operation_state = None
607 db_vnfrs = {}
608 RO_descriptor_number = 0 # number of descriptors created at RO
609 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
610 n2vc_info = {}
611 exc = None
612 try:
613 step = "Getting nslcmop={} from db".format(nslcmop_id)
614 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
615 step = "Getting nsr={} from db".format(nsr_id)
616 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
617 ns_params = db_nslcmop.get("operationParams")
618 nsd = db_nsr["nsd"]
619 nsr_name = db_nsr["name"] # TODO short-name??
620
621 # look if previous tasks in process
622 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
623 if task_dependency:
624 step = db_nslcmop_update["detailed-status"] = \
625 "Waiting for related tasks to be completed: {}".format(task_name)
626 self.logger.debug(logging_text + step)
627 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
628 _, pending = await asyncio.wait(task_dependency, timeout=3600)
629 if pending:
630 raise LcmException("Timeout waiting related tasks to be completed")
631
632 step = "Getting vnfrs from db"
633 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
634 db_vnfds_ref = {}
635 db_vnfds = {}
636 for vnfr in db_vnfrs_list:
637 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
638 vnfd_id = vnfr["vnfd-id"]
639 vnfd_ref = vnfr["vnfd-ref"]
640 if vnfd_id not in db_vnfds:
641 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
642 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
643 db_vnfds_ref[vnfd_ref] = vnfd
644 db_vnfds[vnfd_id] = vnfd
645
646 # Get or generates the _admin.deployed,VCA list
647 vca_deployed_list = None
648 if db_nsr["_admin"].get("deployed"):
649 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
650 if vca_deployed_list is None:
651 vca_deployed_list = []
652 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
653 elif isinstance(vca_deployed_list, dict):
654 # maintain backward compatibility. Change a dict to list at database
655 vca_deployed_list = list(vca_deployed_list.values())
656 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
657
658 db_nsr_update["detailed-status"] = "creating"
659 db_nsr_update["operational-status"] = "init"
660 if not db_nsr["_admin"].get("deployed") or not db_nsr["_admin"]["deployed"].get("RO") or \
661 not db_nsr["_admin"]["deployed"]["RO"].get("vnfd"):
662 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
663 db_nsr_update["_admin.deployed.RO.vnfd"] = []
664
665 RO = ROclient.ROClient(self.loop, **self.ro_config)
666
667 # set state to INSTANTIATED. When instantiated NBI will not delete directly
668 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
669 self.update_db_2("nsrs", nsr_id, db_nsr_update)
670
671 # get vnfds, instantiate at RO
672 for c_vnf in nsd.get("constituent-vnfd", ()):
673 member_vnf_index = c_vnf["member-vnf-index"]
674 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
675 vnfd_ref = vnfd["id"]
676 step = db_nsr_update["detailed-status"] = "Creating vnfd='{}' member-vnf-index='{}' at RO".format(
677 vnfd_ref, member_vnf_index)
678 # self.logger.debug(logging_text + step)
679 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
680 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
681 RO_descriptor_number += 1
682
683 # look position at deployed.RO.vnfd if not present it will be appended at the end
684 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
685 if vnf_deployed["member-vnf-index"] == member_vnf_index:
686 break
687 else:
688 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
689 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
690
691 # look if present
692 RO_update = {"member-vnf-index": member_vnf_index}
693 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
694 if vnfd_list:
695 RO_update["id"] = vnfd_list[0]["uuid"]
696 self.logger.debug(logging_text + "vnfd='{}' member-vnf-index='{}' exists at RO. Using RO_id={}".
697 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
698 else:
699 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
700 get("additionalParamsForVnf"), nsr_id)
701 desc = await RO.create("vnfd", descriptor=vnfd_RO)
702 RO_update["id"] = desc["uuid"]
703 self.logger.debug(logging_text + "vnfd='{}' member-vnf-index='{}' created at RO. RO_id={}".format(
704 vnfd_ref, member_vnf_index, desc["uuid"]))
705 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
706 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
707 self.update_db_2("nsrs", nsr_id, db_nsr_update)
708
709 # create nsd at RO
710 nsd_ref = nsd["id"]
711 step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
712 # self.logger.debug(logging_text + step)
713
714 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
715 RO_descriptor_number += 1
716 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
717 if nsd_list:
718 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
719 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
720 nsd_ref, RO_nsd_uuid))
721 else:
722 nsd_RO = deepcopy(nsd)
723 nsd_RO["id"] = RO_osm_nsd_id
724 nsd_RO.pop("_id", None)
725 nsd_RO.pop("_admin", None)
726 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
727 member_vnf_index = c_vnf["member-vnf-index"]
728 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
729 for c_vld in nsd_RO.get("vld", ()):
730 for cp in c_vld.get("vnfd-connection-point-ref", ()):
731 member_vnf_index = cp["member-vnf-index-ref"]
732 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
733
734 desc = await RO.create("nsd", descriptor=nsd_RO)
735 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
736 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
737 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
738 self.update_db_2("nsrs", nsr_id, db_nsr_update)
739
740 # Crate ns at RO
741 # if present use it unless in error status
742 RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
743 if RO_nsr_id:
744 try:
745 step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
746 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
747 desc = await RO.show("ns", RO_nsr_id)
748 except ROclient.ROClientException as e:
749 if e.http_code != HTTPStatus.NOT_FOUND:
750 raise
751 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
752 if RO_nsr_id:
753 ns_status, ns_status_info = RO.check_ns_status(desc)
754 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
755 if ns_status == "ERROR":
756 step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
757 self.logger.debug(logging_text + step)
758 await RO.delete("ns", RO_nsr_id)
759 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
760 if not RO_nsr_id:
761 step = db_nsr_update["detailed-status"] = "Checking dependencies"
762 # self.logger.debug(logging_text + step)
763
764 # check if VIM is creating and wait look if previous tasks in process
765 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
766 if task_dependency:
767 step = "Waiting for related tasks to be completed: {}".format(task_name)
768 self.logger.debug(logging_text + step)
769 await asyncio.wait(task_dependency, timeout=3600)
770 if ns_params.get("vnf"):
771 for vnf in ns_params["vnf"]:
772 if "vimAccountId" in vnf:
773 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
774 vnf["vimAccountId"])
775 if task_dependency:
776 step = "Waiting for related tasks to be completed: {}".format(task_name)
777 self.logger.debug(logging_text + step)
778 await asyncio.wait(task_dependency, timeout=3600)
779
780 step = db_nsr_update["detailed-status"] = "Checking instantiation parameters"
781
782 # feature 1429. Add n2vc public key to needed VMs
783 n2vc_key = await self.n2vc.GetPublicKey()
784 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, [n2vc_key])
785
786 step = db_nsr_update["detailed-status"] = "Creating ns at RO"
787 desc = await RO.create("ns", descriptor=RO_ns_params,
788 name=db_nsr["name"],
789 scenario=RO_nsd_uuid)
790 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
791 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
792 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
793 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
794 self.update_db_2("nsrs", nsr_id, db_nsr_update)
795
796 # wait until NS is ready
797 step = ns_status_detailed = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
798 detailed_status_old = None
799 self.logger.debug(logging_text + step)
800
801 while time() <= start_deploy + self.total_deploy_timeout:
802 desc = await RO.show("ns", RO_nsr_id)
803 ns_status, ns_status_info = RO.check_ns_status(desc)
804 db_nsr_update["admin.deployed.RO.nsr_status"] = ns_status
805 if ns_status == "ERROR":
806 raise ROclient.ROClientException(ns_status_info)
807 elif ns_status == "BUILD":
808 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
809 elif ns_status == "ACTIVE":
810 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
811 try:
812 self.ns_update_vnfr(db_vnfrs, desc)
813 break
814 except LcmExceptionNoMgmtIP:
815 pass
816 else:
817 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
818 if detailed_status != detailed_status_old:
819 detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
820 self.update_db_2("nsrs", nsr_id, db_nsr_update)
821 await asyncio.sleep(5, loop=self.loop)
822 else: # total_deploy_timeout
823 raise ROclient.ROClientException("Timeout waiting ns to be ready")
824
825 step = "Updating NSR"
826 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
827
828 db_nsr["detailed-status"] = "Configuring vnfr"
829 self.update_db_2("nsrs", nsr_id, db_nsr_update)
830
831 # The parameters we'll need to deploy a charm
832 number_to_configure = 0
833
834 def deploy_charm(vnf_index, vdu_id, vdu_name, vdu_count_index, charm_params, n2vc_info):
835 """An inner function to deploy the charm from either vnf or vdu
836 vnf_index is mandatory. vdu_id can be None for a vnf configuration or the id for vdu configuration
837 """
838 if not charm_params["rw_mgmt_ip"]:
839 raise LcmException("vnfd/vdu has not management ip address to configure it")
840 # Login to the VCA.
841 # if number_to_configure == 0:
842 # self.logger.debug("Logging into N2VC...")
843 # task = asyncio.ensure_future(self.n2vc.login())
844 # yield from asyncio.wait_for(task, 30.0)
845 # self.logger.debug("Logged into N2VC!")
846
847 # # await self.n2vc.login()
848
849 # Note: The charm needs to exist on disk at the location
850 # specified by charm_path.
851 base_folder = vnfd["_admin"]["storage"]
852 storage_params = self.fs.get_params()
853 charm_path = "{}{}/{}/charms/{}".format(
854 storage_params["path"],
855 base_folder["folder"],
856 base_folder["pkg-dir"],
857 proxy_charm
858 )
859
860 # ns_name will be ignored in the current version of N2VC
861 # but will be implemented for the next point release.
862 model_name = "default" # TODO bug 585 nsr_id
863 if vdu_id:
864 vdu_id_text = vdu_id + "-"
865 else:
866 vdu_id_text = "-"
867 application_name = self.n2vc.FormatApplicationName(nsr_name, vnf_index, vdu_id_text)
868
869 vca_index = len(vca_deployed_list)
870 # trunk name and add two char index at the end to ensure that it is unique. It is assumed no more than
871 # 26*26 charm in the same NS
872 application_name = application_name[0:48]
873 application_name += chr(97 + vca_index // 26) + chr(97 + vca_index % 26)
874 vca_deployed_ = {
875 "member-vnf-index": vnf_index,
876 "vdu_id": vdu_id,
877 "model": model_name,
878 "application": application_name,
879 "operational-status": "init",
880 "detailed-status": "",
881 "vnfd_id": vnfd_id,
882 "vdu_name": vdu_name,
883 "vdu_count_index": vdu_count_index,
884 }
885 vca_deployed_list.append(vca_deployed_)
886 db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = vca_deployed_
887 self.update_db_2("nsrs", nsr_id, db_nsr_update)
888
889 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
890 proxy_charm))
891 if not n2vc_info:
892 n2vc_info["nsr_id"] = nsr_id
893 n2vc_info["nslcmop_id"] = nslcmop_id
894 n2vc_info["n2vc_event"] = asyncio.Event(loop=self.loop)
895 n2vc_info["lcmOperationType"] = "instantiate"
896 n2vc_info["deployed"] = vca_deployed_list
897 n2vc_info["db_update"] = db_nsr_update
898 task = asyncio.ensure_future(
899 self.n2vc.DeployCharms(
900 model_name, # The network service name
901 application_name, # The application name
902 vnfd, # The vnf descriptor
903 charm_path, # Path to charm
904 charm_params, # Runtime params, like mgmt ip
905 {}, # for native charms only
906 self.n2vc_callback, # Callback for status changes
907 n2vc_info, # Callback parameter
908 None, # Callback parameter (task)
909 )
910 )
911 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
912 n2vc_info))
913 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + application_name, task)
914
915 step = "Looking for needed vnfd to configure"
916 self.logger.debug(logging_text + step)
917
918 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
919 vnfd_id = c_vnf["vnfd-id-ref"]
920 vnf_index = str(c_vnf["member-vnf-index"])
921 vnfd = db_vnfds_ref[vnfd_id]
922
923 # Get additional parameters
924 vnfr_params = {}
925 if db_vnfrs[vnf_index].get("additionalParamsForVnf"):
926 vnfr_params = db_vnfrs[vnf_index]["additionalParamsForVnf"].copy()
927 for k, v in vnfr_params.items():
928 if isinstance(v, str) and v.startswith("!!yaml "):
929 vnfr_params[k] = yaml.safe_load(v[7:])
930
931 # Check if this VNF has a charm configuration
932 vnf_config = vnfd.get("vnf-configuration")
933 if vnf_config and vnf_config.get("juju"):
934 proxy_charm = vnf_config["juju"]["charm"]
935
936 if proxy_charm:
937 step = "connecting to N2VC to configure vnf {}".format(vnf_index)
938 vnfr_params["rw_mgmt_ip"] = db_vnfrs[vnf_index]["ip-address"]
939 charm_params = {
940 "user_values": vnfr_params,
941 "rw_mgmt_ip": db_vnfrs[vnf_index]["ip-address"],
942 "initial-config-primitive": vnf_config.get('initial-config-primitive') or {}
943 }
944
945 # Login to the VCA. If there are multiple calls to login(),
946 # subsequent calls will be a nop and return immediately.
947 await self.n2vc.login()
948 deploy_charm(vnf_index, None, None, None, charm_params, n2vc_info)
949 number_to_configure += 1
950
951 # Deploy charms for each VDU that supports one.
952 for vdu_index, vdu in enumerate(get_iterable(vnfd, 'vdu')):
953 vdu_config = vdu.get('vdu-configuration')
954 proxy_charm = None
955
956 if vdu_config and vdu_config.get("juju"):
957 proxy_charm = vdu_config["juju"]["charm"]
958
959 if proxy_charm:
960 step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index)
961 await self.n2vc.login()
962 vdur = db_vnfrs[vnf_index]["vdur"][vdu_index]
963 # TODO for the moment only first vdu_id contains a charm deployed
964 if vdur["vdu-id-ref"] != vdu["id"]:
965 raise LcmException("Mismatch vdur {}, vdu {} at index {} for vnf {}"
966 .format(vdur["vdu-id-ref"], vdu["id"], vdu_index, vnf_index))
967 vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
968 charm_params = {
969 "user_values": vnfr_params,
970 "rw_mgmt_ip": vdur["ip-address"],
971 "initial-config-primitive": vdu_config.get('initial-config-primitive') or {}
972 }
973 deploy_charm(vnf_index, vdu["id"], vdur.get("name"), vdur["count-index"],
974 charm_params, n2vc_info)
975 number_to_configure += 1
976
977 db_nsr_update["operational-status"] = "running"
978 configuration_failed = False
979 if number_to_configure:
980 old_status = "configuring: init: {}".format(number_to_configure)
981 db_nsr_update["config-status"] = old_status
982 db_nsr_update["detailed-status"] = old_status
983 db_nslcmop_update["detailed-status"] = old_status
984
985 # wait until all are configured.
986 while time() <= start_deploy + self.total_deploy_timeout:
987 if db_nsr_update:
988 self.update_db_2("nsrs", nsr_id, db_nsr_update)
989 if db_nslcmop_update:
990 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
991 # TODO add a fake task that set n2vc_event after some time
992 await n2vc_info["n2vc_event"].wait()
993 n2vc_info["n2vc_event"].clear()
994 all_active = True
995 status_map = {}
996 n2vc_error_text = [] # contain text error list. If empty no one is in error status
997 now = time()
998 for vca_deployed in vca_deployed_list:
999 vca_status = vca_deployed["operational-status"]
1000 if vca_status not in status_map:
1001 # Initialize it
1002 status_map[vca_status] = 0
1003 status_map[vca_status] += 1
1004
1005 if vca_status == "active":
1006 vca_deployed.pop("time_first_error", None)
1007 vca_deployed.pop("status_first_error", None)
1008 continue
1009
1010 all_active = False
1011 if vca_status in ("error", "blocked"):
1012 vca_deployed["detailed-status-error"] = vca_deployed["detailed-status"]
1013 # if not first time in this status error
1014 if not vca_deployed.get("time_first_error"):
1015 vca_deployed["time_first_error"] = now
1016 continue
1017 if vca_deployed.get("time_first_error") and \
1018 now <= vca_deployed["time_first_error"] + self.timeout_vca_on_error:
1019 n2vc_error_text.append("member_vnf_index={} vdu_id={} {}: {}"
1020 .format(vca_deployed["member-vnf-index"],
1021 vca_deployed["vdu_id"], vca_status,
1022 vca_deployed["detailed-status-error"]))
1023
1024 if all_active:
1025 break
1026 elif n2vc_error_text:
1027 db_nsr_update["config-status"] = "failed"
1028 error_text = "fail configuring " + ";".join(n2vc_error_text)
1029 db_nsr_update["detailed-status"] = error_text
1030 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1031 db_nslcmop_update["detailed-status"] = error_text
1032 db_nslcmop_update["statusEnteredTime"] = time()
1033 configuration_failed = True
1034 break
1035 else:
1036 cs = "configuring: "
1037 separator = ""
1038 for status, num in status_map.items():
1039 cs += separator + "{}: {}".format(status, num)
1040 separator = ", "
1041 if old_status != cs:
1042 db_nsr_update["config-status"] = cs
1043 db_nsr_update["detailed-status"] = cs
1044 db_nslcmop_update["detailed-status"] = cs
1045 old_status = cs
1046 else: # total_deploy_timeout
1047 raise LcmException("Timeout waiting ns to be configured")
1048
1049 if not configuration_failed:
1050 # all is done
1051 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1052 db_nslcmop_update["statusEnteredTime"] = time()
1053 db_nslcmop_update["detailed-status"] = "done"
1054 db_nsr_update["config-status"] = "configured"
1055 db_nsr_update["detailed-status"] = "done"
1056
1057 return
1058
1059 except (ROclient.ROClientException, DbException, LcmException) as e:
1060 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1061 exc = e
1062 except asyncio.CancelledError:
1063 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1064 exc = "Operation was cancelled"
1065 except Exception as e:
1066 exc = traceback.format_exc()
1067 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1068 exc_info=True)
1069 finally:
1070 if exc:
1071 if db_nsr:
1072 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1073 db_nsr_update["operational-status"] = "failed"
1074 if db_nslcmop:
1075 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1076 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1077 db_nslcmop_update["statusEnteredTime"] = time()
1078 try:
1079 if db_nsr:
1080 db_nsr_update["_admin.nslcmop"] = None
1081 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1082 if db_nslcmop_update:
1083 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1084 except DbException as e:
1085 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1086 if nslcmop_operation_state:
1087 try:
1088 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1089 "operationState": nslcmop_operation_state},
1090 loop=self.loop)
1091 except Exception as e:
1092 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1093
1094 self.logger.debug(logging_text + "Exit")
1095 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1096
async def _destroy_charm(self, model, application):
    """
    Ask N2VC to remove a charm application, unless it is already gone
    :param model: model name passed to N2VC
    :param application: application name passed to N2VC
    :return: True if the application did not exist (nothing to remove). False if it existed and its removal
        has been requested
    """
    still_deployed = await self.n2vc.HasApplication(model, application)
    if still_deployed:
        # removal is only requested here; callers poll afterwards to know when it has finished
        await self.n2vc.RemoveCharms(model, application)
    return not still_deployed
1108
async def _wait_charm_destroyed(self, model, application, timeout):
    """
    Poll N2VC until the charm application no longer exists
    :param model: model name passed to N2VC
    :param application: application name passed to N2VC
    :param timeout: seconds to keep polling. It is checked after each poll, so at least one poll is always done
    :return: True if the application does not exist, False if timeout expired while it still exists
    """
    remaining = timeout
    while await self.n2vc.HasApplication(model, application):
        if remaining < 0:
            return False
        # poll every 10 seconds, discounting them from the remaining time
        await asyncio.sleep(10)
        remaining -= 10
    return True
1124
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a network service: order the removal of its configuration charms, delete the ns, nsd and vnfds
    at RO, wait for the charms to disappear and finally store the result at database and notify it through
    the message bus ("ns", "terminated")
    :param nsr_id: _id of the "nsrs" database entry to terminate
    :param nslcmop_id: _id of the "nslcmops" database entry that describes this operation
    :return: None. Result is written at database entries nsrs and nslcmops
    """
    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    exc = None
    failed_detail = []   # annotates all failed error messages
    vca_time_destroy = None   # time at which the charms destruction was ordered
    # both *_update dictionaries use dot separated keys and are sent to database with update_db_2
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    autoremove = False  # autoremove after terminated
    try:
        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # nsd = db_nsr["nsd"]
        # work over a copy, because entries are cleared while they are being removed
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return
        # #TODO check if VIM is creating and wait
        # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"

        if nsr_deployed and nsr_deployed.get("VCA"):
            try:
                step = "Scheduling configuration charms removing"
                db_nsr_update["detailed-status"] = "Deleting charms"
                self.logger.debug(logging_text + step)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # for backward compatibility
                if isinstance(nsr_deployed["VCA"], dict):
                    nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
                    db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)

                for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
                    if vca_deployed:
                        if await self._destroy_charm(vca_deployed['model'], vca_deployed["application"]):
                            vca_deployed.clear()
                            # The dotted key must go to db_nsr_update (what is sent to database), not to the
                            # local db_nsr copy, which is never written back.
                            # NOTE(review): assumes update_db_2 empties db_nsr_update after writing, so this
                            # per-index key is not sent together with the whole "_admin.deployed.VCA" - confirm
                            db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None
                        else:
                            vca_time_destroy = time()
            except Exception as e:
                # best effort: a failure removing charms does not prevent deleting the ns at RO
                self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO_fail = False
        RO = ROclient.ROClient(self.loop, **self.ro_config)

        # Delete ns
        RO_nsr_id = RO_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
            RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if RO_nsr_id:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = "Deleting ns at RO"
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self.logger.debug(logging_text + step)
                desc = await RO.delete("ns", RO_nsr_id)
                RO_delete_action = desc["action_id"]
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if RO_delete_action:
                # wait until NS is deleted from VIM
                step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
                    format(RO_nsr_id, RO_delete_action)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                delete_timeout = 20 * 60   # 20 minutes
                while delete_timeout > 0:
                    desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                         extra_item_id=RO_delete_action)
                    ns_status, ns_status_info = RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    # write to database only when the reported status text changes
                    if detailed_status != detailed_status_old:
                        detailed_status_old = db_nslcmop_update["detailed-status"] = \
                            db_nsr_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

        except ROclient.ROClientException as e:
            if e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
            elif e.http_code == 409:   # conflict
                failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
                RO_fail = True
            else:
                failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                self.logger.error(logging_text + failed_detail[-1])
                RO_fail = True

        # Delete nsd
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
            RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                    "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                    RO_fail = True
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])
                    RO_fail = True

        # Delete vnfds. Errors are annotated but do not stop the deletion of the remaining ones
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    RO_vnfd_id = vnf_deployed["id"]
                    step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                        "Deleting member-vnf-index={} RO_vnfd_id={} from RO".format(
                            vnf_deployed["member-vnf-index"], RO_vnfd_id)
                    await RO.delete("vnfd", RO_vnfd_id)
                    self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                        self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                    elif e.http_code == 409:   # conflict
                        failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        # wait until charm deleted
        if vca_time_destroy:
            db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = step = \
                "Waiting for deletion of configuration charms"
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
                if not vca_deployed:
                    continue
                step = "Waiting for deletion of charm application_name={}".format(vca_deployed["application"])
                # discount the time already elapsed since the destruction was ordered
                timeout = self.timeout_charm_delete - int(time() - vca_time_destroy)
                if not await self._wait_charm_destroyed(vca_deployed['model'], vca_deployed["application"],
                                                        timeout):
                    failed_detail.append("VCA[application_name={}] Deletion timeout".format(
                        vca_deployed["application"]))
                else:
                    # stored through db_nsr_update so that the removal reaches the database
                    db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None

        if failed_detail:
            self.logger.error(logging_text + " ;".join(failed_detail))
            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
            db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        else:
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()
            if db_nslcmop["operationParams"].get("autoremove"):
                autoremove = True

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
    finally:
        # always executed, also on the early return above: store the result, notify and unregister the task
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
1353
@staticmethod
def _map_primitive_params(primitive_desc, params, instantiation_params):
    """
    Computes the parameters to be passed to the charm when executing a primitive. For each parameter declared
    at the primitive descriptor, the value provided by the user is taken; otherwise its default-value. A
    default-value written as <name> is replaced by the instantiation parameter called 'name'.
    Values that are dict, list or tuple are dumped to a yaml string; strings starting with '!!yaml ' get this
    prefix removed.
    :param primitive_desc: portion of VNFD/NSD that describes primitive
    :param params: Params provided by user
    :param instantiation_params: Instantiation params provided by user
    :return: a dictionary with the calculated params
    :raises LcmException: if a parameter has neither user value nor default-value, or the default-value
        references an instantiation parameter that is not provided
    """
    resolved = {}
    for param_desc in primitive_desc.get("parameter", ()):
        name = param_desc["name"]
        if name in params:
            value = params[name]
        elif "default-value" in param_desc:
            value = default = param_desc["default-value"]
            if isinstance(default, str) and default.startswith("<") and default.endswith(">"):
                reference = default[1:-1]
                if reference not in instantiation_params:
                    raise LcmException("Parameter {} needed to execute primitive {} not provided".
                                       format(default, primitive_desc["name"]))
                value = instantiation_params[reference]
        else:
            raise LcmException("Parameter {} needed to execute primitive {} not provided".
                               format(name, primitive_desc["name"]))

        # structured values are serialized to a yaml string; strings only lose the '!!yaml ' marker
        if isinstance(value, (dict, list, tuple)):
            value = yaml.safe_dump(value, default_flow_style=True, width=256)
        elif isinstance(value, str) and value.startswith("!!yaml "):
            value = value[7:]
        resolved[name] = value
    return resolved
1388
async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
                                primitive, primitive_params):
    """
    Execute a primitive over the deployed charm that matches the provided identifiers and wait for its result
    :param db_deployed: dictionary whose "VCA" key contains the list of deployed charms. Empty entries are ignored
    :param member_vnf_index: must match the 'member-vnf-index' of the charm entry
    :param vdu_id: must match the 'vdu_id' of the charm entry (None for a vnf level charm)
    :param vdu_name: if provided (not empty), must match the 'vdu_name' of the charm entry
    :param vdu_count_index: if provided (not None), must match the 'vdu_count_index' of the charm entry.
        Note that 0 is a valid index to match
    :param primitive: name of the primitive to execute
    :param primitive_params: dictionary of parameters, passed to N2VC as keyword arguments
    :return: tuple (result, detailed_result). result is "COMPLETED" or "FAILED". On N2VCPrimitiveExecutionFailed
        or LcmException (charm not found, not active, timeout) it returns ("FAILED", <exception text>)
    """
    start_primitive_time = time()
    try:
        for vca_deployed in db_deployed["VCA"]:
            if not vca_deployed:
                continue
            if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
                continue
            if vdu_name and vdu_name != vca_deployed["vdu_name"]:
                continue
            # compare against None and not truthiness: index 0 is a legitimate value that must be honored
            if vdu_count_index is not None and vdu_count_index != vca_deployed["vdu_count_index"]:
                continue
            break
        else:
            raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
                               "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
        model_name = vca_deployed.get("model")
        application_name = vca_deployed.get("application")
        if not model_name or not application_name:
            raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
                               "model or application name" .format(member_vnf_index, vdu_id, vdu_name,
                                                                   vdu_count_index))
        if vca_deployed["operational-status"] != "active":
            raise LcmException("charm for member_vnf_index={} vdu_id={} operational_status={} not 'active'".format(
                member_vnf_index, vdu_id, vca_deployed["operational-status"]))
        callback = None  # self.n2vc_callback
        callback_args = ()  # [db_nsr, db_nslcmop, member_vnf_index, None]
        await self.n2vc.login()
        primitive_id = await self.n2vc.ExecutePrimitive(
            model_name,
            application_name,
            primitive,
            callback,
            *callback_args,
            **primitive_params
        )
        # poll the primitive status every 5 seconds until it finishes or self.timeout_primitive expires
        while time() - start_primitive_time < self.timeout_primitive:
            primitive_result_ = await self.n2vc.GetPrimitiveStatus(model_name, primitive_id)
            if primitive_result_ == "running":
                pass
            elif primitive_result_ in ("completed", "failed"):
                primitive_result = "COMPLETED" if primitive_result_ == "completed" else "FAILED"
                detailed_result = await self.n2vc.GetPrimitiveOutput(model_name, primitive_id)
                break
            else:
                detailed_result = "Invalid N2VC.GetPrimitiveStatus = {} obtained".format(primitive_result_)
                primitive_result = "FAILED"
                break
            await asyncio.sleep(5)
        else:
            raise LcmException("timeout after {} seconds".format(self.timeout_primitive))
        return primitive_result, detailed_result
    except (N2VCPrimitiveExecutionFailed, LcmException) as e:
        return "FAILED", str(e)
1444
1445 async def action(self, nsr_id, nslcmop_id):
1446 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
1447 self.logger.debug(logging_text + "Enter")
1448 # get all needed from database
1449 db_nsr = None
1450 db_nslcmop = None
1451 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1452 db_nslcmop_update = {}
1453 nslcmop_operation_state = None
1454 exc = None
1455 try:
1456 step = "Getting information from database"
1457 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1458 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1459
1460 nsr_deployed = db_nsr["_admin"].get("deployed")
1461 vnf_index = db_nslcmop["operationParams"]["member_vnf_index"]
1462 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
1463 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
1464 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
1465
1466 step = "Getting vnfr from database"
1467 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
1468 step = "Getting vnfd from database"
1469 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
1470
1471 # look if previous tasks in process
1472 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
1473 if task_dependency:
1474 step = db_nslcmop_update["detailed-status"] = \
1475 "Waiting for related tasks to be completed: {}".format(task_name)
1476 self.logger.debug(logging_text + step)
1477 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1478 _, pending = await asyncio.wait(task_dependency, timeout=3600)
1479 if pending:
1480 raise LcmException("Timeout waiting related tasks to be completed")
1481
1482 # for backward compatibility
1483 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
1484 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
1485 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
1486 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1487
1488 primitive = db_nslcmop["operationParams"]["primitive"]
1489 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
1490
1491 # look for primitive
1492 config_primitive_desc = None
1493 if vdu_id:
1494 for vdu in get_iterable(db_vnfd, "vdu"):
1495 if vdu_id == vdu["id"]:
1496 for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
1497 if config_primitive["name"] == primitive:
1498 config_primitive_desc = config_primitive
1499 break
1500 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1501 if config_primitive["name"] == primitive:
1502 config_primitive_desc = config_primitive
1503 break
1504 if not config_primitive_desc:
1505 raise LcmException("Primitive {} not found at vnf-configuration:config-primitive or vdu:"
1506 "vdu-configuration:config-primitive".format(primitive))
1507
1508 vnfr_params = {}
1509 if db_vnfr.get("additionalParamsForVnf"):
1510 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
1511
1512 # TODO check if ns is in a proper status
1513 result, result_detail = await self._ns_execute_primitive(
1514 nsr_deployed, vnf_index, vdu_id, vdu_name, vdu_count_index, primitive,
1515 self._map_primitive_params(config_primitive_desc, primitive_params, vnfr_params))
1516 db_nslcmop_update["detailed-status"] = result_detail
1517 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
1518 db_nslcmop_update["statusEnteredTime"] = time()
1519 self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
1520 return # database update is called inside finally
1521
1522 except (DbException, LcmException) as e:
1523 self.logger.error(logging_text + "Exit Exception {}".format(e))
1524 exc = e
1525 except asyncio.CancelledError:
1526 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1527 exc = "Operation was cancelled"
1528 except Exception as e:
1529 exc = traceback.format_exc()
1530 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1531 finally:
1532 if exc and db_nslcmop:
1533 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1534 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1535 db_nslcmop_update["statusEnteredTime"] = time()
1536 try:
1537 if db_nslcmop_update:
1538 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1539 if db_nsr:
1540 db_nsr_update["_admin.nslcmop"] = None
1541 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1542 except DbException as e:
1543 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1544 self.logger.debug(logging_text + "Exit")
1545 if nslcmop_operation_state:
1546 try:
1547 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1548 "operationState": nslcmop_operation_state},
1549 loop=self.loop)
1550 except Exception as e:
1551 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1552 self.logger.debug(logging_text + "Exit")
1553 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
1554
async def scale(self, nsr_id, nslcmop_id):
    """
    Task that executes a scale operation (SCALE_OUT or SCALE_IN) over one vnf of a network service.
    Steps: read the operation and records from database; wait for related tasks; check the
    max/min-instance-count of the scaling-group-descriptor; run the 'pre-scale-in' config primitives; send the
    'vdu-scaling' action to RO and poll it until ready; update the vnfr; run the 'post-scale-out' config
    primitives. Exceptions raised inside the try block are caught; the result is stored at database
    ('nslcmops' and 'nsrs'), notified with a 'scaled' message at topic 'ns', and the task is removed from
    self.lcm_tasks.
    :param nsr_id: _id of the ns record ('nsrs') to scale
    :param nslcmop_id: _id of the operation ('nslcmops'). Its 'operationParams' must contain 'scaleVnfData'
    :return: None
    """
    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    vnfr_scaled = False
    try:
        step = "Getting nslcmop from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # saved to be restored at the end of the operation
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]

        # look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
        if task_dependency:
            step = db_nslcmop_update["detailed-status"] = \
                "Waiting for related tasks to be completed: {}".format(task_name)
            self.logger.debug(logging_text + step)
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            _, pending = await asyncio.wait(task_dependency, timeout=3600)
            if pending:
                raise LcmException("Timeout waiting related tasks to be completed")

        step = "Parsing scaling parameters"
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")
        RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
        scale_vnf_data = db_nslcmop["operationParams"]["scaleVnfData"]
        vnf_index = scale_vnf_data["scaleByStepData"]["member-vnf-index"]
        scaling_group = scale_vnf_data["scaleByStepData"]["scaling-group-descriptor"]
        scaling_type = scale_vnf_data["scaleVnfType"]
        # scaling_policy = scale_vnf_data["scaleByStepData"].get("scaling-policy")

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        step = "Getting scaling-group-descriptor"
        # 'or ()': a vnfd without scaling-group-descriptor ends in the LcmException below, not in a KeyError
        for scaling_descriptor in db_vnfd.get("scaling-group-descriptor") or ():
            if scaling_descriptor["name"] == scaling_group:
                break
        else:
            raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                               "at vnfd:scaling-group-descriptor".format(scaling_group))
        # cooldown_time = 0
        # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
        #     cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
        #     if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
        #         break

        # TODO check if ns is in a proper status
        step = "Sending scale order to RO"
        # nb_scale_op: number of scale-out operations currently applied over this scaling group
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2("nsrs", nsr_id,
                             {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group

        RO_scaling_info = []
        vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
        if scaling_type == "SCALE_OUT":
            # count if max-instance-count is reached
            if scaling_descriptor.get("max-instance-count") is not None:
                max_instance_count = int(scaling_descriptor["max-instance-count"])
                if nb_scale_op >= max_instance_count:
                    raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
                                       " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op += 1
            vdu_scaling_info["scaling_direction"] = "OUT"
            vdu_scaling_info["vdu-create"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "create", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
        elif scaling_type == "SCALE_IN":
            # count if min-instance-count is reached
            min_instance_count = 0
            if scaling_descriptor.get("min-instance-count") is not None:
                min_instance_count = int(scaling_descriptor["min-instance-count"])
            if nb_scale_op <= min_instance_count:
                raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op -= 1
            vdu_scaling_info["scaling_direction"] = "IN"
            vdu_scaling_info["vdu-delete"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "delete", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
        else:
            # without this check the code below fails with a KeyError on 'scaling_direction'
            raise LcmException("Invalid scaleVnfType '{}'; expected 'SCALE_OUT' or 'SCALE_IN'".format(scaling_type))

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_create = vdu_scaling_info.get("vdu-create")
        vdu_delete = None
        if vdu_scaling_info["scaling_direction"] == "IN":
            # the counters are consumed over a copy, because the original amounts are needed later by scale_vnfr
            vdu_scaling_info["vdu"].extend(
                self._scale_collect_vdu_info(db_vnfr["vdur"], copy(vdu_scaling_info["vdu-delete"])))
            vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # execute primitive service PRE-SCALING
        step = "Executing pre-scale vnf-config-primitive"
        for scaling_config_action in scaling_descriptor.get("scaling-config-action") or ():
            if scaling_type != "SCALE_IN" or scaling_config_action.get("trigger") != "pre-scale-in":
                continue
            vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
            step = db_nslcmop_update["detailed-status"] = \
                "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)

            # look for primitive
            config_primitive = self._scale_find_config_primitive(db_vnfd, vnf_config_primitive, scaling_group)

            vnfr_params = {"<VDU_SCALE_INFO>": vdu_scaling_info}
            if db_vnfr.get("additionalParamsForVnf"):
                vnfr_params.update(db_vnfr["additionalParamsForVnf"])

            scale_process = "VCA"
            db_nsr_update["config-status"] = "configuring pre-scaling"
            result, result_detail = await self._ns_execute_primitive(
                nsr_deployed, vnf_index, None, None, None, vnf_config_primitive,
                self._map_primitive_params(config_primitive, {}, vnfr_params))
            self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                vnf_config_primitive, result, result_detail))
            if result == "FAILED":
                raise LcmException(result_detail)
            db_nsr_update["config-status"] = old_config_status
            scale_process = None

        if RO_scaling_info:
            scale_process = "RO"
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
            db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
            db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
            # wait until ready
            RO_nslcmop_id = RO_desc["instance_action_id"]
            db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id

            RO_task_done = False
            step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
            detailed_status_old = None
            self.logger.debug(logging_text + step)

            # Polling in two phases every 5 seconds: first the RO action, and once it is ACTIVE the ns itself.
            # The loop is left with 'break' when the vnfr has been updated with the information reported by RO
            deployment_timeout = 1 * 3600   # One hour
            while deployment_timeout > 0:
                if not RO_task_done:
                    desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                         extra_item_id=RO_nslcmop_id)
                    ns_status, ns_status_info = RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        RO_task_done = True
                        step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
                        self.logger.debug(logging_text + step)
                    else:
                        assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                else:
                    desc = await RO.show("ns", RO_nsr_id)
                    ns_status, ns_status_info = RO.check_ns_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        step = detailed_status = \
                            "Waiting for management IP address reported by the VIM. Updating VNFRs"
                        # the vnfr must be scaled only once, even if this branch is visited several times
                        if not vnfr_scaled:
                            self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
                            vnfr_scaled = True
                        try:
                            desc = await RO.show("ns", RO_nsr_id)
                            # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
                            self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
                            break
                        except LcmExceptionNoMgmtIP:
                            pass    # management IP not available yet. Try again at next iteration
                    else:
                        assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
                if detailed_status != detailed_status_old:
                    detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                    self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)

                await asyncio.sleep(5, loop=self.loop)
                deployment_timeout -= 5
            if deployment_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns to be ready")

            # update VDU_SCALING_INFO with the obtained ip_addresses
            if vdu_scaling_info["scaling_direction"] == "OUT":
                vdu_scaling_info["vdu"].extend(
                    self._scale_collect_vdu_info(db_vnfr["vdur"], vdu_scaling_info["vdu-create"]))
                del vdu_scaling_info["vdu-create"]

        scale_process = None
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        for scaling_config_action in scaling_descriptor.get("scaling-config-action") or ():
            if scaling_type != "SCALE_OUT" or scaling_config_action.get("trigger") != "post-scale-out":
                continue
            vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
            step = db_nslcmop_update["detailed-status"] = \
                "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)

            vnfr_params = {"<VDU_SCALE_INFO>": vdu_scaling_info}
            if db_vnfr.get("additionalParamsForVnf"):
                vnfr_params.update(db_vnfr["additionalParamsForVnf"])

            # look for primitive
            config_primitive = self._scale_find_config_primitive(db_vnfd, vnf_config_primitive, scaling_group)

            scale_process = "VCA"
            db_nsr_update["config-status"] = "configuring post-scaling"

            result, result_detail = await self._ns_execute_primitive(
                nsr_deployed, vnf_index, None, None, None, vnf_config_primitive,
                self._map_primitive_params(config_primitive, {}, vnfr_params))
            self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                vnf_config_primitive, result, result_detail))
            if result == "FAILED":
                raise LcmException(result_detail)
            db_nsr_update["config-status"] = old_config_status
            scale_process = None

        db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
        db_nslcmop_update["statusEnteredTime"] = time()
        db_nslcmop_update["detailed-status"] = "done"
        db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = old_operational_status
        db_nsr_update["config-status"] = old_config_status
        return   # database update is done inside finally
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc:
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                # only the part that was in progress (VCA or RO) is marked as failed
                if scale_process:
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                 exc)
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                # the operation is finished, either with or without error
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                         "operationState": nslcmop_operation_state},
                                        loop=self.loop)
                # if cooldown_time:
                #     await asyncio.sleep(cooldown_time)
                # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")

@staticmethod
def _scale_collect_vdu_info(vdurs, pending_count):
    """
    Builds the 'vdu' entries of the VDU_SCALE_INFO for the vdur affected by a scale operation. The vdur list is
    walked from the last item to the first one, taking an item while the counter of its vdu is not zero.
    :param vdurs: list of vdur (content of vnfr 'vdur')
    :param pending_count: dictionary {vdu-id-ref: number of vdur to take}. It is modified: each counter is
        decreased by one per vdur taken
    :return: list of dictionaries with 'name', 'vdu_id' and 'interface' (list of name, ip_address, mac_address)
    """
    vdu_info = []
    for vdur in reversed(vdurs):
        vdu_id = vdur["vdu-id-ref"]
        if not pending_count.get(vdu_id):
            continue
        pending_count[vdu_id] -= 1
        vdu_info.append({
            "name": vdur["name"],
            "vdu_id": vdu_id,
            "interface": [
                {
                    "name": interface["name"],
                    "ip_address": interface["ip-address"],
                    "mac_address": interface.get("mac-address"),
                }
                for interface in vdur["interfaces"]
            ],
        })
    return vdu_info

@staticmethod
def _scale_find_config_primitive(db_vnfd, primitive_name, scaling_group):
    """
    Looks for a config primitive by name at vnfd 'vnf-configuration':'config-primitive'.
    :param db_vnfd: vnfd database content
    :param primitive_name: name to look for (scaling-config-action 'vnf-config-primitive-name-ref')
    :param scaling_group: name of the scaling-group-descriptor, used only for the error message
    :return: the config-primitive dictionary. Raises LcmException if it is not found
    """
    # 'or' guards: a missing or None section is treated as an empty list of primitives
    vnf_configuration = db_vnfd.get("vnf-configuration") or {}
    for config_primitive in vnf_configuration.get("config-primitive") or ():
        if config_primitive["name"] == primitive_name:
            return config_primitive
    # report the name that was looked for, not the last item visited by the loop
    raise LcmException(
        "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
        "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
        "primitive".format(scaling_group, primitive_name))