On ns/netslice terminate, sends to kafka for NBI to delete
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import functools
24 import traceback
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 import ROclient
28 from lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase
29
30 from osm_common.dbbase import DbException
31 from osm_common.fsbase import FsException
32 from n2vc.vnf import N2VC, N2VCPrimitiveExecutionFailed
33
34 from copy import copy, deepcopy
35 from http import HTTPStatus
36 from time import time
37 from uuid import uuid4
38
39 __author__ = "Alfonso Tierno"
40
41
def get_iterable(in_dict, in_key):
    """
    Fetch in_dict[in_key], replacing a missing or falsy value (None, False, empty, ...) by an empty tuple,
    so that the result can always be used in a 'for' loop
    :param in_dict: a dictionary
    :param in_key: the key to look for at in_dict
    :return: in_dict[in_key] or () if it is None or not present
    """
    value = in_dict.get(in_key)
    return value if value else ()
52
53
def populate_dict(target_dict, key_list, value):
    """
    Update target_dict in place, creating the nested dictionaries named by key_list as needed. The last
    item of key_list is the key that receives the value.
    Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
    :param target_dict: dictionary to be changed
    :param key_list: list of keys to insert at target_dict
    :param value: content assigned to the innermost key
    :return: None
    """
    node = target_dict
    # walk down (creating levels when missing) through all keys but the last one
    for key in key_list[:-1]:
        node = node.setdefault(key, {})
    node[key_list[-1]] = value
68
69
70 class NsLcm(LcmBase):
71 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
72 total_deploy_timeout = 2 * 3600 # global timeout for deployment
73 timeout_charm_delete = 10 * 60
74 timeout_primitive = 10 * 60 # timeout for primitive execution
75
def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
    """
    Init: sets the logger, stores the shared handlers and creates the N2VC (VCA) client
    :param db: database handler, forwarded to LcmBase
    :param msg: messaging handler, forwarded to LcmBase
    :param fs: filesystem storage handler, forwarded to LcmBase
    :param lcm_tasks: stored at self.lcm_tasks
    :param ro_config: stored at self.ro_config
    :param vca_config: dictionary; keys 'host', 'port', 'user' and 'secret' are read to create the N2VC client
    :param loop: stored at self.loop
    :return: None
    """
    # logging
    self.logger = logging.getLogger('lcm.ns')
    self.loop = loop
    self.lcm_tasks = lcm_tasks

    super().__init__(db, msg, fs, self.logger)

    self.ro_config = ro_config

    n2vc_settings = {
        "log": self.logger,
        "server": vca_config['host'],
        "port": vca_config['port'],
        "user": vca_config['user'],
        "secret": vca_config['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=vca_config[''],
        "artifacts": None,
    }
    self.n2vc = N2VC(**n2vc_settings)
103
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates the vnfd descriptor for RO based on the input OSM IM vnfd. The input is not modified: a deep
    copy is done, the keys not used by RO are removed and the cloud-init of each vdu is rendered with Jinja2
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Used as variables of the template
    :param nsrId: Id of the NSR (not read by this method)
    :return: copy of vnfd
    :raises LcmException: if the cloud-init file cannot be read, a variable of the template is not provided
        or Jinja2 fails parsing/rendering
    """
    try:
        ro_vnfd = deepcopy(vnfd)
        # remove unused by RO configuration, monitoring, scaling and internal keys
        for unused_key in ("_id", "_admin", "vnf-configuration", "monitoring-param", "scaling-group-descriptor"):
            ro_vnfd.pop(unused_key, None)
        if new_id:
            ro_vnfd["id"] = new_id

        # parse cloud-init or cloud-init-file with the provided variables using Jinja2
        for vdu in get_iterable(ro_vnfd, "vdu"):
            cloud_init_file = None
            if vdu.get("cloud-init-file"):
                storage = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(storage["folder"], storage["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    cloud_init_content = ci_file.read()
                # RO receives the content inline at 'cloud-init', not the file name
                vdu.pop("cloud-init-file", None)
            elif vdu.get("cloud-init"):
                cloud_init_content = vdu["cloud-init"]
            else:
                continue

            # every undeclared variable of the template must come at the instantiation parameters
            parsed_template = Environment().parse(cloud_init_content)
            for var in meta.find_undeclared_variables(parsed_template):
                if additionalParams and var in additionalParams:
                    continue
                raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                                   "file, must be provided in the instantiation parameters inside the "
                                   "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
            vdu["cloud-init"] = Template(cloud_init_content).render(additionalParams or {})

        return ro_vnfd
    except FsException as e:
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e))
    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd["id"], vdu["id"], e))
159
def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None):
    """
    Callback both for charm status change and task completion
    :param model_name: Charm model name
    :param application_name: Charm application name
    :param status: Can be
        - blocked: The unit needs manual intervention
        - maintenance: The unit is actively deploying/configuring
        - waiting: The unit is waiting for another charm to be ready
        - active: The unit is deployed, configured, and ready
        - error: The charm has failed and needs attention.
        - terminated: The charm has been destroyed
        - removing,
        - removed
    :param message: detailed message error
    :param n2vc_info: dictionary with information shared with instantiate task. It contains:
        nsr_id:
        nslcmop_id:
        lcmOperationType: currently "instantiate"
        deployed: list of VCA entries (empty entries are skipped). The entry whose "model" and "application"
            match is updated at "operational-status" and "detailed-status"
        db_update: dictionary to be filled with the changes to be wrote to database with format key.key.key: value
        n2vc_event: event used to notify instantiation task that some change has been produced
    :param task: None for charm status change, or task for completion task callback
    :return: None. Exceptions are logged, never propagated to the caller
    """
    # Set a fallback before the 'try': the except handler uses logging_text and n2vc_info may lack some key
    logging_text = "Task [n2vc_callback] application={}".format(application_name)
    try:
        nsr_id = n2vc_info["nsr_id"]
        deployed = n2vc_info["deployed"]
        db_nsr_update = n2vc_info["db_update"]
        nslcmop_id = n2vc_info["nslcmop_id"]
        ns_operation = n2vc_info["lcmOperationType"]
        n2vc_event = n2vc_info["n2vc_event"]
        logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id,
                                                                                application_name)
        # look for the VCA entry this callback refers to
        for vca_index, vca_deployed in enumerate(deployed):
            if not vca_deployed:
                continue
            if model_name == vca_deployed["model"] and application_name == vca_deployed["application"]:
                break
        else:
            self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA")
            return
        if task:
            if task.cancelled():
                self.logger.debug(logging_text + " task Cancelled")
                vca_deployed['operational-status'] = "error"
                db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
                vca_deployed['detailed-status'] = "Task Cancelled"
                db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = "Task Cancelled"

            elif task.done():
                exc = task.exception()
                if exc:
                    self.logger.error(logging_text + " task Exception={}".format(exc))
                    vca_deployed['operational-status'] = "error"
                    db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
                    vca_deployed['detailed-status'] = str(exc)
                    db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(exc)
                else:
                    self.logger.debug(logging_text + " task Done")
                    # task is Done, but callback is still ongoing. So ignore
                    return
        elif status:
            self.logger.debug(logging_text + " Enter status={} message={}".format(status, message))
            if vca_deployed['operational-status'] == status:
                return  # same status, ignore
            vca_deployed['operational-status'] = status
            db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = status
            vca_deployed['detailed-status'] = str(message)
            db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(message)
        else:
            # no exception is being handled here, so there is no traceback to attach (no exc_info)
            self.logger.critical(logging_text + " Enter with bad parameters")
            return
        # wake up instantiate task
        n2vc_event.set()
    except Exception as e:
        self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True)
237
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params
    :param nsd: ns descriptor. Its 'constituent-vnfd' list is used to map member-vnf-index to vnfd-id-ref
    :param vnfd_dict: dictionary with vnfd-id-ref: vnfd-content
    :param n2vc_key_list: list of keys stored as 'mgmt_keys' at the vdus that need access (the vdu of the
        vnf management interface when there is vnf-configuration, and the vdus with vdu-configuration)
    :return: The RO ns descriptor, or None if ns_params is empty
    :raises LcmException: if a VIM account is not ENABLED, or an instantiation parameter references a
        member-vnf-index, connection point or interface that is not present at the descriptors
    """
    vim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # vim_2_RO works as a cache, so that each vim account is read from database only once
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def ip_profile_2_RO(ip_profile):
        # translate the OSM IM ip-profile keys/values into the ones used by RO, over a copy
        RO_ip_profile = deepcopy(ip_profile)
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        # "scenario": ns_params["nsdId"],
    }
    if n2vc_key_list:
        for vnfd_ref, vnfd in vnfd_dict.items():
            vdu_needed_access = []
            mgmt_cp = None
            if vnfd.get("vnf-configuration"):
                if vnfd.get("mgmt-interface"):
                    if vnfd["mgmt-interface"].get("vdu-id"):
                        vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                    elif vnfd["mgmt-interface"].get("cp"):
                        mgmt_cp = vnfd["mgmt-interface"]["cp"]

            for vdu in get_iterable(vnfd, "vdu"):
                if vdu.get("vdu-configuration"):
                    vdu_needed_access.append(vdu["id"])
                elif mgmt_cp:
                    # get_iterable: a vdu without 'interface' is skipped instead of failing iterating None
                    for vdu_interface in get_iterable(vdu, "interface"):
                        if vdu_interface.get("external-connection-point-ref") == mgmt_cp:
                            vdu_needed_access.append(vdu["id"])
                            # management vdu found; stop looking for it at the rest of vdus
                            mgmt_cp = None
                            break

            if vdu_needed_access:
                for vnf_member in get_iterable(nsd, "constituent-vnfd"):
                    if vnf_member["vnfd-id-ref"] != vnfd_ref:
                        continue
                    for vdu in vdu_needed_access:
                        populate_dict(RO_ns_params,
                                      ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                      n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    for vnf_params in get_iterable(ns_params, "vnf"):
        for constituent_vnfd in get_iterable(nsd, "constituent-vnfd"):
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in get_iterable(vnf_descriptor, "vdu"):
                    for vdu_interface in get_iterable(vdu_descriptor, "interface"):
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    # loop over vdus exhausted without 'break': no interface uses this connection point
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))
        if vld_params.get("vim-network-name"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        # when both are provided, the sites of vim-network-id replace the ones of vim-network-name
        if vld_params.get("vim-network-id"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-id"], dict):
                for vim_account, vim_net in vld_params["vim-network-id"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in get_iterable(nsd, "constituent-vnfd"):
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in get_iterable(vnf_descriptor, "vdu"):
                    for interface_descriptor in get_iterable(vdu_descriptor, "interface"):
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # here vdu_descriptor/interface_descriptor are the ones that matched the connection point
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
467
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr, and writes the resulting "vdur" list to database
    :param db_vnfr: vnfr content. Its "vdur" key is replaced by the scaled list. Entries with "pdu-type"
        are never touched
    :param vdu_create: dictionary with vdu-id-ref: number of vdur to add. New entries are copies of the
        existing vdur with the highest list position for that vdu-id-ref, with a new "_id" and increasing
        "count-index"
    :param vdu_delete: dictionary with vdu-id-ref: number of vdur to remove, starting from the end of the list
    :return: None
    :raises LcmException: if some requested creation or deletion cannot be done because there are not
        (enough) existing vdur for that vdu-id-ref
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # iterate from the end, so that inserting/deleting does not affect the positions still to be visited
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            for index in range(0, vdu_create[vdu_id_ref]):
                # each new vdur is a copy of the previous one, so that count-index keeps growing
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Entries are removed from both dictionaries once completed
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
506
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Fills each vld of the nsr with the info reported by RO for its network (vim id, name, status)
    :param ns_update_nsr: dictionary to be filled with the updated info, with keys "vld.<index>"
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO. The net of a vld is the one whose "ns_net_osm_id" is the vld id
    :return: Nothing, LcmException is raised on errors
    """
    for position, vld in enumerate(db_nsr.get("vld") or ()):
        # locate the RO net of this vld; without a match the 'else' of the loop raises
        for ro_net in nsr_desc_RO.get("nets") or ():
            if vld["id"] == ro_net.get("ns_net_osm_id"):
                break
        else:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
        vld.update({
            "vim-id": ro_net.get("vim_net_id"),
            "name": ro_net.get("vim_name"),
            "status": ro_net.get("status"),
            "status-detailed": ro_net.get("error_msg"),
        })
        ns_update_nsr["vld.{}".format(position)] = vld
528
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Copies to each vnfr the info reported by RO (ip address, vim ids, names, status of vdur, interfaces and
    vld) and writes the changes to database. The content of db_vnfrs is modified as well
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors. LcmExceptionNoMgmtIP is raised when neither RO nor
        the vnfr provide an ip address for a vnf
    """
    for member_index, vnfr in db_vnfrs.items():
        # locate the RO vnf of this member index
        for ro_vnf in nsr_desc_RO["vnfs"]:
            if ro_vnf["member_vnf_index"] == member_index:
                break
        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(member_index))

        changes = {}
        if ro_vnf.get("ip_address"):
            changes["ip-address"] = ro_vnf["ip_address"]
            vnfr["ip-address"] = changes["ip-address"]
        elif not vnfr.get("ip-address"):
            raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(member_index))

        for vdur_position, vdur in enumerate(vnfr.get("vdur") or ()):
            if vdur.get("pdu-type"):
                continue
            # among the RO vms of the same vdu, the one at position 'count-index' is the one of this vdur
            same_vdu_seen = 0
            for ro_vm in ro_vnf.get("vms") or ():
                if vdur["vdu-id-ref"] != ro_vm["vdu_osm_id"]:
                    continue
                if vdur["count-index"] == same_vdu_seen:
                    break
                same_vdu_seen += 1
            else:
                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} at "
                                   "RO info".format(member_index, vdur["vdu-id-ref"], vdur["count-index"]))
            vdur["vim-id"] = ro_vm.get("vim_vm_id")
            vdur["ip-address"] = ro_vm.get("ip_address")
            vdur["vdu-id-ref"] = ro_vm.get("vdu_osm_id")
            vdur["name"] = ro_vm.get("vim_name")
            vdur["status"] = ro_vm.get("status")
            vdur["status-detailed"] = ro_vm.get("error_msg")
            for iface in vdur.get("interfaces") or ():
                for ro_iface in ro_vm.get("interfaces") or ():
                    if iface["name"] == ro_iface.get("internal_name"):
                        break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                       "at RO info".format(member_index, vdur["vdu-id-ref"], iface["name"]))
                iface["ip-address"] = ro_iface.get("ip_address")
                iface["mac-address"] = ro_iface.get("mac_address")
            changes["vdur.{}".format(vdur_position)] = vdur

        for vld_position, vld in enumerate(vnfr.get("vld") or ()):
            for ro_net in nsr_desc_RO.get("nets") or ():
                if vld["id"] == ro_net.get("vnf_net_osm_id"):
                    break
            else:
                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} at RO info".format(
                    member_index, vld["id"]))
            vld["vim-id"] = ro_net.get("vim_net_id")
            vld["name"] = ro_net.get("vim_name")
            vld["status"] = ro_net.get("status")
            vld["status-detailed"] = ro_net.get("error_msg")
            changes["vld.{}".format(vld_position)] = vld

        self.update_db_2("vnfrs", vnfr["_id"], changes)
596
597 async def instantiate(self, nsr_id, nslcmop_id):
598 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
599 self.logger.debug(logging_text + "Enter")
600 # get all needed from database
601 start_deploy = time()
602 db_nsr = None
603 db_nslcmop = None
604 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
605 db_nslcmop_update = {}
606 nslcmop_operation_state = None
607 db_vnfrs = {}
608 RO_descriptor_number = 0 # number of descriptors created at RO
609 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
610 n2vc_info = {}
611 exc = None
612 try:
613 step = "Getting nslcmop={} from db".format(nslcmop_id)
614 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
615 step = "Getting nsr={} from db".format(nsr_id)
616 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
617 ns_params = db_nslcmop.get("operationParams")
618 nsd = db_nsr["nsd"]
619 nsr_name = db_nsr["name"] # TODO short-name??
620
621 # look if previous tasks in process
622 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
623 if task_dependency:
624 step = db_nslcmop_update["detailed-status"] = \
625 "Waiting for related tasks to be completed: {}".format(task_name)
626 self.logger.debug(logging_text + step)
627 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
628 _, pending = await asyncio.wait(task_dependency, timeout=3600)
629 if pending:
630 raise LcmException("Timeout waiting related tasks to be completed")
631
632 step = "Getting vnfrs from db"
633 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
634 db_vnfds_ref = {}
635 db_vnfds = {}
636 for vnfr in db_vnfrs_list:
637 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
638 vnfd_id = vnfr["vnfd-id"]
639 vnfd_ref = vnfr["vnfd-ref"]
640 if vnfd_id not in db_vnfds:
641 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
642 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
643 db_vnfds_ref[vnfd_ref] = vnfd
644 db_vnfds[vnfd_id] = vnfd
645
646 # Get or generates the _admin.deployed,VCA list
647 vca_deployed_list = None
648 if db_nsr["_admin"].get("deployed"):
649 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
650 if vca_deployed_list is None:
651 vca_deployed_list = []
652 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
653 elif isinstance(vca_deployed_list, dict):
654 # maintain backward compatibility. Change a dict to list at database
655 vca_deployed_list = list(vca_deployed_list.values())
656 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
657
658 db_nsr_update["detailed-status"] = "creating"
659 db_nsr_update["operational-status"] = "init"
660
661 RO = ROclient.ROClient(self.loop, **self.ro_config)
662
663 # set state to INSTANTIATED. When instantiated NBI will not delete directly
664 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
665 self.update_db_2("nsrs", nsr_id, db_nsr_update)
666
667 # get vnfds, instantiate at RO
668 for c_vnf in nsd.get("constituent-vnfd", ()):
669 member_vnf_index = c_vnf["member-vnf-index"]
670 vnfd_ref = vnfd["id"]
671 step = db_nsr_update["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_ref)
672 # self.logger.debug(logging_text + step)
673 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
674 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
675 RO_descriptor_number += 1
676
677 # look if present
678 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
679 if vnfd_list:
680 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = vnfd_list[0]["uuid"]
681 self.logger.debug(logging_text + "vnfd={} exists at RO. Using RO_id={}".format(
682 vnfd_ref, vnfd_list[0]["uuid"]))
683 else:
684 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
685 get("additionalParamsForVnf"), nsr_id)
686 desc = await RO.create("vnfd", descriptor=vnfd_RO)
687 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = desc["uuid"]
688 self.logger.debug(logging_text + "vnfd={} created at RO. RO_id={}".format(
689 vnfd_ref, desc["uuid"]))
690 self.update_db_2("nsrs", nsr_id, db_nsr_update)
691
692 # create nsd at RO
693 nsd_ref = nsd["id"]
694 step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
695 # self.logger.debug(logging_text + step)
696
697 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
698 RO_descriptor_number += 1
699 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
700 if nsd_list:
701 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
702 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
703 nsd_ref, RO_nsd_uuid))
704 else:
705 nsd_RO = deepcopy(nsd)
706 nsd_RO["id"] = RO_osm_nsd_id
707 nsd_RO.pop("_id", None)
708 nsd_RO.pop("_admin", None)
709 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
710 member_vnf_index = c_vnf["member-vnf-index"]
711 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
712 for c_vld in nsd_RO.get("vld", ()):
713 for cp in c_vld.get("vnfd-connection-point-ref", ()):
714 member_vnf_index = cp["member-vnf-index-ref"]
715 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
716
717 desc = await RO.create("nsd", descriptor=nsd_RO)
718 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
719 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
720 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
721 self.update_db_2("nsrs", nsr_id, db_nsr_update)
722
723 # Crate ns at RO
724 # if present use it unless in error status
725 RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
726 if RO_nsr_id:
727 try:
728 step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
729 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
730 desc = await RO.show("ns", RO_nsr_id)
731 except ROclient.ROClientException as e:
732 if e.http_code != HTTPStatus.NOT_FOUND:
733 raise
734 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
735 if RO_nsr_id:
736 ns_status, ns_status_info = RO.check_ns_status(desc)
737 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
738 if ns_status == "ERROR":
739 step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
740 self.logger.debug(logging_text + step)
741 await RO.delete("ns", RO_nsr_id)
742 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
743 if not RO_nsr_id:
744 step = db_nsr_update["detailed-status"] = "Checking dependencies"
745 # self.logger.debug(logging_text + step)
746
747 # check if VIM is creating and wait look if previous tasks in process
748 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
749 if task_dependency:
750 step = "Waiting for related tasks to be completed: {}".format(task_name)
751 self.logger.debug(logging_text + step)
752 await asyncio.wait(task_dependency, timeout=3600)
753 if ns_params.get("vnf"):
754 for vnf in ns_params["vnf"]:
755 if "vimAccountId" in vnf:
756 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
757 vnf["vimAccountId"])
758 if task_dependency:
759 step = "Waiting for related tasks to be completed: {}".format(task_name)
760 self.logger.debug(logging_text + step)
761 await asyncio.wait(task_dependency, timeout=3600)
762
763 step = db_nsr_update["detailed-status"] = "Checking instantiation parameters"
764
765 # feature 1429. Add n2vc public key to needed VMs
766 n2vc_key = await self.n2vc.GetPublicKey()
767 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, [n2vc_key])
768
769 step = db_nsr_update["detailed-status"] = "Creating ns at RO"
770 desc = await RO.create("ns", descriptor=RO_ns_params,
771 name=db_nsr["name"],
772 scenario=RO_nsd_uuid)
773 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
774 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
775 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
776 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
777 self.update_db_2("nsrs", nsr_id, db_nsr_update)
778
779 # wait until NS is ready
780 step = ns_status_detailed = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
781 detailed_status_old = None
782 self.logger.debug(logging_text + step)
783
784 while time() <= start_deploy + self.total_deploy_timeout:
785 desc = await RO.show("ns", RO_nsr_id)
786 ns_status, ns_status_info = RO.check_ns_status(desc)
787 db_nsr_update["admin.deployed.RO.nsr_status"] = ns_status
788 if ns_status == "ERROR":
789 raise ROclient.ROClientException(ns_status_info)
790 elif ns_status == "BUILD":
791 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
792 elif ns_status == "ACTIVE":
793 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
794 try:
795 self.ns_update_vnfr(db_vnfrs, desc)
796 break
797 except LcmExceptionNoMgmtIP:
798 pass
799 else:
800 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
801 if detailed_status != detailed_status_old:
802 detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
803 self.update_db_2("nsrs", nsr_id, db_nsr_update)
804 await asyncio.sleep(5, loop=self.loop)
805 else: # total_deploy_timeout
806 raise ROclient.ROClientException("Timeout waiting ns to be ready")
807
808 step = "Updating NSR"
809 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
810
811 db_nsr["detailed-status"] = "Configuring vnfr"
812 self.update_db_2("nsrs", nsr_id, db_nsr_update)
813
814 # The parameters we'll need to deploy a charm
815 number_to_configure = 0
816
817 def deploy_charm(vnf_index, vdu_id, vdu_name, vdu_count_index, charm_params, n2vc_info):
818 """An inner function to deploy the charm from either vnf or vdu
819 vnf_index is mandatory. vdu_id can be None for a vnf configuration or the id for vdu configuration
820 """
821 if not charm_params["rw_mgmt_ip"]:
822 raise LcmException("vnfd/vdu has not management ip address to configure it")
823 # Login to the VCA.
824 # if number_to_configure == 0:
825 # self.logger.debug("Logging into N2VC...")
826 # task = asyncio.ensure_future(self.n2vc.login())
827 # yield from asyncio.wait_for(task, 30.0)
828 # self.logger.debug("Logged into N2VC!")
829
830 # # await self.n2vc.login()
831
832 # Note: The charm needs to exist on disk at the location
833 # specified by charm_path.
834 base_folder = vnfd["_admin"]["storage"]
835 storage_params = self.fs.get_params()
836 charm_path = "{}{}/{}/charms/{}".format(
837 storage_params["path"],
838 base_folder["folder"],
839 base_folder["pkg-dir"],
840 proxy_charm
841 )
842
843 # ns_name will be ignored in the current version of N2VC
844 # but will be implemented for the next point release.
845 model_name = "default" # TODO bug 585 nsr_id
846 if vdu_id:
847 vdu_id_text = vdu_id + "-"
848 else:
849 vdu_id_text = "-"
850 application_name = self.n2vc.FormatApplicationName(nsr_name, vnf_index, vdu_id_text)
851
852 vca_index = len(vca_deployed_list)
853 # trunk name and add two char index at the end to ensure that it is unique. It is assumed no more than
854 # 26*26 charm in the same NS
855 application_name = application_name[0:48]
856 application_name += chr(97 + vca_index // 26) + chr(97 + vca_index % 26)
857 vca_deployed_ = {
858 "member-vnf-index": vnf_index,
859 "vdu_id": vdu_id,
860 "model": model_name,
861 "application": application_name,
862 "operational-status": "init",
863 "detailed-status": "",
864 "vnfd_id": vnfd_id,
865 "vdu_name": vdu_name,
866 "vdu_count_index": vdu_count_index,
867 }
868 vca_deployed_list.append(vca_deployed_)
869 db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = vca_deployed_
870 self.update_db_2("nsrs", nsr_id, db_nsr_update)
871
872 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
873 proxy_charm))
874 if not n2vc_info:
875 n2vc_info["nsr_id"] = nsr_id
876 n2vc_info["nslcmop_id"] = nslcmop_id
877 n2vc_info["n2vc_event"] = asyncio.Event(loop=self.loop)
878 n2vc_info["lcmOperationType"] = "instantiate"
879 n2vc_info["deployed"] = vca_deployed_list
880 n2vc_info["db_update"] = db_nsr_update
881 task = asyncio.ensure_future(
882 self.n2vc.DeployCharms(
883 model_name, # The network service name
884 application_name, # The application name
885 vnfd, # The vnf descriptor
886 charm_path, # Path to charm
887 charm_params, # Runtime params, like mgmt ip
888 {}, # for native charms only
889 self.n2vc_callback, # Callback for status changes
890 n2vc_info, # Callback parameter
891 None, # Callback parameter (task)
892 )
893 )
894 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
895 n2vc_info))
896 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + application_name, task)
897
898 step = "Looking for needed vnfd to configure"
899 self.logger.debug(logging_text + step)
900
901 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
902 vnfd_id = c_vnf["vnfd-id-ref"]
903 vnf_index = str(c_vnf["member-vnf-index"])
904 vnfd = db_vnfds_ref[vnfd_id]
905
906 # Get additional parameters
907 vnfr_params = {}
908 if db_vnfrs[vnf_index].get("additionalParamsForVnf"):
909 vnfr_params = db_vnfrs[vnf_index]["additionalParamsForVnf"].copy()
910 for k, v in vnfr_params.items():
911 if isinstance(v, str) and v.startswith("!!yaml "):
912 vnfr_params[k] = yaml.safe_load(v[7:])
913
914 # Check if this VNF has a charm configuration
915 vnf_config = vnfd.get("vnf-configuration")
916 if vnf_config and vnf_config.get("juju"):
917 proxy_charm = vnf_config["juju"]["charm"]
918
919 if proxy_charm:
920 step = "connecting to N2VC to configure vnf {}".format(vnf_index)
921 vnfr_params["rw_mgmt_ip"] = db_vnfrs[vnf_index]["ip-address"]
922 charm_params = {
923 "user_values": vnfr_params,
924 "rw_mgmt_ip": db_vnfrs[vnf_index]["ip-address"],
925 "initial-config-primitive": vnf_config.get('initial-config-primitive') or {}
926 }
927
928 # Login to the VCA. If there are multiple calls to login(),
929 # subsequent calls will be a nop and return immediately.
930 await self.n2vc.login()
931 deploy_charm(vnf_index, None, None, None, charm_params, n2vc_info)
932 number_to_configure += 1
933
934 # Deploy charms for each VDU that supports one.
935 for vdu_index, vdu in enumerate(get_iterable(vnfd, 'vdu')):
936 vdu_config = vdu.get('vdu-configuration')
937 proxy_charm = None
938
939 if vdu_config and vdu_config.get("juju"):
940 proxy_charm = vdu_config["juju"]["charm"]
941
942 if proxy_charm:
943 step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index)
944 await self.n2vc.login()
945 vdur = db_vnfrs[vnf_index]["vdur"][vdu_index]
946 # TODO for the moment only first vdu_id contains a charm deployed
947 if vdur["vdu-id-ref"] != vdu["id"]:
948 raise LcmException("Mismatch vdur {}, vdu {} at index {} for vnf {}"
949 .format(vdur["vdu-id-ref"], vdu["id"], vdu_index, vnf_index))
950 vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
951 charm_params = {
952 "user_values": vnfr_params,
953 "rw_mgmt_ip": vdur["ip-address"],
954 "initial-config-primitive": vdu_config.get('initial-config-primitive') or {}
955 }
956 deploy_charm(vnf_index, vdu["id"], vdur.get("name"), vdur["count-index"],
957 charm_params, n2vc_info)
958 number_to_configure += 1
959
960 db_nsr_update["operational-status"] = "running"
961 configuration_failed = False
962 if number_to_configure:
963 old_status = "configuring: init: {}".format(number_to_configure)
964 db_nsr_update["config-status"] = old_status
965 db_nsr_update["detailed-status"] = old_status
966 db_nslcmop_update["detailed-status"] = old_status
967
968 # wait until all are configured.
969 while time() <= start_deploy + self.total_deploy_timeout:
970 if db_nsr_update:
971 self.update_db_2("nsrs", nsr_id, db_nsr_update)
972 if db_nslcmop_update:
973 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
974 # TODO add a fake task that set n2vc_event after some time
975 await n2vc_info["n2vc_event"].wait()
976 n2vc_info["n2vc_event"].clear()
977 all_active = True
978 status_map = {}
979 n2vc_error_text = [] # contain text error list. If empty no one is in error status
980 now = time()
981 for vca_deployed in vca_deployed_list:
982 vca_status = vca_deployed["operational-status"]
983 if vca_status not in status_map:
984 # Initialize it
985 status_map[vca_status] = 0
986 status_map[vca_status] += 1
987
988 if vca_status == "active":
989 vca_deployed.pop("time_first_error", None)
990 vca_deployed.pop("status_first_error", None)
991 continue
992
993 all_active = False
994 if vca_status in ("error", "blocked"):
995 vca_deployed["detailed-status-error"] = vca_deployed["detailed-status"]
996 # if not first time in this status error
997 if not vca_deployed.get("time_first_error"):
998 vca_deployed["time_first_error"] = now
999 continue
1000 if vca_deployed.get("time_first_error") and \
1001 now <= vca_deployed["time_first_error"] + self.timeout_vca_on_error:
1002 n2vc_error_text.append("member_vnf_index={} vdu_id={} {}: {}"
1003 .format(vca_deployed["member-vnf-index"],
1004 vca_deployed["vdu_id"], vca_status,
1005 vca_deployed["detailed-status-error"]))
1006
1007 if all_active:
1008 break
1009 elif n2vc_error_text:
1010 db_nsr_update["config-status"] = "failed"
1011 error_text = "fail configuring " + ";".join(n2vc_error_text)
1012 db_nsr_update["detailed-status"] = error_text
1013 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1014 db_nslcmop_update["detailed-status"] = error_text
1015 db_nslcmop_update["statusEnteredTime"] = time()
1016 configuration_failed = True
1017 break
1018 else:
1019 cs = "configuring: "
1020 separator = ""
1021 for status, num in status_map.items():
1022 cs += separator + "{}: {}".format(status, num)
1023 separator = ", "
1024 if old_status != cs:
1025 db_nsr_update["config-status"] = cs
1026 db_nsr_update["detailed-status"] = cs
1027 db_nslcmop_update["detailed-status"] = cs
1028 old_status = cs
1029 else: # total_deploy_timeout
1030 raise LcmException("Timeout waiting ns to be configured")
1031
1032 if not configuration_failed:
1033 # all is done
1034 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1035 db_nslcmop_update["statusEnteredTime"] = time()
1036 db_nslcmop_update["detailed-status"] = "done"
1037 db_nsr_update["config-status"] = "configured"
1038 db_nsr_update["detailed-status"] = "done"
1039
1040 return
1041
1042 except (ROclient.ROClientException, DbException, LcmException) as e:
1043 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1044 exc = e
1045 except asyncio.CancelledError:
1046 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1047 exc = "Operation was cancelled"
1048 except Exception as e:
1049 exc = traceback.format_exc()
1050 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1051 exc_info=True)
1052 finally:
1053 if exc:
1054 if db_nsr:
1055 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1056 db_nsr_update["operational-status"] = "failed"
1057 if db_nslcmop:
1058 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1059 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1060 db_nslcmop_update["statusEnteredTime"] = time()
1061 try:
1062 if db_nsr:
1063 db_nsr_update["_admin.nslcmop"] = None
1064 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1065 if db_nslcmop_update:
1066 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1067 except DbException as e:
1068 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1069 if nslcmop_operation_state:
1070 try:
1071 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1072 "operationState": nslcmop_operation_state},
1073 loop=self.loop)
1074 except Exception as e:
1075 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1076
1077 self.logger.debug(logging_text + "Exit")
1078 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1079
1080 async def _destroy_charm(self, model, application):
1081 """
1082 Order N2VC destroy a charm
1083 :param model:
1084 :param application:
1085 :return: True if charm does not exist. False if it exist
1086 """
1087 if not await self.n2vc.HasApplication(model, application):
1088 return True # Already removed
1089 await self.n2vc.RemoveCharms(model, application)
1090 return False
1091
1092 async def _wait_charm_destroyed(self, model, application, timeout):
1093 """
1094 Wait until charm does not exist
1095 :param model:
1096 :param application:
1097 :param timeout:
1098 :return: True if not exist, False if timeout
1099 """
1100 while True:
1101 if not await self.n2vc.HasApplication(model, application):
1102 return True
1103 if timeout < 0:
1104 return False
1105 await asyncio.sleep(10)
1106 timeout -= 10
1107
async def terminate(self, nsr_id, nslcmop_id):
    """
    Task that terminates a network service: orders the removal of the configuration charms, deletes the ns, nsd
    and vnfds at RO, waits for the charms to disappear and finally stores the result at database and notifies
    it with a "terminated" message on topic "ns".
    Nothing is done (and nothing is notified) if the nsr is already at nsState NOT_INSTANTIATED.
    :param nsr_id: _id of the nsr at database ("nsrs")
    :param nslcmop_id: _id of the operation at database ("nslcmops")
    :return: None. Exceptions are not propagated; they are logged and stored at the nslcmop detailed-status
    """
    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    exc = None
    failed_detail = []   # annotates all failed error messages
    vca_time_destroy = None   # time when the (last) charm destroy was ordered
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    autoremove = False  # autoremove after terminated
    try:
        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # nsd = db_nsr["nsd"]
        # work over a copy, because the VCA entries are modified below
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return
        # #TODO check if VIM is creating and wait
        # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"

        if nsr_deployed and nsr_deployed.get("VCA"):
            # best effort: a failure ordering the charm removal does not stop the deletion at RO
            try:
                step = "Scheduling configuration charms removing"
                db_nsr_update["detailed-status"] = "Deleting charms"
                self.logger.debug(logging_text + step)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # for backward compatibility
                if isinstance(nsr_deployed["VCA"], dict):
                    nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
                    db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)

                for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
                    if vca_deployed:
                        if await self._destroy_charm(vca_deployed['model'], vca_deployed["application"]):
                            # charm was already removed: empty the entry so that it is skipped when waiting
                            vca_deployed.clear()
                            # Annotated at db_nsr_update (not at db_nsr, which is only the in-memory record)
                            # so that it is written to database with the next update_db_2.
                            # NOTE(review): assumes update_db_2 empties the dictionary after writing, so
                            # "_admin.deployed.VCA" above is not sent together with this key - confirm
                            db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None
                        else:
                            vca_time_destroy = time()
            except Exception as e:
                self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO_fail = False
        RO = ROclient.ROClient(self.loop, **self.ro_config)

        # Delete ns
        RO_nsr_id = RO_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
            RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if RO_nsr_id:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = "Deleting ns at RO"
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self.logger.debug(logging_text + step)
                desc = await RO.delete("ns", RO_nsr_id)
                RO_delete_action = desc["action_id"]
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if RO_delete_action:
                # wait until NS is deleted from VIM
                step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
                    format(RO_nsr_id, RO_delete_action)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                delete_timeout = 20 * 60   # 20 minutes
                while delete_timeout > 0:
                    desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                         extra_item_id=RO_delete_action)
                    ns_status, ns_status_info = RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    # write to database only when the reported status text changes
                    if detailed_status != detailed_status_old:
                        detailed_status_old = db_nslcmop_update["detailed-status"] = \
                            db_nsr_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

        except ROclient.ROClientException as e:
            if e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
            elif e.http_code == 409:   # conflict
                failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
                RO_fail = True
            else:
                failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                self.logger.error(logging_text + failed_detail[-1])
                RO_fail = True

        # Delete nsd
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
            RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                    "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                    RO_fail = True
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])
                    RO_fail = True

        # Delete vnfds. A failure with one vnfd is annotated but does not stop the deletion of the others
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd_id"):
            for vnf_id, RO_vnfd_id in nsr_deployed["RO"]["vnfd_id"].items():
                if not RO_vnfd_id:
                    continue
                try:
                    step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                        "Deleting vnfd={} at RO".format(vnf_id)
                    await RO.delete("vnfd", RO_vnfd_id)
                    self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
                        self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                    elif e.http_code == 409:   # conflict
                        failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        # wait until charm deleted
        if vca_time_destroy:
            db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = step = \
                "Waiting for deletion of configuration charms"
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
                if not vca_deployed:
                    continue
                step = "Waiting for deletion of charm application_name={}".format(vca_deployed["application"])
                # discount the time already spent since the destroy was ordered
                timeout = self.timeout_charm_delete - int(time() - vca_time_destroy)
                if not await self._wait_charm_destroyed(vca_deployed['model'], vca_deployed["application"],
                                                        timeout):
                    failed_detail.append("VCA[application_name={}] Deletion timeout".format(
                        vca_deployed["application"]))
                else:
                    # annotated at db_nsr_update so that it reaches the database (see note above)
                    db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None

        if failed_detail:
            self.logger.error(logging_text + " ;".join(failed_detail))
            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
            db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        else:
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()
            if db_nslcmop["operationParams"].get("autoremove"):
                autoremove = True

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                # release the mark set at the beginning with db_nsr_update["_admin.nslcmop"]
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            # notification is best effort; a failure writing the message is only logged
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
1334
1335 @staticmethod
1336 def _map_primitive_params(primitive_desc, params, instantiation_params):
1337 """
1338 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
1339 The default-value is used. If it is between < > it look for a value at instantiation_params
1340 :param primitive_desc: portion of VNFD/NSD that describes primitive
1341 :param params: Params provided by user
1342 :param instantiation_params: Instantiation params provided by user
1343 :return: a dictionary with the calculated params
1344 """
1345 calculated_params = {}
1346 for parameter in primitive_desc.get("parameter", ()):
1347 param_name = parameter["name"]
1348 if param_name in params:
1349 calculated_params[param_name] = params[param_name]
1350 elif "default-value" in parameter:
1351 calculated_params[param_name] = parameter["default-value"]
1352 if isinstance(parameter["default-value"], str) and parameter["default-value"].startswith("<") and \
1353 parameter["default-value"].endswith(">"):
1354 if parameter["default-value"][1:-1] in instantiation_params:
1355 calculated_params[param_name] = instantiation_params[parameter["default-value"][1:-1]]
1356 else:
1357 raise LcmException("Parameter {} needed to execute primitive {} not provided".
1358 format(parameter["default-value"], primitive_desc["name"]))
1359 else:
1360 raise LcmException("Parameter {} needed to execute primitive {} not provided".
1361 format(param_name, primitive_desc["name"]))
1362
1363 if isinstance(calculated_params[param_name], (dict, list, tuple)):
1364 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
1365 width=256)
1366 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
1367 calculated_params[param_name] = calculated_params[param_name][7:]
1368 return calculated_params
1369
1370 async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
1371 primitive, primitive_params):
1372 start_primitive_time = time()
1373 try:
1374 for vca_deployed in db_deployed["VCA"]:
1375 if not vca_deployed:
1376 continue
1377 if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
1378 continue
1379 if vdu_name and vdu_name != vca_deployed["vdu_name"]:
1380 continue
1381 if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
1382 continue
1383 break
1384 else:
1385 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
1386 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
1387 model_name = vca_deployed.get("model")
1388 application_name = vca_deployed.get("application")
1389 if not model_name or not application_name:
1390 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
1391 "model or application name" .format(member_vnf_index, vdu_id, vdu_name,
1392 vdu_count_index))
1393 if vca_deployed["operational-status"] != "active":
1394 raise LcmException("charm for member_vnf_index={} vdu_id={} operational_status={} not 'active'".format(
1395 member_vnf_index, vdu_id, vca_deployed["operational-status"]))
1396 callback = None # self.n2vc_callback
1397 callback_args = () # [db_nsr, db_nslcmop, member_vnf_index, None]
1398 await self.n2vc.login()
1399 primitive_id = await self.n2vc.ExecutePrimitive(
1400 model_name,
1401 application_name,
1402 primitive,
1403 callback,
1404 *callback_args,
1405 **primitive_params
1406 )
1407 while time() - start_primitive_time < self.timeout_primitive:
1408 primitive_result_ = await self.n2vc.GetPrimitiveStatus(model_name, primitive_id)
1409 if primitive_result_ == "running":
1410 pass
1411 elif primitive_result_ in ("completed", "failed"):
1412 primitive_result = "COMPLETED" if primitive_result_ == "completed" else "FAILED"
1413 detailed_result = await self.n2vc.GetPrimitiveOutput(model_name, primitive_id)
1414 break
1415 else:
1416 detailed_result = "Invalid N2VC.GetPrimitiveStatus = {} obtained".format(primitive_result_)
1417 primitive_result = "FAILED"
1418 break
1419 await asyncio.sleep(5)
1420 else:
1421 raise LcmException("timeout after {} seconds".format(self.timeout_primitive))
1422 return primitive_result, detailed_result
1423 except (N2VCPrimitiveExecutionFailed, LcmException) as e:
1424 return "FAILED", str(e)
1425
async def action(self, nsr_id, nslcmop_id):
    """
    Task that executes a primitive (action) over a vnf or vdu charm of a network service. It waits for related
    tasks of the same ns, looks for the primitive at the vnfd, executes it and finally stores the result at
    database and notifies it with an "actioned" message on topic "ns".
    :param nsr_id: _id of the nsr at database ("nsrs")
    :param nslcmop_id: _id of the operation at database ("nslcmops"). Its operationParams provide
        member_vnf_index, primitive, primitive_params and optionally vdu_id, vdu_count_index and vdu_name
    :return: None. Exceptions are not propagated; they are logged and stored at the nslcmop detailed-status
    """
    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    exc = None
    try:
        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"]["member_vnf_index"]
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        vdu_name = db_nslcmop["operationParams"].get("vdu_name")

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

        # look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
        if task_dependency:
            step = db_nslcmop_update["detailed-status"] = \
                "Waiting for related tasks to be completed: {}".format(task_name)
            self.logger.debug(logging_text + step)
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            _, pending = await asyncio.wait(task_dependency, timeout=3600)
            if pending:
                raise LcmException("Timeout waiting related tasks to be completed")

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]

        # look for primitive
        config_primitive_desc = None
        if vdu_id:
            for vdu in get_iterable(db_vnfd, "vdu"):
                if vdu_id == vdu["id"]:
                    for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == primitive:
                            config_primitive_desc = config_primitive
                            break
        # vnf level is only a fallback, so that it does not override the primitive found for the requested vdu
        if not config_primitive_desc:
            for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break
        if not config_primitive_desc:
            raise LcmException("Primitive {} not found at vnf-configuration:config-primitive or vdu:"
                               "vdu-configuration:config-primitive".format(primitive))

        vnfr_params = {}
        if db_vnfr.get("additionalParamsForVnf"):
            vnfr_params.update(db_vnfr["additionalParamsForVnf"])

        # TODO check if ns is in a proper status
        result, result_detail = await self._ns_execute_primitive(
            nsr_deployed, vnf_index, vdu_id, vdu_name, vdu_count_index, primitive,
            self._map_primitive_params(config_primitive_desc, primitive_params, vnfr_params))
        db_nslcmop_update["detailed-status"] = result_detail
        db_nslcmop_update["operationState"] = nslcmop_operation_state = result
        db_nslcmop_update["statusEnteredTime"] = time()
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                # release the mark set at the beginning with db_nsr_update["_admin.nslcmop"]
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            # notification is best effort; a failure writing the message is only logged
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        # "Exit" is logged only once, after the notification
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
1535
async def scale(self, nsr_id, nslcmop_id):
    """
    Run the scale operation 'nslcmop_id' over the network service 'nsr_id'.

    Steps, in order:
      1. read the nslcmop and nsr records and wait (up to 3600 seconds) for related tasks to finish;
      2. read the scaling parameters from operationParams:scaleVnfData and locate the matching
         vnfd:scaling-group-descriptor;
      3. check max-instance-count / min-instance-count against the stored 'nb-scale-op' counter;
      4. for SCALE_IN, execute the 'pre-scale-in' scaling-config-action primitives;
      5. send a 'vdu-scaling' action to RO and poll it every 5 seconds (up to one hour) until the
         action and the ns are ACTIVE and the vnfr has been updated;
      6. for SCALE_OUT, execute the 'post-scale-out' scaling-config-action primitives.

    Handled exceptions (including asyncio.CancelledError) are not propagated: they are logged and
    stored at the nslcmop/nsr records. At the end the database is updated, a "scaled" message is written
    through self.msg when an operation state was set, and the task is removed from self.lcm_tasks.

    :param nsr_id: _id of the nsr (network service record) to scale
    :param nslcmop_id: _id of the nslcmop (operation record) that describes the scale request
    :return: None
    """
    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    vnfr_scaled = False
    try:
        step = "Getting nslcmop from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # remember the current status to restore it when the operation finishes
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]

        # look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
        if task_dependency:
            step = db_nslcmop_update["detailed-status"] = \
                "Waiting for related tasks to be completed: {}".format(task_name)
            self.logger.debug(logging_text + step)
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            _, pending = await asyncio.wait(task_dependency, timeout=3600)
            if pending:
                raise LcmException("Timeout waiting related tasks to be completed")

        step = "Parsing scaling parameters"
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")
        RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
        vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
        scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
        scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
        # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")

        # for backward compatibility
        # 'VCA' stored as a dictionary is converted to a list and written back
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        step = "Getting scaling-group-descriptor"
        # after this loop 'scaling_descriptor' holds the descriptor whose name matches
        for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
            if scaling_descriptor["name"] == scaling_group:
                break
        else:
            raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                               "at vnfd:scaling-group-descriptor".format(scaling_group))
        # cooldown_time = 0
        # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
        #     cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
        #     if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
        #         break

        # TODO check if ns is in a proper status
        step = "Sending scale order to RO"
        # nb_scale_op: scale operations counter stored for this scaling group at nsr:_admin:scaling-group
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
        # RO_scaling_info: content of the 'vdu-scaling' action sent to RO
        # vdu_scaling_info: passed to the scaling primitives as the '<VDU_SCALE_INFO>' parameter
        RO_scaling_info = []
        vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
        if scaling_type == "SCALE_OUT":
            # count if max-instance-count is reached
            if "max-instance-count" in scaling_descriptor and scaling_descriptor["max-instance-count"] is not None:
                max_instance_count = int(scaling_descriptor["max-instance-count"])
                if nb_scale_op >= max_instance_count:
                    raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
                                       " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op = nb_scale_op + 1
            vdu_scaling_info["scaling_direction"] = "OUT"
            vdu_scaling_info["vdu-create"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "create", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
        elif scaling_type == "SCALE_IN":
            # count if min-instance-count is reached
            min_instance_count = 0
            if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
                min_instance_count = int(scaling_descriptor["min-instance-count"])
            if nb_scale_op <= min_instance_count:
                raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op = nb_scale_op - 1
            vdu_scaling_info["scaling_direction"] = "IN"
            vdu_scaling_info["vdu-delete"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "delete", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_create = vdu_scaling_info.get("vdu-create")
        # work over a copy: the counters are decremented below while the original is needed afterwards
        vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
        if vdu_scaling_info["scaling_direction"] == "IN":
            # walk the vdur list backwards, taking 'count' entries per vdu-id-ref
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    vdu_scaling_info["vdu"].append({
                        "name": vdur["name"],
                        "vdu_id": vdur["vdu-id-ref"],
                        "interface": []
                    })
                    for interface in vdur["interfaces"]:
                        vdu_scaling_info["vdu"][-1]["interface"].append({
                            "name": interface["name"],
                            "ip_address": interface["ip-address"],
                            "mac_address": interface.get("mac-address"),
                        })
            # recover the non-decremented counters and remove them from the info sent to the primitives
            vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # execute primitive service PRE-SCALING
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "pre-scale-in" \
                        and scaling_type == "SCALE_IN":
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the name looked for; the loop variable is unbound or a non-matching entry here
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive))

                    vnfr_params = {"<VDU_SCALE_INFO>": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    result, result_detail = await self._ns_execute_primitive(
                        nsr_deployed, vnf_index, None, None, None, vnf_config_primitive,
                        self._map_primitive_params(config_primitive, {}, vnfr_params))
                    self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                        vnf_config_primitive, result, result_detail))
                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None

        if RO_scaling_info:
            scale_process = "RO"
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
            db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
            db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
            # wait until ready
            RO_nslcmop_id = RO_desc["instance_action_id"]
            db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id

            RO_task_done = False
            step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
            detailed_status_old = None
            self.logger.debug(logging_text + step)

            deployment_timeout = 1 * 3600   # One hour
            # first phase polls the RO action; once it is ACTIVE, second phase polls the ns status
            while deployment_timeout > 0:
                if not RO_task_done:
                    desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                         extra_item_id=RO_nslcmop_id)
                    ns_status, ns_status_info = RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        RO_task_done = True
                        step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
                        self.logger.debug(logging_text + step)
                    else:
                        # explicit raise instead of assert, which is removed when running with -O
                        raise LcmException("ROclient.check_action_status returns unknown {}".format(ns_status))
                else:
                    desc = await RO.show("ns", RO_nsr_id)
                    ns_status, ns_status_info = RO.check_ns_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        step = detailed_status = \
                            "Waiting for management IP address reported by the VIM. Updating VNFRs"
                        # the vnfr is scaled only once, although this branch can be visited several times
                        if not vnfr_scaled:
                            self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
                            vnfr_scaled = True
                        try:
                            desc = await RO.show("ns", RO_nsr_id)
                            # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
                            self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
                            break
                        except LcmExceptionNoMgmtIP:
                            # management IP not available yet: keep polling
                            pass
                    else:
                        # explicit raise instead of assert, which is removed when running with -O
                        raise LcmException("ROclient.check_ns_status returns unknown {}".format(ns_status))
                # write the status to the database only when it changes
                if detailed_status != detailed_status_old:
                    detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                    self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)

                # NOTE(review): the 'loop' argument of asyncio.sleep was removed in Python 3.10 - confirm the
                # supported interpreter versions before upgrading
                await asyncio.sleep(5, loop=self.loop)
                deployment_timeout -= 5
            # the 'break' above leaves the counter positive; reaching zero means no success within the hour
            if deployment_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns to be ready")

            # update VDU_SCALING_INFO with the obtained ip_addresses
            if vdu_scaling_info["scaling_direction"] == "OUT":
                # walk the vdur list backwards, taking 'count' entries per vdu-id-ref
                for vdur in reversed(db_vnfr["vdur"]):
                    if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
                        vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
                        vdu_scaling_info["vdu"].append({
                            "name": vdur["name"],
                            "vdu_id": vdur["vdu-id-ref"],
                            "interface": []
                        })
                        for interface in vdur["interfaces"]:
                            vdu_scaling_info["vdu"][-1]["interface"].append({
                                "name": interface["name"],
                                "ip_address": interface["ip-address"],
                                "mac_address": interface.get("mac-address"),
                            })
                del vdu_scaling_info["vdu-create"]

        scale_process = None
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "post-scale-out" \
                        and scaling_type == "SCALE_OUT":
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    vnfr_params = {"<VDU_SCALE_INFO>": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # report the name looked for; the loop variable is unbound or a non-matching entry here
                        raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
                                           "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
                                           "match any vnf-configuration:config-primitive".format(
                                               scaling_group, vnf_config_primitive))
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"

                    result, result_detail = await self._ns_execute_primitive(
                        nsr_deployed, vnf_index, None, None, None, vnf_config_primitive,
                        self._map_primitive_params(config_primitive, {}, vnfr_params))
                    self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                        vnf_config_primitive, result, result_detail))
                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None

        db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
        db_nslcmop_update["statusEnteredTime"] = time()
        db_nslcmop_update["detailed-status"] = "done"
        db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = old_operational_status
        db_nsr_update["config-status"] = old_config_status
        return  # database update is done inside finally
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc:
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
            if db_nsr:
                # restore the previous status, unless the failure happened in the middle of a RO or VCA step
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                db_nsr_update["_admin.nslcmop"] = None
                if scale_process:
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                 exc)
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                         "operationState": nslcmop_operation_state},
                                        loop=self.loop)
                # if cooldown_time:
                #     await asyncio.sleep(cooldown_time)
                # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
            except Exception as e:
                # the notification is best effort: a failure is logged and does not change the operation result
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")