bug 561: Obtain Primitive status/details
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import functools
24 import traceback
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 import ROclient
28 from lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase
29
30 from osm_common.dbbase import DbException
31 from osm_common.fsbase import FsException
32 from n2vc.vnf import N2VC, N2VCPrimitiveExecutionFailed
33
34 from copy import copy, deepcopy
35 from http import HTTPStatus
36 from time import time
37 from uuid import uuid4
38
39 __author__ = "Alfonso Tierno"
40
41
def get_iterable(in_dict, in_key):
    """
    Look up in_key at in_dict, replacing any missing or falsy value (None, False, empty list, ...) by an
    empty tuple, so that callers can loop over the result without checking it first
    :param in_dict: a dictionary
    :param in_key: the key to look for at in_dict
    :return: in_dict[in_key], or () if the key is not present or its value is falsy
    """
    found = in_dict.get(in_key)
    return found if found else ()
52
53
def populate_dict(target_dict, key_list, value):
    """
    Update target_dict in place, creating the nested dictionaries named by key_list as needed. The last item
    of key_list is the key that receives the value.
    Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
    :param target_dict: dictionary to be changed
    :param key_list: list of keys giving the path to insert at target_dict
    :param value: content to store under the last key of key_list
    :return: None
    """
    node = target_dict
    # walk down (creating levels when missing) up to the parent of the last key
    for level_key in key_list[:-1]:
        node = node.setdefault(level_key, {})
    node[key_list[-1]] = value
68
69
class NsLcm(LcmBase):
    """
    Network service (NS) operations of the LCM: translation of OSM descriptors and instantiation parameters
    to RO format, N2VC (charm) status callbacks, vnfr/nsr database updates and the instantiate task.
    """
    # Timeouts below are presumably all expressed in seconds (total_deploy_timeout is compared against
    # time() by the instantiate task); confirm for the others at their points of use.
    timeout_vca_on_error = 5 * 60   # Time for charm from first time at blocked,error status to mark as failed
    total_deploy_timeout = 2 * 3600   # global timeout for deployment
    timeout_charm_delete = 10 * 60  # timeout for charm deletion
    timeout_primitive = 10 * 60  # timeout for primitive execution
75
def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
    """
    Init: sets the 'lcm.ns' logger, initializes the base class and creates the N2VC client
    :param db: database connector, passed to the base class
    :param msg: messaging connector, passed to the base class
    :param fs: filesystem storage connector, passed to the base class
    :param lcm_tasks: registry of running LCM tasks, stored at self.lcm_tasks
    :param ro_config: RO client configuration, stored at self.ro_config
    :param vca_config: dictionary with the VCA connection data. Keys used: 'host', 'port', 'user', 'secret'
    :param loop: asyncio event loop, stored at self.loop
    :return: None
    """
    # logging
    self.logger = logging.getLogger('lcm.ns')
    self.loop = loop
    self.lcm_tasks = lcm_tasks

    super().__init__(db, msg, fs, self.logger)

    self.ro_config = ro_config

    n2vc_settings = {
        "log": self.logger,
        "server": vca_config['host'],
        "port": vca_config['port'],
        "user": vca_config['user'],
        "secret": vca_config['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=vca_config[''],
        "artifacts": None,
    }
    self.n2vc = N2VC(**n2vc_settings)
103
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd. Keys not used by RO are removed and
    the cloud-init (inline or read from the cloud-init-file of the package) is rendered with Jinja2
    :param vnfd: input vnfd. It is not modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Used as variables of the Jinja2 template
    :param nsrId: Id of the NSR. Not used by this method
    :return: copy of vnfd adapted for RO. LcmException is raised if the cloud-init file cannot be read,
        the template cannot be parsed or a template variable is not present at additionalParams
    """
    try:
        vnfd_RO = deepcopy(vnfd)
        # remove unused by RO configuration, monitoring, scaling and internal keys
        for unused_key in ("_id", "_admin", "vnf-configuration", "monitoring-param",
                           "scaling-group-descriptor"):
            vnfd_RO.pop(unused_key, None)
        if new_id:
            vnfd_RO["id"] = new_id

        # parse cloud-init or cloud-init-file with the provided variables using Jinja2
        for vdu in get_iterable(vnfd_RO, "vdu"):
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                # _admin was removed from the copy, so storage info is taken from the original vnfd
                storage = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    cloud_init_content = ci_file.read()
                vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                cloud_init_content = vdu["cloud-init"]
            else:
                continue

            # every variable referenced by the template must come in the instantiation parameters
            parsed_template = Environment().parse(cloud_init_content)
            for var in meta.find_undeclared_variables(parsed_template):
                if not (additionalParams and var in additionalParams.keys()):
                    raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                       "file, must be provided in the instantiation parameters inside the "
                                       "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
            vdu["cloud-init"] = Template(cloud_init_content).render(additionalParams or {})

        return vnfd_RO
    except FsException as e:
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e))
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e))
159
def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None):
    """
    Callback both for charm status change and task completion. Updates the matching entry of
    n2vc_info["deployed"], fills n2vc_info["db_update"] and wakes up the waiting task through
    n2vc_info["n2vc_event"]. Any exception is logged and never propagated to the caller.
    :param model_name: Charm model name
    :param application_name: Charm application name
    :param status: Can be
        - blocked: The unit needs manual intervention
        - maintenance: The unit is actively deploying/configuring
        - waiting: The unit is waiting for another charm to be ready
        - active: The unit is deployed, configured, and ready
        - error: The charm has failed and needs attention.
        - terminated: The charm has been destroyed
        - removing,
        - removed
    :param message: detailed message error
    :param n2vc_info: dictionary with information shared with instantiate task. It contains:
        nsr_id:
        nslcmop_id:
        lcmOperationType: currently "instantiate"
        deployed: list of VCA entries (or None for empty slots), each one a dictionary with at least
            {model: <name>, application: <name>, operational-status: <status>, detailed-status: <text>}
        db_update: dictionary to be filled with the changes to be written to database with format
            key.key.key: value
        n2vc_event: event used to notify instantiation task that some change has been produced
    :param task: None for charm status change, or task for completion task callback
    :return: None
    """
    # Fallback prefix: guarantees that the except handler below can log even when the failure happens
    # before the complete prefix is built (e.g. a key missing at n2vc_info)
    logging_text = "Task [n2vc_callback] application={}".format(application_name)
    try:
        nsr_id = n2vc_info["nsr_id"]
        deployed = n2vc_info["deployed"]
        db_nsr_update = n2vc_info["db_update"]
        nslcmop_id = n2vc_info["nslcmop_id"]
        ns_operation = n2vc_info["lcmOperationType"]
        n2vc_event = n2vc_info["n2vc_event"]
        logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id,
                                                                                application_name)
        # locate the deployed VCA entry this callback refers to
        for vca_index, vca_deployed in enumerate(deployed):
            if not vca_deployed:
                continue
            if model_name == vca_deployed["model"] and application_name == vca_deployed["application"]:
                break
        else:
            self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA")
            return
        if task:
            if task.cancelled():
                self.logger.debug(logging_text + " task Cancelled")
                vca_deployed['operational-status'] = "error"
                db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
                vca_deployed['detailed-status'] = "Task Cancelled"
                db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = "Task Cancelled"

            elif task.done():
                exc = task.exception()
                if exc:
                    self.logger.error(logging_text + " task Exception={}".format(exc))
                    vca_deployed['operational-status'] = "error"
                    db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
                    vca_deployed['detailed-status'] = str(exc)
                    db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(exc)
                else:
                    self.logger.debug(logging_text + " task Done")
                    # task is Done, but callback is still ongoing. So ignore
                    return
        elif status:
            self.logger.debug(logging_text + " Enter status={} message={}".format(status, message))
            if vca_deployed['operational-status'] == status:
                return  # same status, ignore
            vca_deployed['operational-status'] = status
            db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = status
            vca_deployed['detailed-status'] = str(message)
            db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(message)
        else:
            self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
            return
        # wake up instantiate task
        n2vc_event.set()
    except Exception as e:
        self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True)
237
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params
    :param nsd: ns descriptor. Its "constituent-vnfd" list maps each member-vnf-index to a vnfd-id-ref
    :param vnfd_dict: dictionary with vnfd-id-ref: vnfd-content for the vnfds used by the ns
    :param n2vc_key_list: list of public keys to be injected (as "mgmt_keys") into the VDUs that have a
        configuration or hold the management interface of a configured VNF. Ignored if empty
    :return: The RO ns descriptor, or None if ns_params is empty. LcmException is raised if a VIM account
        is not ENABLED or if an instantiation parameter references something not present at nsd/vnfd
    """
    vim_2_RO = {}   # cache of vim_account id: RO datacenter id
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # translates an OSM vim account id into the id used at RO, reading database only the first time
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def ip_profile_2_RO(ip_profile):
        # returns a copy of ip_profile with the keys and values renamed to the ones used by RO
        RO_ip_profile = deepcopy((ip_profile))
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        # "scenario": ns_params["nsdId"],
    }
    if n2vc_key_list:
        # look for the VDUs that need the keys: those with its own configuration, and the one holding the
        # management interface (given by vdu-id or by connection point) when the vnf has configuration
        for vnfd_ref, vnfd in vnfd_dict.items():
            vdu_needed_access = []
            mgmt_cp = None
            if vnfd.get("vnf-configuration"):
                if vnfd.get("mgmt-interface"):
                    if vnfd["mgmt-interface"].get("vdu-id"):
                        vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                    elif vnfd["mgmt-interface"].get("cp"):
                        mgmt_cp = vnfd["mgmt-interface"]["cp"]

            for vdu in vnfd.get("vdu", ()):
                if vdu.get("vdu-configuration"):
                    vdu_needed_access.append(vdu["id"])
                elif mgmt_cp:
                    # get_iterable: a vdu without "interface" must be skipped instead of failing
                    for vdu_interface in get_iterable(vdu, "interface"):
                        if vdu_interface.get("external-connection-point-ref") and \
                                vdu_interface["external-connection-point-ref"] == mgmt_cp:
                            vdu_needed_access.append(vdu["id"])
                            mgmt_cp = None
                            break

            if vdu_needed_access:
                # get_iterable: a nsd without "constituent-vnfd" must be skipped instead of failing
                for vnf_member in get_iterable(nsd, "constituent-vnfd"):
                    if vnf_member["vnfd-id-ref"] != vnfd_ref:
                        continue
                    for vdu in vdu_needed_access:
                        populate_dict(RO_ns_params,
                                      ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                      n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    for vnf_params in get_iterable(ns_params, "vnf"):
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    # all the vdus were inspected without finding the connection point
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))
        if vld_params.get("vim-network-name"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if vld_params.get("vim-network-id"):
            # written to the same "sites" key, so it replaces what vim-network-name could have set above
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-id"], dict):
                for vim_account, vim_net in vld_params["vim-network-id"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # vdu_descriptor and interface_descriptor hold here the matching vdu and interface
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
467
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr and writes the resulting "vdur" list to database
    :param db_vnfr: vnfr content. Its "vdur" list is modified
    :param vdu_create: dictionary with vdu-id-ref: number of vdur to add. New entries are copies of the
        last existing vdur with that vdu-id-ref, with a new _id and consecutive count-index
    :param vdu_delete: dictionary with vdu-id-ref: number of vdur to remove, starting by the last ones
    :return: None. LcmException is raised if some requested addition or removal cannot be done because
        there are not (enough) vdur with that vdu-id-ref
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # walk backwards, so that insertions and deletions do not shift the entries still to be visited
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            for index in range(0, vdu_create[vdu_id_ref]):
                # each new entry is a copy of the previous one with the next count-index
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done: whatever remains at the dictionaries could not be applied
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
506
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies into every vld of the nsr the information that RO reports for its network (vim id, name, status
    and error message)
    :param ns_update_nsr: dictionary to be filled with the updated info, with keys "vld.<index>"
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO. Its "nets" are matched to each vld by "ns_net_osm_id"
    :return: Nothing, LcmException is raised when a vld has no matching net at RO info
    """
    for position, vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net whose ns_net_osm_id is this vld, if any
        matching_net = next(
            (net for net in get_iterable(nsr_desc_RO, "nets") if vld["id"] == net.get("ns_net_osm_id")),
            None)
        if matching_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
        vld.update({
            "vim-id": matching_net.get("vim_net_id"),
            "name": matching_net.get("vim_name"),
            "status": matching_net.get("status"),
            "status-detailed": matching_net.get("error_msg"),
        })
        ns_update_nsr["vld.{}".format(position)] = vld
528
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated.
    For every vnfr: sets the management ip-address, refreshes each non-PDU vdur (and its interfaces) and
    each vld with what RO reports, and writes the changes to the "vnfrs" collection.
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO. Its "vnfs" and "nets" entries are used
    :return: Nothing, LcmException is raised on errors (a vnf, vdur, interface or vld not found at RO info).
        LcmExceptionNoMgmtIP is raised when neither RO nor the vnfr provide the management IP address
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # look for the RO vnf that corresponds to this member-vnf-index
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"]
            elif not db_vnfr.get("ip-address"):
                raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # counts the RO vms of the same vdu already skipped, to match the vdur count-index
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        # same vdu but another replica; go on with the next RO vm
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    vdur["ip-address"] = vdur_RO.get("ip_address")
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    # interfaces are matched by name against the RO "internal_name"
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "at RO info".format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    # no RO vm with this vdu-id-ref and count-index
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} at "
                                       "RO info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            # internal vlds are matched against the RO nets by "vnf_net_osm_id"
            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} at RO info".format(
                        vnf_index, vld["id"]))

            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            # the loop over the RO vnfs ended without finding this member-vnf-index
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(vnf_index))
596
597 async def instantiate(self, nsr_id, nslcmop_id):
598 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
599 self.logger.debug(logging_text + "Enter")
600 # get all needed from database
601 start_deploy = time()
602 db_nsr = None
603 db_nslcmop = None
604 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
605 db_nslcmop_update = {}
606 nslcmop_operation_state = None
607 db_vnfrs = {}
608 RO_descriptor_number = 0 # number of descriptors created at RO
609 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
610 n2vc_info = {}
611 exc = None
612 try:
613 step = "Getting nslcmop={} from db".format(nslcmop_id)
614 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
615 step = "Getting nsr={} from db".format(nsr_id)
616 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
617 ns_params = db_nslcmop.get("operationParams")
618 nsd = db_nsr["nsd"]
619 nsr_name = db_nsr["name"] # TODO short-name??
620
621 # look if previous tasks in process
622 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
623 if task_dependency:
624 step = db_nslcmop_update["detailed-status"] = \
625 "Waiting for related tasks to be completed: {}".format(task_name)
626 self.logger.debug(logging_text + step)
627 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
628 _, pending = await asyncio.wait(task_dependency, timeout=3600)
629 if pending:
630 raise LcmException("Timeout waiting related tasks to be completed")
631
632 step = "Getting vnfrs from db"
633 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
634 db_vnfds_ref = {}
635 db_vnfds = {}
636 for vnfr in db_vnfrs_list:
637 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
638 vnfd_id = vnfr["vnfd-id"]
639 vnfd_ref = vnfr["vnfd-ref"]
640 if vnfd_id not in db_vnfds:
641 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
642 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
643 db_vnfds_ref[vnfd_ref] = vnfd
644 db_vnfds[vnfd_id] = vnfd
645
646 # Get or generates the _admin.deployed,VCA list
647 vca_deployed_list = None
648 if db_nsr["_admin"].get("deployed"):
649 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
650 if vca_deployed_list is None:
651 vca_deployed_list = []
652 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
653 elif isinstance(vca_deployed_list, dict):
654 # maintain backward compatibility. Change a dict to list at database
655 vca_deployed_list = list(vca_deployed_list.values())
656 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
657
658 db_nsr_update["detailed-status"] = "creating"
659 db_nsr_update["operational-status"] = "init"
660
661 RO = ROclient.ROClient(self.loop, **self.ro_config)
662
663 # set state to INSTANTIATED. When instantiated NBI will not delete directly
664 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
665 self.update_db_2("nsrs", nsr_id, db_nsr_update)
666
667 # get vnfds, instantiate at RO
668 for c_vnf in nsd.get("constituent-vnfd", ()):
669 member_vnf_index = c_vnf["member-vnf-index"]
670 vnfd_ref = vnfd["id"]
671 step = db_nsr_update["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_ref)
672 # self.logger.debug(logging_text + step)
673 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
674 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
675 RO_descriptor_number += 1
676
677 # look if present
678 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
679 if vnfd_list:
680 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = vnfd_list[0]["uuid"]
681 self.logger.debug(logging_text + "vnfd={} exists at RO. Using RO_id={}".format(
682 vnfd_ref, vnfd_list[0]["uuid"]))
683 else:
684 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
685 get("additionalParamsForVnf"), nsr_id)
686 desc = await RO.create("vnfd", descriptor=vnfd_RO)
687 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = desc["uuid"]
688 self.logger.debug(logging_text + "vnfd={} created at RO. RO_id={}".format(
689 vnfd_ref, desc["uuid"]))
690 self.update_db_2("nsrs", nsr_id, db_nsr_update)
691
692 # create nsd at RO
693 nsd_ref = nsd["id"]
694 step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
695 # self.logger.debug(logging_text + step)
696
697 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
698 RO_descriptor_number += 1
699 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
700 if nsd_list:
701 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
702 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
703 nsd_ref, RO_nsd_uuid))
704 else:
705 nsd_RO = deepcopy(nsd)
706 nsd_RO["id"] = RO_osm_nsd_id
707 nsd_RO.pop("_id", None)
708 nsd_RO.pop("_admin", None)
709 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
710 member_vnf_index = c_vnf["member-vnf-index"]
711 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
712 for c_vld in nsd_RO.get("vld", ()):
713 for cp in c_vld.get("vnfd-connection-point-ref", ()):
714 member_vnf_index = cp["member-vnf-index-ref"]
715 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
716
717 desc = await RO.create("nsd", descriptor=nsd_RO)
718 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
719 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
720 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
721 self.update_db_2("nsrs", nsr_id, db_nsr_update)
722
723 # Crate ns at RO
724 # if present use it unless in error status
725 RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
726 if RO_nsr_id:
727 try:
728 step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
729 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
730 desc = await RO.show("ns", RO_nsr_id)
731 except ROclient.ROClientException as e:
732 if e.http_code != HTTPStatus.NOT_FOUND:
733 raise
734 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
735 if RO_nsr_id:
736 ns_status, ns_status_info = RO.check_ns_status(desc)
737 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
738 if ns_status == "ERROR":
739 step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
740 self.logger.debug(logging_text + step)
741 await RO.delete("ns", RO_nsr_id)
742 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
743 if not RO_nsr_id:
744 step = db_nsr_update["detailed-status"] = "Checking dependencies"
745 # self.logger.debug(logging_text + step)
746
747 # check if VIM is creating and wait look if previous tasks in process
748 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
749 if task_dependency:
750 step = "Waiting for related tasks to be completed: {}".format(task_name)
751 self.logger.debug(logging_text + step)
752 await asyncio.wait(task_dependency, timeout=3600)
753 if ns_params.get("vnf"):
754 for vnf in ns_params["vnf"]:
755 if "vimAccountId" in vnf:
756 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
757 vnf["vimAccountId"])
758 if task_dependency:
759 step = "Waiting for related tasks to be completed: {}".format(task_name)
760 self.logger.debug(logging_text + step)
761 await asyncio.wait(task_dependency, timeout=3600)
762
763 step = db_nsr_update["detailed-status"] = "Checking instantiation parameters"
764
765 # feature 1429. Add n2vc public key to needed VMs
766 n2vc_key = await self.n2vc.GetPublicKey()
767 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, [n2vc_key])
768
769 step = db_nsr_update["detailed-status"] = "Creating ns at RO"
770 desc = await RO.create("ns", descriptor=RO_ns_params,
771 name=db_nsr["name"],
772 scenario=RO_nsd_uuid)
773 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
774 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
775 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
776 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
777 self.update_db_2("nsrs", nsr_id, db_nsr_update)
778
779 # wait until NS is ready
780 step = ns_status_detailed = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
781 detailed_status_old = None
782 self.logger.debug(logging_text + step)
783
784 while time() <= start_deploy + self.total_deploy_timeout:
785 desc = await RO.show("ns", RO_nsr_id)
786 ns_status, ns_status_info = RO.check_ns_status(desc)
787 db_nsr_update["admin.deployed.RO.nsr_status"] = ns_status
788 if ns_status == "ERROR":
789 raise ROclient.ROClientException(ns_status_info)
790 elif ns_status == "BUILD":
791 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
792 elif ns_status == "ACTIVE":
793 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
794 try:
795 self.ns_update_vnfr(db_vnfrs, desc)
796 break
797 except LcmExceptionNoMgmtIP:
798 pass
799 else:
800 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
801 if detailed_status != detailed_status_old:
802 detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
803 self.update_db_2("nsrs", nsr_id, db_nsr_update)
804 await asyncio.sleep(5, loop=self.loop)
805 else: # total_deploy_timeout
806 raise ROclient.ROClientException("Timeout waiting ns to be ready")
807
808 step = "Updating NSR"
809 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
810
811 db_nsr["detailed-status"] = "Configuring vnfr"
812 self.update_db_2("nsrs", nsr_id, db_nsr_update)
813
814 # The parameters we'll need to deploy a charm
815 number_to_configure = 0
816
817 def deploy_charm(vnf_index, vdu_id, vdu_name, vdu_count_index, charm_params, n2vc_info):
818 """An inner function to deploy the charm from either vnf or vdu
819 vnf_index is mandatory. vdu_id can be None for a vnf configuration or the id for vdu configuration
820 """
821 if not charm_params["rw_mgmt_ip"]:
822 raise LcmException("vnfd/vdu has not management ip address to configure it")
823 # Login to the VCA.
824 # if number_to_configure == 0:
825 # self.logger.debug("Logging into N2VC...")
826 # task = asyncio.ensure_future(self.n2vc.login())
827 # yield from asyncio.wait_for(task, 30.0)
828 # self.logger.debug("Logged into N2VC!")
829
830 # # await self.n2vc.login()
831
832 # Note: The charm needs to exist on disk at the location
833 # specified by charm_path.
834 base_folder = vnfd["_admin"]["storage"]
835 storage_params = self.fs.get_params()
836 charm_path = "{}{}/{}/charms/{}".format(
837 storage_params["path"],
838 base_folder["folder"],
839 base_folder["pkg-dir"],
840 proxy_charm
841 )
842
843 # ns_name will be ignored in the current version of N2VC
844 # but will be implemented for the next point release.
845 model_name = "default" # TODO bug 585 nsr_id
846 if vdu_id:
847 vdu_id_text = vdu_id + "-"
848 else:
849 vdu_id_text = "-"
850 application_name = self.n2vc.FormatApplicationName(nsr_name, vnf_index, vdu_id_text)
851
852 vca_index = len(vca_deployed_list)
853 # trunk name and add two char index at the end to ensure that it is unique. It is assumed no more than
854 # 26*26 charm in the same NS
855 application_name = application_name[0:48]
856 application_name += chr(97 + vca_index // 26) + chr(97 + vca_index % 26)
857 vca_deployed_ = {
858 "member-vnf-index": vnf_index,
859 "vdu_id": vdu_id,
860 "model": model_name,
861 "application": application_name,
862 "operational-status": "init",
863 "detailed-status": "",
864 "vnfd_id": vnfd_id,
865 "vdu_name": vdu_name,
866 "vdu_count_index": vdu_count_index,
867 }
868 vca_deployed_list.append(vca_deployed_)
869 db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = vca_deployed_
870 self.update_db_2("nsrs", nsr_id, db_nsr_update)
871
872 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
873 proxy_charm))
874 if not n2vc_info:
875 n2vc_info["nsr_id"] = nsr_id
876 n2vc_info["nslcmop_id"] = nslcmop_id
877 n2vc_info["n2vc_event"] = asyncio.Event(loop=self.loop)
878 n2vc_info["lcmOperationType"] = "instantiate"
879 n2vc_info["deployed"] = vca_deployed_list
880 n2vc_info["db_update"] = db_nsr_update
881 task = asyncio.ensure_future(
882 self.n2vc.DeployCharms(
883 model_name, # The network service name
884 application_name, # The application name
885 vnfd, # The vnf descriptor
886 charm_path, # Path to charm
887 charm_params, # Runtime params, like mgmt ip
888 {}, # for native charms only
889 self.n2vc_callback, # Callback for status changes
890 n2vc_info, # Callback parameter
891 None, # Callback parameter (task)
892 )
893 )
894 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
895 n2vc_info))
896 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + application_name, task)
897
898 step = "Looking for needed vnfd to configure"
899 self.logger.debug(logging_text + step)
900
901 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
902 vnfd_id = c_vnf["vnfd-id-ref"]
903 vnf_index = str(c_vnf["member-vnf-index"])
904 vnfd = db_vnfds_ref[vnfd_id]
905
906 # Get additional parameters
907 vnfr_params = {}
908 if db_vnfrs[vnf_index].get("additionalParamsForVnf"):
909 vnfr_params = db_vnfrs[vnf_index]["additionalParamsForVnf"].copy()
910 for k, v in vnfr_params.items():
911 if isinstance(v, str) and v.startswith("!!yaml "):
912 vnfr_params[k] = yaml.safe_load(v[7:])
913
914 # Check if this VNF has a charm configuration
915 vnf_config = vnfd.get("vnf-configuration")
916 if vnf_config and vnf_config.get("juju"):
917 proxy_charm = vnf_config["juju"]["charm"]
918
919 if proxy_charm:
920 step = "connecting to N2VC to configure vnf {}".format(vnf_index)
921 vnfr_params["rw_mgmt_ip"] = db_vnfrs[vnf_index]["ip-address"]
922 charm_params = {
923 "user_values": vnfr_params,
924 "rw_mgmt_ip": db_vnfrs[vnf_index]["ip-address"],
925 "initial-config-primitive": vnf_config.get('initial-config-primitive') or {}
926 }
927
928 # Login to the VCA. If there are multiple calls to login(),
929 # subsequent calls will be a nop and return immediately.
930 await self.n2vc.login()
931 deploy_charm(vnf_index, None, None, None, charm_params, n2vc_info)
932 number_to_configure += 1
933
934 # Deploy charms for each VDU that supports one.
935 for vdu_index, vdu in enumerate(get_iterable(vnfd, 'vdu')):
936 vdu_config = vdu.get('vdu-configuration')
937 proxy_charm = None
938
939 if vdu_config and vdu_config.get("juju"):
940 proxy_charm = vdu_config["juju"]["charm"]
941
942 if proxy_charm:
943 step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index)
944 await self.n2vc.login()
945 vdur = db_vnfrs[vnf_index]["vdur"][vdu_index]
946 # TODO for the moment only first vdu_id contains a charm deployed
947 if vdur["vdu-id-ref"] != vdu["id"]:
948 raise LcmException("Mismatch vdur {}, vdu {} at index {} for vnf {}"
949 .format(vdur["vdu-id-ref"], vdu["id"], vdu_index, vnf_index))
950 vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
951 charm_params = {
952 "user_values": vnfr_params,
953 "rw_mgmt_ip": vdur["ip-address"],
954 "initial-config-primitive": vdu_config.get('initial-config-primitive') or {}
955 }
956 deploy_charm(vnf_index, vdu["id"], vdur.get("name"), vdur["count-index"],
957 charm_params, n2vc_info)
958 number_to_configure += 1
959
960 db_nsr_update["operational-status"] = "running"
961 configuration_failed = False
962 if number_to_configure:
963 old_status = "configuring: init: {}".format(number_to_configure)
964 db_nsr_update["config-status"] = old_status
965 db_nsr_update["detailed-status"] = old_status
966 db_nslcmop_update["detailed-status"] = old_status
967
968 # wait until all are configured.
969 while time() <= start_deploy + self.total_deploy_timeout:
970 if db_nsr_update:
971 self.update_db_2("nsrs", nsr_id, db_nsr_update)
972 if db_nslcmop_update:
973 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
974 # TODO add a fake task that set n2vc_event after some time
975 await n2vc_info["n2vc_event"].wait()
976 n2vc_info["n2vc_event"].clear()
977 all_active = True
978 status_map = {}
979 n2vc_error_text = [] # contain text error list. If empty no one is in error status
980 now = time()
981 for vca_deployed in vca_deployed_list:
982 vca_status = vca_deployed["operational-status"]
983 if vca_status not in status_map:
984 # Initialize it
985 status_map[vca_status] = 0
986 status_map[vca_status] += 1
987
988 if vca_status == "active":
989 vca_deployed.pop("time_first_error", None)
990 vca_deployed.pop("status_first_error", None)
991 continue
992
993 all_active = False
994 if vca_status in ("error", "blocked"):
995 vca_deployed["detailed-status-error"] = vca_deployed["detailed-status"]
996 # if not first time in this status error
997 if not vca_deployed.get("time_first_error"):
998 vca_deployed["time_first_error"] = now
999 continue
1000 if vca_deployed.get("time_first_error") and \
1001 now <= vca_deployed["time_first_error"] + self.timeout_vca_on_error:
1002 n2vc_error_text.append("member_vnf_index={} vdu_id={} {}: {}"
1003 .format(vca_deployed["member-vnf-index"],
1004 vca_deployed["vdu_id"], vca_status,
1005 vca_deployed["detailed-status-error"]))
1006
1007 if all_active:
1008 break
1009 elif n2vc_error_text:
1010 db_nsr_update["config-status"] = "failed"
1011 error_text = "fail configuring " + ";".join(n2vc_error_text)
1012 db_nsr_update["detailed-status"] = error_text
1013 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1014 db_nslcmop_update["detailed-status"] = error_text
1015 db_nslcmop_update["statusEnteredTime"] = time()
1016 configuration_failed = True
1017 break
1018 else:
1019 cs = "configuring: "
1020 separator = ""
1021 for status, num in status_map.items():
1022 cs += separator + "{}: {}".format(status, num)
1023 separator = ", "
1024 if old_status != cs:
1025 db_nsr_update["config-status"] = cs
1026 db_nsr_update["detailed-status"] = cs
1027 db_nslcmop_update["detailed-status"] = cs
1028 old_status = cs
1029 else: # total_deploy_timeout
1030 raise LcmException("Timeout waiting ns to be configured")
1031
1032 if not configuration_failed:
1033 # all is done
1034 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1035 db_nslcmop_update["statusEnteredTime"] = time()
1036 db_nslcmop_update["detailed-status"] = "done"
1037 db_nsr_update["config-status"] = "configured"
1038 db_nsr_update["detailed-status"] = "done"
1039
1040 return
1041
1042 except (ROclient.ROClientException, DbException, LcmException) as e:
1043 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1044 exc = e
1045 except asyncio.CancelledError:
1046 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1047 exc = "Operation was cancelled"
1048 except Exception as e:
1049 exc = traceback.format_exc()
1050 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1051 exc_info=True)
1052 finally:
1053 if exc:
1054 if db_nsr:
1055 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1056 db_nsr_update["operational-status"] = "failed"
1057 if db_nslcmop:
1058 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1059 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1060 db_nslcmop_update["statusEnteredTime"] = time()
1061 try:
1062 if db_nsr:
1063 db_nsr_update["_admin.nslcmop"] = None
1064 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1065 if db_nslcmop_update:
1066 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1067 except DbException as e:
1068 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1069 if nslcmop_operation_state:
1070 try:
1071 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1072 "operationState": nslcmop_operation_state},
1073 loop=self.loop)
1074 except Exception as e:
1075 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1076
1077 self.logger.debug(logging_text + "Exit")
1078 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1079
async def _destroy_charm(self, model, application):
    """
    Order N2VC to remove a charm application, unless it does not exist any more
    :param model: model name passed to N2VC
    :param application: application name passed to N2VC
    :return: True if the application was already absent (nothing is ordered). False if it existed and
        N2VC.RemoveCharms has been called for it
    """
    still_deployed = await self.n2vc.HasApplication(model, application)
    if still_deployed:
        await self.n2vc.RemoveCharms(model, application)
        return False
    # Already removed
    return True
1091
async def _wait_charm_destroyed(self, model, application, timeout):
    """
    Poll N2VC until the charm application does not exist or the timeout is exhausted
    :param model: model name passed to N2VC
    :param application: application name passed to N2VC
    :param timeout: maximum number of seconds to wait. With zero or a negative value the existence is checked
        only once, without sleeping
    :return: True if not exist, False if timeout
    """
    poll_interval = 10  # seconds between two consecutive checks
    while True:
        if not await self.n2vc.HasApplication(model, application):
            return True
        # Stop as soon as the budget is used up ('<=' and not '<'), so that timeout=0 means a single check
        # instead of one extra polling interval
        if timeout <= 0:
            return False
        # Never sleep longer than the remaining budget, so the total wait does not overshoot the timeout
        pause = min(poll_interval, timeout)
        await asyncio.sleep(pause)
        timeout -= pause
1107
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a network service instance.
    Steps, in order: order the removal of the configuration charms; delete the ns at RO and wait until the
    delete action finishes; delete the nsd and the vnfds at RO; wait until the charms do not exist. At the end
    the "nsrs" and "nslcmops" database entries are updated (or deleted when the operation params contain
    "autoremove"), an "ns"/"terminated" message is written with self.msg and the task is removed from
    self.lcm_tasks.
    :param nsr_id: _id of the ns record at database table "nsrs"
    :param nslcmop_id: _id of this operation at database table "nslcmops"
    :return: None. Exceptions raised by the termination steps are caught here and stored at the
        "detailed-status" of the operation, with "operationState" set to "FAILED"
    """
    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    exc = None
    failed_detail = []   # annotates all failed error messages
    vca_time_destroy = None   # time when the last charm removal was ordered; None if no charm is pending
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    try:
        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # nsd = db_nsr["nsd"]
        # work over a private copy: the entries of nsr_deployed["VCA"] are cleared below
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return
        # #TODO check if VIM is creating and wait
        # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"

        if nsr_deployed and nsr_deployed.get("VCA"):
            # best effort: a failure ordering the charm removal is only logged; RO deletion continues anyway
            try:
                step = "Scheduling configuration charms removing"
                db_nsr_update["detailed-status"] = "Deleting charms"
                self.logger.debug(logging_text + step)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # for backward compatibility
                if isinstance(nsr_deployed["VCA"], dict):
                    nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
                    db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)

                for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
                    if vca_deployed:
                        if await self._destroy_charm(vca_deployed['model'], vca_deployed["application"]):
                            # charm did not exist: empty the entry so that it is skipped when waiting below
                            vca_deployed.clear()
                            # must go to db_nsr_update (what is written to database), not to the local db_nsr
                            db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None
                        else:
                            vca_time_destroy = time()
            except Exception as e:
                self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO_fail = False
        RO = ROclient.ROClient(self.loop, **self.ro_config)

        # Delete ns
        RO_nsr_id = RO_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
            RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if RO_nsr_id:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
                    "Deleting ns at RO"
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self.logger.debug(logging_text + step)
                desc = await RO.delete("ns", RO_nsr_id)
                RO_delete_action = desc["action_id"]
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            # RO_delete_action can also come from database ("nsr_delete_action_id") without a new delete order
            if RO_delete_action:
                # wait until NS is deleted from VIM
                step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
                    format(RO_nsr_id, RO_delete_action)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                delete_timeout = 20 * 60   # 20 minutes
                while delete_timeout > 0:
                    desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                         extra_item_id=RO_delete_action)
                    ns_status, ns_status_info = RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    # write to database only when the progress text changes
                    if detailed_status != detailed_status_old:
                        detailed_status_old = db_nslcmop_update["detailed-status"] = \
                            db_nsr_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

        except ROclient.ROClientException as e:
            if e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
            elif e.http_code == 409:   # conflict
                failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
                RO_fail = True
            else:
                failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                self.logger.error(logging_text + failed_detail[-1])
                RO_fail = True

        # Delete nsd
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
            RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                    "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                    RO_fail = True
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])
                    RO_fail = True

        # Delete vnfds. A failure here is annotated at failed_detail but does not set RO_fail
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd_id"):
            for vnf_id, RO_vnfd_id in nsr_deployed["RO"]["vnfd_id"].items():
                if not RO_vnfd_id:
                    continue
                try:
                    step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                        "Deleting vnfd={} at RO".format(vnf_id)
                    await RO.delete("vnfd", RO_vnfd_id)
                    self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
                        self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                    elif e.http_code == 409:   # conflict
                        failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        # wait until charm deleted
        if vca_time_destroy:
            db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = step = \
                "Waiting for deletion of configuration charms"
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
                # entries cleared above were already absent
                if not vca_deployed:
                    continue
                step = "Waiting for deletion of charm application_name={}".format(vca_deployed["application"])
                # remaining budget, counted from the moment of the last removal order
                timeout = self.timeout_charm_delete - int(time() - vca_time_destroy)
                if not await self._wait_charm_destroyed(vca_deployed['model'], vca_deployed["application"],
                                                        timeout):
                    failed_detail.append("VCA[application_name={}] Deletion timeout".format(
                        vca_deployed["application"]))
                else:
                    # must go to db_nsr_update (what is written to database), not to the local db_nsr
                    db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None

        if failed_detail:
            self.logger.error(logging_text + " ;".join(failed_detail))
            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
            db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        elif db_nslcmop["operationParams"].get("autoremove"):
            # remove everything related to this ns; nothing is left to update at the finally clause
            self.db.del_one("nsrs", {"_id": nsr_id})
            db_nsr = None
            db_nsr_update.clear()
            self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
            db_nslcmop = None
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update.clear()
            self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
            self.db.set_list("pdus", {"_admin.usage.nsr_id": nsr_id},
                             {"_admin.usageState": "NOT_IN_USE", "_admin.usage": None})
            self.logger.debug(logging_text + "Delete from database")
        else:
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                # the ns record is not bound to this operation any more
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        # notify only when a final state was reached (nothing is sent for an ns that was NOT_INSTANTIATED)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
1342
@staticmethod
def _map_primitive_params(primitive_desc, params, instantiation_params):
    """
    Builds the parameters handed to the charm for executing a primitive. For every parameter declared at the
    descriptor the value given by the user is taken; if absent, the "default-value" is used. A default-value
    written as <name> is replaced by the entry 'name' of instantiation_params.
    Values that are dict, list or tuple are dumped to a yaml text; text values starting with "!!yaml " get
    this prefix removed.
    :param primitive_desc: portion of VNFD/NSD that describes primitive
    :param params: Params provided by user
    :param instantiation_params: Instantiation params provided by user
    :return: a dictionary with the calculated params
    :raises LcmException: if a parameter has neither user value nor default-value, or if a <name> default is
        not present at instantiation_params
    """
    resolved = {}
    for declared in primitive_desc.get("parameter", ()):
        name = declared["name"]
        if name in params:
            value = params[name]
        elif "default-value" not in declared:
            raise LcmException("Parameter {} needed to execute primitive {} not provided".
                               format(name, primitive_desc["name"]))
        else:
            default = declared["default-value"]
            value = default
            # a default between < > is a reference to an instantiation parameter
            if isinstance(default, str) and default.startswith("<") and default.endswith(">"):
                reference = default[1:-1]
                if reference not in instantiation_params:
                    raise LcmException("Parameter {} needed to execute primitive {} not provided".
                                       format(default, primitive_desc["name"]))
                value = instantiation_params[reference]

        if isinstance(value, (dict, list, tuple)):
            value = yaml.safe_dump(value, default_flow_style=True, width=256)
        elif isinstance(value, str) and value.startswith("!!yaml "):
            value = value[7:]
        resolved[name] = value
    return resolved
1377
async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
                                primitive, primitive_params):
    """
    Execute a primitive over the deployed charm that matches the given vnf/vdu and wait for its result
    :param db_deployed: dictionary whose "VCA" list contains the deployed charms. None or a missing "VCA" is
        treated as no charm deployed
    :param member_vnf_index: must be equal to the "member-vnf-index" of the charm entry
    :param vdu_id: must be equal to the "vdu_id" of the charm entry
    :param vdu_name: if provided (not empty), must be equal to the "vdu_name" of the charm entry
    :param vdu_count_index: if not None, must be equal to the "vdu_count_index" of the charm entry. 0 is a
        valid index and it is matched as any other value
    :param primitive: name of the primitive to execute
    :param primitive_params: dictionary passed as keyword arguments to N2VC.ExecutePrimitive
    :return: tuple (result, detail). result is "COMPLETED" or "FAILED". detail is the output obtained from
        N2VC.GetPrimitiveOutput, or an error text when the charm is not found, it is not active, the
        execution fails or self.timeout_primitive seconds elapse
    """
    start_primitive_time = time()
    try:
        # look for the first deployed charm that matches; empty entries are skipped
        for vca_deployed in (db_deployed or {}).get("VCA") or ():
            if not vca_deployed:
                continue
            if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
                continue
            if vdu_name and vdu_name != vca_deployed["vdu_name"]:
                continue
            # compare against None: a plain truth test would ignore a request for index 0
            if vdu_count_index is not None and vdu_count_index != vca_deployed["vdu_count_index"]:
                continue
            break
        else:
            raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
                               "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
        model_name = vca_deployed.get("model")
        application_name = vca_deployed.get("application")
        if not model_name or not application_name:
            raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
                               "model or application name" .format(member_vnf_index, vdu_id, vdu_name,
                                                                   vdu_count_index))
        if vca_deployed["operational-status"] != "active":
            raise LcmException("charm for member_vnf_index={} vdu_id={} operational_status={} not 'active'".format(
                member_vnf_index, vdu_id, vca_deployed["operational-status"]))
        callback = None  # self.n2vc_callback
        callback_args = ()  # [db_nsr, db_nslcmop, member_vnf_index, None]
        await self.n2vc.login()
        primitive_id = await self.n2vc.ExecutePrimitive(
            model_name,
            application_name,
            primitive,
            callback,
            *callback_args,
            **primitive_params
        )
        # poll the primitive status every 5 seconds until a final status or the timeout
        while time() - start_primitive_time < self.timeout_primitive:
            primitive_result_ = await self.n2vc.GetPrimitiveStatus(model_name, primitive_id)
            if primitive_result_ == "running":
                pass
            elif primitive_result_ in ("completed", "failed"):
                primitive_result = "COMPLETED" if primitive_result_ == "completed" else "FAILED"
                detailed_result = await self.n2vc.GetPrimitiveOutput(model_name, primitive_id)
                break
            else:
                detailed_result = "Invalid N2VC.GetPrimitiveStatus = {} obtained".format(primitive_result_)
                primitive_result = "FAILED"
                break
            await asyncio.sleep(5)
        else:
            raise LcmException("timeout after {} seconds".format(self.timeout_primitive))
        return primitive_result, detailed_result
    except (N2VCPrimitiveExecutionFailed, LcmException) as e:
        # these failures are reported as a result, not raised
        return "FAILED", str(e)
1433
1434 async def action(self, nsr_id, nslcmop_id):
1435 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
1436 self.logger.debug(logging_text + "Enter")
1437 # get all needed from database
1438 db_nsr = None
1439 db_nslcmop = None
1440 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1441 db_nslcmop_update = {}
1442 nslcmop_operation_state = None
1443 exc = None
1444 try:
1445 step = "Getting information from database"
1446 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1447 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1448
1449 nsr_deployed = db_nsr["_admin"].get("deployed")
1450 vnf_index = db_nslcmop["operationParams"]["member_vnf_index"]
1451 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
1452 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
1453 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
1454
1455 step = "Getting vnfr from database"
1456 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
1457 step = "Getting vnfd from database"
1458 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
1459
1460 # look if previous tasks in process
1461 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
1462 if task_dependency:
1463 step = db_nslcmop_update["detailed-status"] = \
1464 "Waiting for related tasks to be completed: {}".format(task_name)
1465 self.logger.debug(logging_text + step)
1466 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1467 _, pending = await asyncio.wait(task_dependency, timeout=3600)
1468 if pending:
1469 raise LcmException("Timeout waiting related tasks to be completed")
1470
1471 # for backward compatibility
1472 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
1473 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
1474 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
1475 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1476
1477 primitive = db_nslcmop["operationParams"]["primitive"]
1478 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
1479
1480 # look for primitive
1481 config_primitive_desc = None
1482 if vdu_id:
1483 for vdu in get_iterable(db_vnfd, "vdu"):
1484 if vdu_id == vdu["id"]:
1485 for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
1486 if config_primitive["name"] == primitive:
1487 config_primitive_desc = config_primitive
1488 break
1489 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1490 if config_primitive["name"] == primitive:
1491 config_primitive_desc = config_primitive
1492 break
1493 if not config_primitive_desc:
1494 raise LcmException("Primitive {} not found at vnf-configuration:config-primitive or vdu:"
1495 "vdu-configuration:config-primitive".format(primitive))
1496
1497 vnfr_params = {}
1498 if db_vnfr.get("additionalParamsForVnf"):
1499 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
1500
1501 # TODO check if ns is in a proper status
1502 result, result_detail = await self._ns_execute_primitive(
1503 nsr_deployed, vnf_index, vdu_id, vdu_name, vdu_count_index, primitive,
1504 self._map_primitive_params(config_primitive_desc, primitive_params, vnfr_params))
1505 db_nslcmop_update["detailed-status"] = result_detail
1506 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
1507 db_nslcmop_update["statusEnteredTime"] = time()
1508 self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
1509 return # database update is called inside finally
1510
1511 except (DbException, LcmException) as e:
1512 self.logger.error(logging_text + "Exit Exception {}".format(e))
1513 exc = e
1514 except asyncio.CancelledError:
1515 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1516 exc = "Operation was cancelled"
1517 except Exception as e:
1518 exc = traceback.format_exc()
1519 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1520 finally:
1521 if exc and db_nslcmop:
1522 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1523 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1524 db_nslcmop_update["statusEnteredTime"] = time()
1525 try:
1526 if db_nslcmop_update:
1527 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1528 if db_nsr:
1529 db_nsr_update["_admin.nslcmop"] = None
1530 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1531 except DbException as e:
1532 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1533 self.logger.debug(logging_text + "Exit")
1534 if nslcmop_operation_state:
1535 try:
1536 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1537 "operationState": nslcmop_operation_state},
1538 loop=self.loop)
1539 except Exception as e:
1540 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1541 self.logger.debug(logging_text + "Exit")
1542 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
1543
async def scale(self, nsr_id, nslcmop_id):
    """
    Execute a scale operation (nslcmop) over one VNF of a network service.

    Reads the nslcmop, nsr, vnfr and vnfd records from the database, optionally runs the 'pre-scale-in'
    vnf-config-primitive, sends a 'vdu-scaling' action to RO and polls it until the ns is ready, optionally runs
    the 'post-scale-out' vnf-config-primitive, and finally stores the result at the 'nslcmops' and 'nsrs'
    collections and writes a "scaled" message on the "ns" topic.

    Errors are not propagated to the caller: they are caught, logged and recorded as a "FAILED" operationState.

    :param nsr_id: _id of the 'nsrs' database entry to scale
    :param nslcmop_id: _id of the 'nslcmops' database entry describing this scale operation
    :return: None
    """
    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    vnfr_scaled = False
    try:
        step = "Getting nslcmop from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # remembered so they can be restored once the operation finishes (or fails outside RO/VCA)
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]

        # look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
        if task_dependency:
            step = db_nslcmop_update["detailed-status"] = \
                "Waiting for related tasks to be completed: {}".format(task_name)
            self.logger.debug(logging_text + step)
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            _, pending = await asyncio.wait(task_dependency, timeout=3600)
            if pending:
                raise LcmException("Timeout waiting related tasks to be completed")

        step = "Parsing scaling parameters"
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")
        RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
        vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
        scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
        scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
        # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        step = "Getting scaling-group-descriptor"
        # on a match the loop variable 'scaling_descriptor' is left pointing to the requested descriptor
        for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
            if scaling_descriptor["name"] == scaling_group:
                break
        else:
            raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                               "at vnfd:scaling-group-descriptor".format(scaling_group))
        # cooldown_time = 0
        # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
        #     cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
        #     if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
        #         break

        # TODO check if ns is in a proper status
        step = "Sending scale order to RO"
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
        RO_scaling_info = []
        vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
        if scaling_type == "SCALE_OUT":
            # count if max-instance-count is reached
            if "max-instance-count" in scaling_descriptor and scaling_descriptor["max-instance-count"] is not None:
                max_instance_count = int(scaling_descriptor["max-instance-count"])
                if nb_scale_op >= max_instance_count:
                    raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
                                       " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op = nb_scale_op + 1
            vdu_scaling_info["scaling_direction"] = "OUT"
            vdu_scaling_info["vdu-create"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "create", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
        elif scaling_type == "SCALE_IN":
            # count if min-instance-count is reached
            min_instance_count = 0
            if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
                min_instance_count = int(scaling_descriptor["min-instance-count"])
            if nb_scale_op <= min_instance_count:
                raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op = nb_scale_op - 1
            vdu_scaling_info["scaling_direction"] = "IN"
            vdu_scaling_info["vdu-delete"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "delete", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
        else:
            # without this guard the lookups of "scaling_direction" below fail with a bare KeyError
            raise LcmException("Invalid 'scaleVnfType' '{}'. Expected 'SCALE_OUT' or 'SCALE_IN'".format(
                scaling_type))

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_create = vdu_scaling_info.get("vdu-create")
        # work on a copy: the counters are decremented below while the original counts are still needed
        vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
        if vdu_scaling_info["scaling_direction"] == "IN":
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    vdu_scaling_info["vdu"].append({
                        "name": vdur["name"],
                        "vdu_id": vdur["vdu-id-ref"],
                        "interface": []
                    })
                    for interface in vdur["interfaces"]:
                        vdu_scaling_info["vdu"][-1]["interface"].append({
                            "name": interface["name"],
                            "ip_address": interface["ip-address"],
                            "mac_address": interface.get("mac-address"),
                        })
            # recover the untouched counts (later passed to scale_vnfr) and drop them from the info dict
            vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # execute primitive service PRE-SCALING
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "pre-scale-in" \
                        and scaling_type == "SCALE_IN":
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the requested name; the loop variable is not the missing primitive and is
                        # even unbound when there are no config-primitives at all
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive))

                    vnfr_params = {"<VDU_SCALE_INFO>": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    result, result_detail = await self._ns_execute_primitive(
                        nsr_deployed, vnf_index, None, None, None, vnf_config_primitive,
                        self._map_primitive_params(config_primitive, {}, vnfr_params))
                    self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                        vnf_config_primitive, result, result_detail))
                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None

        if RO_scaling_info:
            scale_process = "RO"
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
            db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
            db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
            # wait until ready
            RO_nslcmop_id = RO_desc["instance_action_id"]
            db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id

            RO_task_done = False
            step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
            detailed_status_old = None
            self.logger.debug(logging_text + step)

            # two polling phases: first the RO action itself, then the ns status and the management IPs
            deployment_timeout = 1 * 3600   # One hour
            while deployment_timeout > 0:
                if not RO_task_done:
                    desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                         extra_item_id=RO_nslcmop_id)
                    ns_status, ns_status_info = RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        RO_task_done = True
                        step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
                        self.logger.debug(logging_text + step)
                    else:
                        assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                else:
                    desc = await RO.show("ns", RO_nsr_id)
                    ns_status, ns_status_info = RO.check_ns_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        step = detailed_status = \
                            "Waiting for management IP address reported by the VIM. Updating VNFRs"
                        # the vnfr must be scaled only once, even if this branch is polled several times
                        if not vnfr_scaled:
                            self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
                            vnfr_scaled = True
                        try:
                            desc = await RO.show("ns", RO_nsr_id)
                            # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
                            self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
                            break
                        except LcmExceptionNoMgmtIP:
                            # management IP not available yet: keep polling until the timeout expires
                            pass
                    else:
                        assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
                # write to database only when the status text has changed
                if detailed_status != detailed_status_old:
                    detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                    self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)

                await asyncio.sleep(5, loop=self.loop)
                deployment_timeout -= 5
            if deployment_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns to be ready")

            # NOTE(review): nesting level of this section reconstructed from control flow; confirm that it
            # belongs inside "if RO_scaling_info"
            # update VDU_SCALING_INFO with the obtained ip_addresses
            if vdu_scaling_info["scaling_direction"] == "OUT":
                for vdur in reversed(db_vnfr["vdur"]):
                    if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
                        vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
                        vdu_scaling_info["vdu"].append({
                            "name": vdur["name"],
                            "vdu_id": vdur["vdu-id-ref"],
                            "interface": []
                        })
                        for interface in vdur["interfaces"]:
                            vdu_scaling_info["vdu"][-1]["interface"].append({
                                "name": interface["name"],
                                "ip_address": interface["ip-address"],
                                "mac_address": interface.get("mac-address"),
                            })
                del vdu_scaling_info["vdu-create"]

        scale_process = None
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "post-scale-out" \
                        and scaling_type == "SCALE_OUT":
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    vnfr_params = {"<VDU_SCALE_INFO>": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the requested name, not the (possibly unbound) loop variable
                        raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
                                           "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
                                           "match any vnf-configuration:config-primitive".format(
                                               scaling_group, vnf_config_primitive))
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"

                    result, result_detail = await self._ns_execute_primitive(
                        nsr_deployed, vnf_index, None, None, None, vnf_config_primitive,
                        self._map_primitive_params(config_primitive, {}, vnfr_params))
                    self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                        vnf_config_primitive, result, result_detail))
                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None

        db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
        db_nslcmop_update["statusEnteredTime"] = time()
        db_nslcmop_update["detailed-status"] = "done"
        db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = old_operational_status
        db_nsr_update["config-status"] = old_config_status
        return  # database update and notification are done inside finally
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc:
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                db_nsr_update["_admin.nslcmop"] = None
                # the nsr is marked as failed only when the error happened while RO or VCA were acting
                if scale_process:
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                 exc)
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                         "operationState": nslcmop_operation_state},
                                        loop=self.loop)
                # if cooldown_time:
                #     await asyncio.sleep(cooldown_time)
                # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")