NS parameters for initial-config-primitive
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import functools
24 import traceback
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 import ROclient
28 from lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase
29
30 from osm_common.dbbase import DbException
31 from osm_common.fsbase import FsException
32 from n2vc.vnf import N2VC
33
34 from copy import copy, deepcopy
35 from http import HTTPStatus
36 from time import time
37 from uuid import uuid4
38
39 __author__ = "Alfonso Tierno"
40
41
42 def get_iterable(in_dict, in_key):
43 """
44 Similar to <dict>.get(), but if value is None, False, ..., An empty tuple is returned instead
45 :param in_dict: a dictionary
46 :param in_key: the key to look for at in_dict
47 :return: in_dict[in_var] or () if it is None or not present
48 """
49 if not in_dict.get(in_key):
50 return ()
51 return in_dict[in_key]
52
53
54 def populate_dict(target_dict, key_list, value):
55 """
56 Upate target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value.
57 Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
58 :param target_dict: dictionary to be changed
59 :param key_list: list of keys to insert at target_dict
60 :param value:
61 :return: None
62 """
63 for key in key_list[0:-1]:
64 if key not in target_dict:
65 target_dict[key] = {}
66 target_dict = target_dict[key]
67 target_dict[key_list[-1]] = value
68
69
70 class NsLcm(LcmBase):
71 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
72 total_deploy_timeout = 2 * 3600 # global timeout for deployment
73 timeout_charm_delete = 10 * 60
74
75 def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
76 """
77 Init, Connect to database, filesystem storage, and messaging
78 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
79 :return: None
80 """
81 # logging
82 self.logger = logging.getLogger('lcm.ns')
83 self.loop = loop
84 self.lcm_tasks = lcm_tasks
85
86 super().__init__(db, msg, fs, self.logger)
87
88 self.ro_config = ro_config
89
90 self.n2vc = N2VC(
91 log=self.logger,
92 server=vca_config['host'],
93 port=vca_config['port'],
94 user=vca_config['user'],
95 secret=vca_config['secret'],
96 # TODO: This should point to the base folder where charms are stored,
97 # if there is a common one (like object storage). Otherwise, leave
98 # it unset and pass it via DeployCharms
99 # artifacts=vca_config[''],
100 artifacts=None,
101 )
102
103 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
104 """
105 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
106 :param vnfd: input vnfd
107 :param new_id: overrides vnf id if provided
108 :param additionalParams: Instantiation params for VNFs provided
109 :param nsrId: Id of the NSR
110 :return: copy of vnfd
111 """
112 try:
113 vnfd_RO = deepcopy(vnfd)
114 # remove unused by RO configuration, monitoring, scaling and internal keys
115 vnfd_RO.pop("_id", None)
116 vnfd_RO.pop("_admin", None)
117 vnfd_RO.pop("vnf-configuration", None)
118 vnfd_RO.pop("monitoring-param", None)
119 vnfd_RO.pop("scaling-group-descriptor", None)
120 if new_id:
121 vnfd_RO["id"] = new_id
122
123 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
124 for vdu in get_iterable(vnfd_RO, "vdu"):
125 cloud_init_file = None
126 if vdu.get("cloud-init-file"):
127 base_folder = vnfd["_admin"]["storage"]
128 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
129 vdu["cloud-init-file"])
130 with self.fs.file_open(cloud_init_file, "r") as ci_file:
131 cloud_init_content = ci_file.read()
132 vdu.pop("cloud-init-file", None)
133 elif vdu.get("cloud-init"):
134 cloud_init_content = vdu["cloud-init"]
135 else:
136 continue
137
138 env = Environment()
139 ast = env.parse(cloud_init_content)
140 mandatory_vars = meta.find_undeclared_variables(ast)
141 if mandatory_vars:
142 for var in mandatory_vars:
143 if not additionalParams or var not in additionalParams.keys():
144 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
145 "file, must be provided in the instantiation parameters inside the "
146 "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
147 template = Template(cloud_init_content)
148 cloud_init_content = template.render(additionalParams)
149 vdu["cloud-init"] = cloud_init_content
150
151 return vnfd_RO
152 except FsException as e:
153 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
154 format(vnfd["id"], vdu[id], cloud_init_file, e))
155 except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
156 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
157 format(vnfd["id"], vdu["id"], e))
158
159 def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None):
160 """
161 Callback both for charm status change and task completion
162 :param model_name: Charm model name
163 :param application_name: Charm application name
164 :param status: Can be
165 - blocked: The unit needs manual intervention
166 - maintenance: The unit is actively deploying/configuring
167 - waiting: The unit is waiting for another charm to be ready
168 - active: The unit is deployed, configured, and ready
169 - error: The charm has failed and needs attention.
170 - terminated: The charm has been destroyed
171 - removing,
172 - removed
173 :param message: detailed message error
174 :param n2vc_info: dictionary with information shared with instantiate task. It contains:
175 nsr_id:
176 nslcmop_id:
177 lcmOperationType: currently "instantiate"
178 deployed: dictionary with {<application>: {operational-status: <status>, detailed-status: <text>}}
179 db_update: dictionary to be filled with the changes to be wrote to database with format key.key.key: value
180 n2vc_event: event used to notify instantiation task that some change has been produced
181 :param task: None for charm status change, or task for completion task callback
182 :return:
183 """
184 try:
185 nsr_id = n2vc_info["nsr_id"]
186 deployed = n2vc_info["deployed"]
187 db_nsr_update = n2vc_info["db_update"]
188 nslcmop_id = n2vc_info["nslcmop_id"]
189 ns_operation = n2vc_info["lcmOperationType"]
190 n2vc_event = n2vc_info["n2vc_event"]
191 logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id,
192 application_name)
193 for vca_index, vca_deployed in enumerate(deployed):
194 if not vca_deployed:
195 continue
196 if model_name == vca_deployed["model"] and application_name == vca_deployed["application"]:
197 break
198 else:
199 self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA")
200 return
201 if task:
202 if task.cancelled():
203 self.logger.debug(logging_text + " task Cancelled")
204 vca_deployed['operational-status'] = "error"
205 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
206 vca_deployed['detailed-status'] = "Task Cancelled"
207 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = "Task Cancelled"
208
209 elif task.done():
210 exc = task.exception()
211 if exc:
212 self.logger.error(logging_text + " task Exception={}".format(exc))
213 vca_deployed['operational-status'] = "error"
214 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
215 vca_deployed['detailed-status'] = str(exc)
216 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(exc)
217 else:
218 self.logger.debug(logging_text + " task Done")
219 # task is Done, but callback is still ongoing. So ignore
220 return
221 elif status:
222 self.logger.debug(logging_text + " Enter status={} message={}".format(status, message))
223 if vca_deployed['operational-status'] == status:
224 return # same status, ignore
225 vca_deployed['operational-status'] = status
226 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = status
227 vca_deployed['detailed-status'] = str(message)
228 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(message)
229 else:
230 self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
231 return
232 # wake up instantiate task
233 n2vc_event.set()
234 except Exception as e:
235 self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True)
236
237 def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
238 """
239 Creates a RO ns descriptor from OSM ns_instantiate params
240 :param ns_params: OSM instantiate params
241 :return: The RO ns descriptor
242 """
243 vim_2_RO = {}
244 # TODO feature 1417: Check that no instantiation is set over PDU
245 # check if PDU forces a concrete vim-network-id and add it
246 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
247
248 def vim_account_2_RO(vim_account):
249 if vim_account in vim_2_RO:
250 return vim_2_RO[vim_account]
251
252 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
253 if db_vim["_admin"]["operationalState"] != "ENABLED":
254 raise LcmException("VIM={} is not available. operationalState={}".format(
255 vim_account, db_vim["_admin"]["operationalState"]))
256 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
257 vim_2_RO[vim_account] = RO_vim_id
258 return RO_vim_id
259
260 def ip_profile_2_RO(ip_profile):
261 RO_ip_profile = deepcopy((ip_profile))
262 if "dns-server" in RO_ip_profile:
263 if isinstance(RO_ip_profile["dns-server"], list):
264 RO_ip_profile["dns-address"] = []
265 for ds in RO_ip_profile.pop("dns-server"):
266 RO_ip_profile["dns-address"].append(ds['address'])
267 else:
268 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
269 if RO_ip_profile.get("ip-version") == "ipv4":
270 RO_ip_profile["ip-version"] = "IPv4"
271 if RO_ip_profile.get("ip-version") == "ipv6":
272 RO_ip_profile["ip-version"] = "IPv6"
273 if "dhcp-params" in RO_ip_profile:
274 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
275 return RO_ip_profile
276
277 if not ns_params:
278 return None
279 RO_ns_params = {
280 # "name": ns_params["nsName"],
281 # "description": ns_params.get("nsDescription"),
282 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
283 # "scenario": ns_params["nsdId"],
284 }
285 if n2vc_key_list:
286 for vnfd_ref, vnfd in vnfd_dict.items():
287 vdu_needed_access = []
288 mgmt_cp = None
289 if vnfd.get("vnf-configuration"):
290 if vnfd.get("mgmt-interface"):
291 if vnfd["mgmt-interface"].get("vdu-id"):
292 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
293 elif vnfd["mgmt-interface"].get("cp"):
294 mgmt_cp = vnfd["mgmt-interface"]["cp"]
295
296 for vdu in vnfd.get("vdu", ()):
297 if vdu.get("vdu-configuration"):
298 vdu_needed_access.append(vdu["id"])
299 elif mgmt_cp:
300 for vdu_interface in vdu.get("interface"):
301 if vdu_interface.get("external-connection-point-ref") and \
302 vdu_interface["external-connection-point-ref"] == mgmt_cp:
303 vdu_needed_access.append(vdu["id"])
304 mgmt_cp = None
305 break
306
307 if vdu_needed_access:
308 for vnf_member in nsd.get("constituent-vnfd"):
309 if vnf_member["vnfd-id-ref"] != vnfd_ref:
310 continue
311 for vdu in vdu_needed_access:
312 populate_dict(RO_ns_params,
313 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
314 n2vc_key_list)
315
316 if ns_params.get("vduImage"):
317 RO_ns_params["vduImage"] = ns_params["vduImage"]
318
319 if ns_params.get("ssh_keys"):
320 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
321 for vnf_params in get_iterable(ns_params, "vnf"):
322 for constituent_vnfd in nsd["constituent-vnfd"]:
323 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
324 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
325 break
326 else:
327 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
328 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
329 if vnf_params.get("vimAccountId"):
330 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
331 vim_account_2_RO(vnf_params["vimAccountId"]))
332
333 for vdu_params in get_iterable(vnf_params, "vdu"):
334 # TODO feature 1417: check that this VDU exist and it is not a PDU
335 if vdu_params.get("volume"):
336 for volume_params in vdu_params["volume"]:
337 if volume_params.get("vim-volume-id"):
338 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
339 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
340 volume_params["vim-volume-id"])
341 if vdu_params.get("interface"):
342 for interface_params in vdu_params["interface"]:
343 if interface_params.get("ip-address"):
344 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
345 vdu_params["id"], "interfaces", interface_params["name"],
346 "ip_address"),
347 interface_params["ip-address"])
348 if interface_params.get("mac-address"):
349 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
350 vdu_params["id"], "interfaces", interface_params["name"],
351 "mac_address"),
352 interface_params["mac-address"])
353 if interface_params.get("floating-ip-required"):
354 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
355 vdu_params["id"], "interfaces", interface_params["name"],
356 "floating-ip"),
357 interface_params["floating-ip-required"])
358
359 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
360 if internal_vld_params.get("vim-network-name"):
361 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
362 internal_vld_params["name"], "vim-network-name"),
363 internal_vld_params["vim-network-name"])
364 if internal_vld_params.get("vim-network-id"):
365 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
366 internal_vld_params["name"], "vim-network-id"),
367 internal_vld_params["vim-network-id"])
368 if internal_vld_params.get("ip-profile"):
369 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
370 internal_vld_params["name"], "ip-profile"),
371 ip_profile_2_RO(internal_vld_params["ip-profile"]))
372
373 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
374 # look for interface
375 iface_found = False
376 for vdu_descriptor in vnf_descriptor["vdu"]:
377 for vdu_interface in vdu_descriptor["interface"]:
378 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
379 if icp_params.get("ip-address"):
380 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
381 vdu_descriptor["id"], "interfaces",
382 vdu_interface["name"], "ip_address"),
383 icp_params["ip-address"])
384
385 if icp_params.get("mac-address"):
386 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
387 vdu_descriptor["id"], "interfaces",
388 vdu_interface["name"], "mac_address"),
389 icp_params["mac-address"])
390 iface_found = True
391 break
392 if iface_found:
393 break
394 else:
395 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
396 "internal-vld:id-ref={} is not present at vnfd:internal-"
397 "connection-point".format(vnf_params["member-vnf-index"],
398 icp_params["id-ref"]))
399
400 for vld_params in get_iterable(ns_params, "vld"):
401 if "ip-profile" in vld_params:
402 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
403 ip_profile_2_RO(vld_params["ip-profile"]))
404 if vld_params.get("vim-network-name"):
405 RO_vld_sites = []
406 if isinstance(vld_params["vim-network-name"], dict):
407 for vim_account, vim_net in vld_params["vim-network-name"].items():
408 RO_vld_sites.append({
409 "netmap-use": vim_net,
410 "datacenter": vim_account_2_RO(vim_account)
411 })
412 else: # isinstance str
413 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
414 if RO_vld_sites:
415 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
416 if vld_params.get("vim-network-id"):
417 RO_vld_sites = []
418 if isinstance(vld_params["vim-network-id"], dict):
419 for vim_account, vim_net in vld_params["vim-network-id"].items():
420 RO_vld_sites.append({
421 "netmap-use": vim_net,
422 "datacenter": vim_account_2_RO(vim_account)
423 })
424 else: # isinstance str
425 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
426 if RO_vld_sites:
427 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
428 if "vnfd-connection-point-ref" in vld_params:
429 for cp_params in vld_params["vnfd-connection-point-ref"]:
430 # look for interface
431 for constituent_vnfd in nsd["constituent-vnfd"]:
432 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
433 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
434 break
435 else:
436 raise LcmException(
437 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
438 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
439 match_cp = False
440 for vdu_descriptor in vnf_descriptor["vdu"]:
441 for interface_descriptor in vdu_descriptor["interface"]:
442 if interface_descriptor.get("external-connection-point-ref") == \
443 cp_params["vnfd-connection-point-ref"]:
444 match_cp = True
445 break
446 if match_cp:
447 break
448 else:
449 raise LcmException(
450 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
451 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
452 cp_params["member-vnf-index-ref"],
453 cp_params["vnfd-connection-point-ref"],
454 vnf_descriptor["id"]))
455 if cp_params.get("ip-address"):
456 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
457 vdu_descriptor["id"], "interfaces",
458 interface_descriptor["name"], "ip_address"),
459 cp_params["ip-address"])
460 if cp_params.get("mac-address"):
461 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
462 vdu_descriptor["id"], "interfaces",
463 interface_descriptor["name"], "mac_address"),
464 cp_params["mac-address"])
465 return RO_ns_params
466
467 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
468 # make a copy to do not change
469 vdu_create = copy(vdu_create)
470 vdu_delete = copy(vdu_delete)
471
472 vdurs = db_vnfr.get("vdur")
473 if vdurs is None:
474 vdurs = []
475 vdu_index = len(vdurs)
476 while vdu_index:
477 vdu_index -= 1
478 vdur = vdurs[vdu_index]
479 if vdur.get("pdu-type"):
480 continue
481 vdu_id_ref = vdur["vdu-id-ref"]
482 if vdu_create and vdu_create.get(vdu_id_ref):
483 for index in range(0, vdu_create[vdu_id_ref]):
484 vdur = deepcopy(vdur)
485 vdur["_id"] = str(uuid4())
486 vdur["count-index"] += 1
487 vdurs.insert(vdu_index+1+index, vdur)
488 del vdu_create[vdu_id_ref]
489 if vdu_delete and vdu_delete.get(vdu_id_ref):
490 del vdurs[vdu_index]
491 vdu_delete[vdu_id_ref] -= 1
492 if not vdu_delete[vdu_id_ref]:
493 del vdu_delete[vdu_id_ref]
494 # check all operations are done
495 if vdu_create or vdu_delete:
496 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
497 vdu_create))
498 if vdu_delete:
499 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
500 vdu_delete))
501
502 vnfr_update = {"vdur": vdurs}
503 db_vnfr["vdur"] = vdurs
504 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
505
506 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
507 """
508 Updates database nsr with the RO info for the created vld
509 :param ns_update_nsr: dictionary to be filled with the updated info
510 :param db_nsr: content of db_nsr. This is also modified
511 :param nsr_desc_RO: nsr descriptor from RO
512 :return: Nothing, LcmException is raised on errors
513 """
514
515 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
516 for net_RO in get_iterable(nsr_desc_RO, "nets"):
517 if vld["id"] != net_RO.get("ns_net_osm_id"):
518 continue
519 vld["vim-id"] = net_RO.get("vim_net_id")
520 vld["name"] = net_RO.get("vim_name")
521 vld["status"] = net_RO.get("status")
522 vld["status-detailed"] = net_RO.get("error_msg")
523 ns_update_nsr["vld.{}".format(vld_index)] = vld
524 break
525 else:
526 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
527
528 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
529 """
530 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
531 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
532 :param nsr_desc_RO: nsr descriptor from RO
533 :return: Nothing, LcmException is raised on errors
534 """
535 for vnf_index, db_vnfr in db_vnfrs.items():
536 for vnf_RO in nsr_desc_RO["vnfs"]:
537 if vnf_RO["member_vnf_index"] != vnf_index:
538 continue
539 vnfr_update = {}
540 if vnf_RO.get("ip_address"):
541 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"]
542 elif not db_vnfr.get("ip-address"):
543 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
544
545 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
546 vdur_RO_count_index = 0
547 if vdur.get("pdu-type"):
548 continue
549 for vdur_RO in get_iterable(vnf_RO, "vms"):
550 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
551 continue
552 if vdur["count-index"] != vdur_RO_count_index:
553 vdur_RO_count_index += 1
554 continue
555 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
556 vdur["ip-address"] = vdur_RO.get("ip_address")
557 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
558 vdur["name"] = vdur_RO.get("vim_name")
559 vdur["status"] = vdur_RO.get("status")
560 vdur["status-detailed"] = vdur_RO.get("error_msg")
561 for ifacer in get_iterable(vdur, "interfaces"):
562 for interface_RO in get_iterable(vdur_RO, "interfaces"):
563 if ifacer["name"] == interface_RO.get("internal_name"):
564 ifacer["ip-address"] = interface_RO.get("ip_address")
565 ifacer["mac-address"] = interface_RO.get("mac_address")
566 break
567 else:
568 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
569 "at RO info".format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
570 vnfr_update["vdur.{}".format(vdu_index)] = vdur
571 break
572 else:
573 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} at "
574 "RO info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
575
576 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
577 for net_RO in get_iterable(nsr_desc_RO, "nets"):
578 if vld["id"] != net_RO.get("vnf_net_osm_id"):
579 continue
580 vld["vim-id"] = net_RO.get("vim_net_id")
581 vld["name"] = net_RO.get("vim_name")
582 vld["status"] = net_RO.get("status")
583 vld["status-detailed"] = net_RO.get("error_msg")
584 vnfr_update["vld.{}".format(vld_index)] = vld
585 break
586 else:
587 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} at RO info".format(
588 vnf_index, vld["id"]))
589
590 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
591 break
592
593 else:
594 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(vnf_index))
595
596 async def instantiate(self, nsr_id, nslcmop_id):
597 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
598 self.logger.debug(logging_text + "Enter")
599 # get all needed from database
600 start_deploy = time()
601 db_nsr = None
602 db_nslcmop = None
603 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
604 db_nslcmop_update = {}
605 nslcmop_operation_state = None
606 db_vnfrs = {}
607 RO_descriptor_number = 0 # number of descriptors created at RO
608 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
609 n2vc_info = {}
610 exc = None
611 try:
612 step = "Getting nslcmop={} from db".format(nslcmop_id)
613 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
614 step = "Getting nsr={} from db".format(nsr_id)
615 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
616 ns_params = db_nslcmop.get("operationParams")
617 nsd = db_nsr["nsd"]
618 nsr_name = db_nsr["name"] # TODO short-name??
619
620 # look if previous tasks in process
621 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
622 if task_dependency:
623 step = db_nslcmop_update["detailed-status"] = \
624 "Waiting for related tasks to be completed: {}".format(task_name)
625 self.logger.debug(logging_text + step)
626 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
627 _, pending = await asyncio.wait(task_dependency, timeout=3600)
628 if pending:
629 raise LcmException("Timeout waiting related tasks to be completed")
630
631 step = "Getting vnfrs from db"
632 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
633 db_vnfds_ref = {}
634 db_vnfds = {}
635 for vnfr in db_vnfrs_list:
636 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
637 vnfd_id = vnfr["vnfd-id"]
638 vnfd_ref = vnfr["vnfd-ref"]
639 if vnfd_id not in db_vnfds:
640 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
641 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
642 db_vnfds_ref[vnfd_ref] = vnfd
643 db_vnfds[vnfd_id] = vnfd
644
645 # Get or generates the _admin.deployed,VCA list
646 vca_deployed_list = None
647 if db_nsr["_admin"].get("deployed"):
648 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
649 if vca_deployed_list is None:
650 vca_deployed_list = []
651 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
652 elif isinstance(vca_deployed_list, dict):
653 # maintain backward compatibility. Change a dict to list at database
654 vca_deployed_list = list(vca_deployed_list.values())
655 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
656
657 db_nsr_update["detailed-status"] = "creating"
658 db_nsr_update["operational-status"] = "init"
659
660 RO = ROclient.ROClient(self.loop, **self.ro_config)
661
662 # set state to INSTANTIATED. When instantiated NBI will not delete directly
663 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
664 self.update_db_2("nsrs", nsr_id, db_nsr_update)
665
666 # get vnfds, instantiate at RO
667 for c_vnf in nsd.get("constituent-vnfd", ()):
668 member_vnf_index = c_vnf["member-vnf-index"]
669 vnfd_ref = vnfd["id"]
670 step = db_nsr_update["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_ref)
671 # self.logger.debug(logging_text + step)
672 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
673 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
674 RO_descriptor_number += 1
675
676 # look if present
677 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
678 if vnfd_list:
679 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = vnfd_list[0]["uuid"]
680 self.logger.debug(logging_text + "vnfd={} exists at RO. Using RO_id={}".format(
681 vnfd_ref, vnfd_list[0]["uuid"]))
682 else:
683 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
684 get("additionalParamsForVnf"), nsr_id)
685 desc = await RO.create("vnfd", descriptor=vnfd_RO)
686 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = desc["uuid"]
687 self.logger.debug(logging_text + "vnfd={} created at RO. RO_id={}".format(
688 vnfd_ref, desc["uuid"]))
689 self.update_db_2("nsrs", nsr_id, db_nsr_update)
690
691 # create nsd at RO
692 nsd_ref = nsd["id"]
693 step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
694 # self.logger.debug(logging_text + step)
695
696 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
697 RO_descriptor_number += 1
698 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
699 if nsd_list:
700 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
701 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
702 nsd_ref, RO_nsd_uuid))
703 else:
704 nsd_RO = deepcopy(nsd)
705 nsd_RO["id"] = RO_osm_nsd_id
706 nsd_RO.pop("_id", None)
707 nsd_RO.pop("_admin", None)
708 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
709 member_vnf_index = c_vnf["member-vnf-index"]
710 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
711 for c_vld in nsd_RO.get("vld", ()):
712 for cp in c_vld.get("vnfd-connection-point-ref", ()):
713 member_vnf_index = cp["member-vnf-index-ref"]
714 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
715
716 desc = await RO.create("nsd", descriptor=nsd_RO)
717 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
718 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
719 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
720 self.update_db_2("nsrs", nsr_id, db_nsr_update)
721
722 # Crate ns at RO
723 # if present use it unless in error status
724 RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
725 if RO_nsr_id:
726 try:
727 step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
728 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
729 desc = await RO.show("ns", RO_nsr_id)
730 except ROclient.ROClientException as e:
731 if e.http_code != HTTPStatus.NOT_FOUND:
732 raise
733 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
734 if RO_nsr_id:
735 ns_status, ns_status_info = RO.check_ns_status(desc)
736 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
737 if ns_status == "ERROR":
738 step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
739 self.logger.debug(logging_text + step)
740 await RO.delete("ns", RO_nsr_id)
741 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
742 if not RO_nsr_id:
743 step = db_nsr_update["detailed-status"] = "Checking dependencies"
744 # self.logger.debug(logging_text + step)
745
746 # check if VIM is creating and wait look if previous tasks in process
747 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
748 if task_dependency:
749 step = "Waiting for related tasks to be completed: {}".format(task_name)
750 self.logger.debug(logging_text + step)
751 await asyncio.wait(task_dependency, timeout=3600)
752 if ns_params.get("vnf"):
753 for vnf in ns_params["vnf"]:
754 if "vimAccountId" in vnf:
755 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
756 vnf["vimAccountId"])
757 if task_dependency:
758 step = "Waiting for related tasks to be completed: {}".format(task_name)
759 self.logger.debug(logging_text + step)
760 await asyncio.wait(task_dependency, timeout=3600)
761
762 step = db_nsr_update["detailed-status"] = "Checking instantiation parameters"
763
764 # feature 1429. Add n2vc public key to needed VMs
765 n2vc_key = await self.n2vc.GetPublicKey()
766 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, [n2vc_key])
767
768 step = db_nsr_update["detailed-status"] = "Creating ns at RO"
769 desc = await RO.create("ns", descriptor=RO_ns_params,
770 name=db_nsr["name"],
771 scenario=RO_nsd_uuid)
772 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
773 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
774 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
775 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
776 self.update_db_2("nsrs", nsr_id, db_nsr_update)
777
778 # wait until NS is ready
779 step = ns_status_detailed = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
780 detailed_status_old = None
781 self.logger.debug(logging_text + step)
782
783 while time() <= start_deploy + self.total_deploy_timeout:
784 desc = await RO.show("ns", RO_nsr_id)
785 ns_status, ns_status_info = RO.check_ns_status(desc)
786 db_nsr_update["admin.deployed.RO.nsr_status"] = ns_status
787 if ns_status == "ERROR":
788 raise ROclient.ROClientException(ns_status_info)
789 elif ns_status == "BUILD":
790 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
791 elif ns_status == "ACTIVE":
792 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
793 try:
794 self.ns_update_vnfr(db_vnfrs, desc)
795 break
796 except LcmExceptionNoMgmtIP:
797 pass
798 else:
799 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
800 if detailed_status != detailed_status_old:
801 detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
802 self.update_db_2("nsrs", nsr_id, db_nsr_update)
803 await asyncio.sleep(5, loop=self.loop)
804 else: # total_deploy_timeout
805 raise ROclient.ROClientException("Timeout waiting ns to be ready")
806
807 step = "Updating NSR"
808 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
809
810 db_nsr["detailed-status"] = "Configuring vnfr"
811 self.update_db_2("nsrs", nsr_id, db_nsr_update)
812
813 # The parameters we'll need to deploy a charm
814 number_to_configure = 0
815
816 def deploy_charm(vnf_index, vdu_id, vdu_name, vdu_count_index, charm_params, n2vc_info):
817 """An inner function to deploy the charm from either vnf or vdu
818 vnf_index is mandatory. vdu_id can be None for a vnf configuration or the id for vdu configuration
819 """
820 if not charm_params["rw_mgmt_ip"]:
821 raise LcmException("vnfd/vdu has not management ip address to configure it")
822 # Login to the VCA.
823 # if number_to_configure == 0:
824 # self.logger.debug("Logging into N2VC...")
825 # task = asyncio.ensure_future(self.n2vc.login())
826 # yield from asyncio.wait_for(task, 30.0)
827 # self.logger.debug("Logged into N2VC!")
828
829 # # await self.n2vc.login()
830
831 # Note: The charm needs to exist on disk at the location
832 # specified by charm_path.
833 base_folder = vnfd["_admin"]["storage"]
834 storage_params = self.fs.get_params()
835 charm_path = "{}{}/{}/charms/{}".format(
836 storage_params["path"],
837 base_folder["folder"],
838 base_folder["pkg-dir"],
839 proxy_charm
840 )
841
842 # ns_name will be ignored in the current version of N2VC
843 # but will be implemented for the next point release.
844 model_name = "default" # TODO bug 585 nsr_id
845 if vdu_id:
846 vdu_id_text = vdu_id + "-"
847 else:
848 vdu_id_text = "-"
849 application_name = self.n2vc.FormatApplicationName(nsr_name, vnf_index, vdu_id_text)
850
851 vca_index = len(vca_deployed_list)
852 # trunk name and add two char index at the end to ensure that it is unique. It is assumed no more than
853 # 26*26 charm in the same NS
854 application_name = application_name[0:48]
855 application_name += chr(97 + vca_index // 26) + chr(97 + vca_index % 26)
856 vca_deployed_ = {
857 "member-vnf-index": vnf_index,
858 "vdu_id": vdu_id,
859 "model": model_name,
860 "application": application_name,
861 "operational-status": "init",
862 "detailed-status": "",
863 "vnfd_id": vnfd_id,
864 "vdu_name": vdu_name,
865 "vdu_count_index": vdu_count_index,
866 }
867 vca_deployed_list.append(vca_deployed_)
868 db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = vca_deployed_
869 self.update_db_2("nsrs", nsr_id, db_nsr_update)
870
871 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
872 proxy_charm))
873 if not n2vc_info:
874 n2vc_info["nsr_id"] = nsr_id
875 n2vc_info["nslcmop_id"] = nslcmop_id
876 n2vc_info["n2vc_event"] = asyncio.Event(loop=self.loop)
877 n2vc_info["lcmOperationType"] = "instantiate"
878 n2vc_info["deployed"] = vca_deployed_list
879 n2vc_info["db_update"] = db_nsr_update
880 task = asyncio.ensure_future(
881 self.n2vc.DeployCharms(
882 model_name, # The network service name
883 application_name, # The application name
884 vnfd, # The vnf descriptor
885 charm_path, # Path to charm
886 charm_params, # Runtime params, like mgmt ip
887 {}, # for native charms only
888 self.n2vc_callback, # Callback for status changes
889 n2vc_info, # Callback parameter
890 None, # Callback parameter (task)
891 )
892 )
893 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
894 n2vc_info))
895 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + application_name, task)
896
897 step = "Looking for needed vnfd to configure"
898 self.logger.debug(logging_text + step)
899
900 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
901 vnfd_id = c_vnf["vnfd-id-ref"]
902 vnf_index = str(c_vnf["member-vnf-index"])
903 vnfd = db_vnfds_ref[vnfd_id]
904
905 # Get additional parameters
906 vnfr_params = {}
907 if db_vnfrs[vnf_index].get("additionalParamsForVnf"):
908 vnfr_params = db_vnfrs[vnf_index]["additionalParamsForVnf"].copy()
909 for k, v in vnfr_params.items():
910 if isinstance(v, str) and v.startswith("!!yaml "):
911 vnfr_params[k] = yaml.safe_load(v[7:])
912
913 # Check if this VNF has a charm configuration
914 vnf_config = vnfd.get("vnf-configuration")
915 if vnf_config and vnf_config.get("juju"):
916 proxy_charm = vnf_config["juju"]["charm"]
917
918 if proxy_charm:
919 step = "connecting to N2VC to configure vnf {}".format(vnf_index)
920 vnfr_params["rw_mgmt_ip"] = db_vnfrs[vnf_index]["ip-address"]
921 charm_params = {
922 "user_values": vnfr_params,
923 "rw_mgmt_ip": db_vnfrs[vnf_index]["ip-address"],
924 "initial-config-primitive": vnf_config.get('initial-config-primitive') or {}
925 }
926
927 # Login to the VCA. If there are multiple calls to login(),
928 # subsequent calls will be a nop and return immediately.
929 await self.n2vc.login()
930 deploy_charm(vnf_index, None, None, None, charm_params, n2vc_info)
931 number_to_configure += 1
932
933 # Deploy charms for each VDU that supports one.
934 for vdu_index, vdu in enumerate(get_iterable(vnfd, 'vdu')):
935 vdu_config = vdu.get('vdu-configuration')
936 proxy_charm = None
937
938 if vdu_config and vdu_config.get("juju"):
939 proxy_charm = vdu_config["juju"]["charm"]
940
941 if proxy_charm:
942 step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index)
943 await self.n2vc.login()
944 vdur = db_vnfrs[vnf_index]["vdur"][vdu_index]
945 # TODO for the moment only first vdu_id contains a charm deployed
946 if vdur["vdu-id-ref"] != vdu["id"]:
947 raise LcmException("Mismatch vdur {}, vdu {} at index {} for vnf {}"
948 .format(vdur["vdu-id-ref"], vdu["id"], vdu_index, vnf_index))
949 vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
950 charm_params = {
951 "user_values": vnfr_params,
952 "rw_mgmt_ip": vdur["ip-address"],
953 "initial-config-primitive": vdu_config.get('initial-config-primitive') or {}
954 }
955 deploy_charm(vnf_index, vdu["id"], vdur.get("name"), vdur["count-index"],
956 charm_params, n2vc_info)
957 number_to_configure += 1
958
959 db_nsr_update["operational-status"] = "running"
960 configuration_failed = False
961 if number_to_configure:
962 old_status = "configuring: init: {}".format(number_to_configure)
963 db_nsr_update["config-status"] = old_status
964 db_nsr_update["detailed-status"] = old_status
965 db_nslcmop_update["detailed-status"] = old_status
966
967 # wait until all are configured.
968 while time() <= start_deploy + self.total_deploy_timeout:
969 if db_nsr_update:
970 self.update_db_2("nsrs", nsr_id, db_nsr_update)
971 if db_nslcmop_update:
972 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
973 # TODO add a fake task that set n2vc_event after some time
974 await n2vc_info["n2vc_event"].wait()
975 n2vc_info["n2vc_event"].clear()
976 all_active = True
977 status_map = {}
978 n2vc_error_text = [] # contain text error list. If empty no one is in error status
979 now = time()
980 for vca_deployed in vca_deployed_list:
981 vca_status = vca_deployed["operational-status"]
982 if vca_status not in status_map:
983 # Initialize it
984 status_map[vca_status] = 0
985 status_map[vca_status] += 1
986
987 if vca_status == "active":
988 vca_deployed.pop("time_first_error", None)
989 vca_deployed.pop("status_first_error", None)
990 continue
991
992 all_active = False
993 if vca_status in ("error", "blocked"):
994 vca_deployed["detailed-status-error"] = vca_deployed["detailed-status"]
995 # if not first time in this status error
996 if not vca_deployed.get("time_first_error"):
997 vca_deployed["time_first_error"] = now
998 continue
999 if vca_deployed.get("time_first_error") and \
1000 now <= vca_deployed["time_first_error"] + self.timeout_vca_on_error:
1001 n2vc_error_text.append("member_vnf_index={} vdu_id={} {}: {}"
1002 .format(vca_deployed["member-vnf-index"],
1003 vca_deployed["vdu_id"], vca_status,
1004 vca_deployed["detailed-status-error"]))
1005
1006 if all_active:
1007 break
1008 elif n2vc_error_text:
1009 db_nsr_update["config-status"] = "failed"
1010 error_text = "fail configuring " + ";".join(n2vc_error_text)
1011 db_nsr_update["detailed-status"] = error_text
1012 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1013 db_nslcmop_update["detailed-status"] = error_text
1014 db_nslcmop_update["statusEnteredTime"] = time()
1015 configuration_failed = True
1016 break
1017 else:
1018 cs = "configuring: "
1019 separator = ""
1020 for status, num in status_map.items():
1021 cs += separator + "{}: {}".format(status, num)
1022 separator = ", "
1023 if old_status != cs:
1024 db_nsr_update["config-status"] = cs
1025 db_nsr_update["detailed-status"] = cs
1026 db_nslcmop_update["detailed-status"] = cs
1027 old_status = cs
1028 else: # total_deploy_timeout
1029 raise LcmException("Timeout waiting ns to be configured")
1030
1031 if not configuration_failed:
1032 # all is done
1033 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1034 db_nslcmop_update["statusEnteredTime"] = time()
1035 db_nslcmop_update["detailed-status"] = "done"
1036 db_nsr_update["config-status"] = "configured"
1037 db_nsr_update["detailed-status"] = "done"
1038
1039 return
1040
1041 except (ROclient.ROClientException, DbException, LcmException) as e:
1042 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1043 exc = e
1044 except asyncio.CancelledError:
1045 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1046 exc = "Operation was cancelled"
1047 except Exception as e:
1048 exc = traceback.format_exc()
1049 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1050 exc_info=True)
1051 finally:
1052 if exc:
1053 if db_nsr:
1054 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1055 db_nsr_update["operational-status"] = "failed"
1056 if db_nslcmop:
1057 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1058 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1059 db_nslcmop_update["statusEnteredTime"] = time()
1060 try:
1061 if db_nsr:
1062 db_nsr_update["_admin.nslcmop"] = None
1063 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1064 if db_nslcmop_update:
1065 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1066 except DbException as e:
1067 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1068 if nslcmop_operation_state:
1069 try:
1070 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1071 "operationState": nslcmop_operation_state},
1072 loop=self.loop)
1073 except Exception as e:
1074 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1075
1076 self.logger.debug(logging_text + "Exit")
1077 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1078
1079 async def _destroy_charm(self, model, application):
1080 """
1081 Order N2VC destroy a charm
1082 :param model:
1083 :param application:
1084 :return: True if charm does not exist. False if it exist
1085 """
1086 if not await self.n2vc.HasApplication(model, application):
1087 return True # Already removed
1088 await self.n2vc.RemoveCharms(model, application)
1089 return False
1090
1091 async def _wait_charm_destroyed(self, model, application, timeout):
1092 """
1093 Wait until charm does not exist
1094 :param model:
1095 :param application:
1096 :param timeout:
1097 :return: True if not exist, False if timeout
1098 """
1099 while True:
1100 if not await self.n2vc.HasApplication(model, application):
1101 return True
1102 if timeout < 0:
1103 return False
1104 await asyncio.sleep(10)
1105 timeout -= 10
1106
1107 async def terminate(self, nsr_id, nslcmop_id):
1108 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
1109 self.logger.debug(logging_text + "Enter")
1110 db_nsr = None
1111 db_nslcmop = None
1112 exc = None
1113 failed_detail = [] # annotates all failed error messages
1114 vca_time_destroy = None # time of where destroy charm order
1115 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1116 db_nslcmop_update = {}
1117 nslcmop_operation_state = None
1118 try:
1119 step = "Getting nslcmop={} from db".format(nslcmop_id)
1120 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1121 step = "Getting nsr={} from db".format(nsr_id)
1122 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1123 # nsd = db_nsr["nsd"]
1124 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
1125 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
1126 return
1127 # #TODO check if VIM is creating and wait
1128 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
1129
1130 db_nsr_update["operational-status"] = "terminating"
1131 db_nsr_update["config-status"] = "terminating"
1132
1133 if nsr_deployed and nsr_deployed.get("VCA"):
1134 try:
1135 step = "Scheduling configuration charms removing"
1136 db_nsr_update["detailed-status"] = "Deleting charms"
1137 self.logger.debug(logging_text + step)
1138 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1139 # for backward compatibility
1140 if isinstance(nsr_deployed["VCA"], dict):
1141 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
1142 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
1143 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1144
1145 for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
1146 if vca_deployed:
1147 if await self._destroy_charm(vca_deployed['model'], vca_deployed["application"]):
1148 vca_deployed.clear()
1149 db_nsr["_admin.deployed.VCA.{}".format(vca_index)] = None
1150 else:
1151 vca_time_destroy = time()
1152 except Exception as e:
1153 self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
1154
1155 # remove from RO
1156 RO_fail = False
1157 RO = ROclient.ROClient(self.loop, **self.ro_config)
1158
1159 # Delete ns
1160 RO_nsr_id = RO_delete_action = None
1161 if nsr_deployed and nsr_deployed.get("RO"):
1162 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
1163 RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
1164 try:
1165 if RO_nsr_id:
1166 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = "Deleting ns at RO"
1167 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1168 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1169 self.logger.debug(logging_text + step)
1170 desc = await RO.delete("ns", RO_nsr_id)
1171 RO_delete_action = desc["action_id"]
1172 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
1173 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1174 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1175 if RO_delete_action:
1176 # wait until NS is deleted from VIM
1177 step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
1178 format(RO_nsr_id, RO_delete_action)
1179 detailed_status_old = None
1180 self.logger.debug(logging_text + step)
1181
1182 delete_timeout = 20 * 60 # 20 minutes
1183 while delete_timeout > 0:
1184 desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
1185 extra_item_id=RO_delete_action)
1186 ns_status, ns_status_info = RO.check_action_status(desc)
1187 if ns_status == "ERROR":
1188 raise ROclient.ROClientException(ns_status_info)
1189 elif ns_status == "BUILD":
1190 detailed_status = step + "; {}".format(ns_status_info)
1191 elif ns_status == "ACTIVE":
1192 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1193 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1194 break
1195 else:
1196 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
1197 if detailed_status != detailed_status_old:
1198 detailed_status_old = db_nslcmop_update["detailed-status"] = \
1199 db_nsr_update["detailed-status"] = detailed_status
1200 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1201 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1202 await asyncio.sleep(5, loop=self.loop)
1203 delete_timeout -= 5
1204 else: # delete_timeout <= 0:
1205 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
1206
1207 except ROclient.ROClientException as e:
1208 if e.http_code == 404: # not found
1209 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1210 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1211 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1212 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
1213 elif e.http_code == 409: # conflict
1214 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
1215 self.logger.debug(logging_text + failed_detail[-1])
1216 RO_fail = True
1217 else:
1218 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
1219 self.logger.error(logging_text + failed_detail[-1])
1220 RO_fail = True
1221
1222 # Delete nsd
1223 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
1224 RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
1225 try:
1226 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
1227 "Deleting nsd at RO"
1228 await RO.delete("nsd", RO_nsd_id)
1229 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
1230 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
1231 except ROclient.ROClientException as e:
1232 if e.http_code == 404: # not found
1233 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
1234 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
1235 elif e.http_code == 409: # conflict
1236 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
1237 self.logger.debug(logging_text + failed_detail[-1])
1238 RO_fail = True
1239 else:
1240 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
1241 self.logger.error(logging_text + failed_detail[-1])
1242 RO_fail = True
1243
1244 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd_id"):
1245 for vnf_id, RO_vnfd_id in nsr_deployed["RO"]["vnfd_id"].items():
1246 if not RO_vnfd_id:
1247 continue
1248 try:
1249 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
1250 "Deleting vnfd={} at RO".format(vnf_id)
1251 await RO.delete("vnfd", RO_vnfd_id)
1252 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
1253 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
1254 except ROclient.ROClientException as e:
1255 if e.http_code == 404: # not found
1256 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
1257 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
1258 elif e.http_code == 409: # conflict
1259 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
1260 self.logger.debug(logging_text + failed_detail[-1])
1261 else:
1262 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
1263 self.logger.error(logging_text + failed_detail[-1])
1264
1265 # wait until charm deleted
1266 if vca_time_destroy:
1267 db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = step = \
1268 "Waiting for deletion of configuration charms"
1269 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1270 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1271 for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
1272 if not vca_deployed:
1273 continue
1274 step = "Waiting for deletion of charm application_name={}".format(vca_deployed["application"])
1275 timeout = self.timeout_charm_delete - int(time() - vca_time_destroy)
1276 if not await self._wait_charm_destroyed(vca_deployed['model'], vca_deployed["application"],
1277 timeout):
1278 failed_detail.append("VCA[application_name={}] Deletion timeout".format(
1279 vca_deployed["application"]))
1280 else:
1281 db_nsr["_admin.deployed.VCA.{}".format(vca_index)] = None
1282
1283 if failed_detail:
1284 self.logger.error(logging_text + " ;".join(failed_detail))
1285 db_nsr_update["operational-status"] = "failed"
1286 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
1287 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
1288 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1289 db_nslcmop_update["statusEnteredTime"] = time()
1290 elif db_nslcmop["operationParams"].get("autoremove"):
1291 self.db.del_one("nsrs", {"_id": nsr_id})
1292 db_nsr = None
1293 db_nsr_update.clear()
1294 self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
1295 db_nslcmop = None
1296 nslcmop_operation_state = "COMPLETED"
1297 db_nslcmop_update.clear()
1298 self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
1299 self.db.set_list("pdus", {"_admin.usage.nsr_id": nsr_id},
1300 {"_admin.usageState": "NOT_IN_USE", "_admin.usage": None})
1301 self.logger.debug(logging_text + "Delete from database")
1302 else:
1303 db_nsr_update["operational-status"] = "terminated"
1304 db_nsr_update["detailed-status"] = "Done"
1305 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
1306 db_nslcmop_update["detailed-status"] = "Done"
1307 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1308 db_nslcmop_update["statusEnteredTime"] = time()
1309
1310 except (ROclient.ROClientException, DbException) as e:
1311 self.logger.error(logging_text + "Exit Exception {}".format(e))
1312 exc = e
1313 except asyncio.CancelledError:
1314 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1315 exc = "Operation was cancelled"
1316 except Exception as e:
1317 exc = traceback.format_exc()
1318 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1319 finally:
1320 if exc and db_nslcmop:
1321 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1322 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1323 db_nslcmop_update["statusEnteredTime"] = time()
1324 try:
1325 if db_nslcmop and db_nslcmop_update:
1326 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1327 if db_nsr:
1328 db_nsr_update["_admin.nslcmop"] = None
1329 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1330 except DbException as e:
1331 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1332 if nslcmop_operation_state:
1333 try:
1334 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1335 "operationState": nslcmop_operation_state},
1336 loop=self.loop)
1337 except Exception as e:
1338 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1339 self.logger.debug(logging_text + "Exit")
1340 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
1341
1342 async def _ns_execute_primitive(self, db_deployed, nsr_name, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
1343 primitive, primitive_params):
1344
1345 for vca_deployed in db_deployed["VCA"]:
1346 if not vca_deployed:
1347 continue
1348 if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
1349 continue
1350 if vdu_name and vdu_name != vca_deployed["vdu_name"]:
1351 continue
1352 if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
1353 continue
1354 break
1355 else:
1356 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not deployed"
1357 .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
1358 model_name = vca_deployed.get("model")
1359 application_name = vca_deployed.get("application")
1360 if not model_name or not application_name:
1361 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not model "
1362 "or application name" .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
1363 if vca_deployed["operational-status"] != "active":
1364 raise LcmException("charm for member_vnf_index={} vdu_id={} operational_status={} not 'active'".format(
1365 member_vnf_index, vdu_id, vca_deployed["operational-status"]))
1366 callback = None # self.n2vc_callback
1367 callback_args = () # [db_nsr, db_nslcmop, member_vnf_index, None]
1368 await self.n2vc.login()
1369 task = asyncio.ensure_future(
1370 self.n2vc.ExecutePrimitive(
1371 model_name,
1372 application_name,
1373 primitive, callback,
1374 *callback_args,
1375 **primitive_params
1376 )
1377 )
1378 # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
1379 # db_nsr, db_nslcmop, member_vnf_index))
1380 # self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "action:" + primitive, task)
1381 # wait until completed with timeout
1382 await asyncio.wait((task,), timeout=600)
1383
1384 result = "FAILED" # by default
1385 result_detail = ""
1386 if task.cancelled():
1387 result_detail = "Task has been cancelled"
1388 elif task.done():
1389 exc = task.exception()
1390 if exc:
1391 result_detail = str(exc)
1392 else:
1393 # TODO revise with Adam if action is finished and ok when task is done or callback is needed
1394 result = "COMPLETED"
1395 result_detail = "Done"
1396 else: # timeout
1397 # TODO Should it be cancelled?!!
1398 task.cancel()
1399 result_detail = "timeout"
1400 return result, result_detail
1401
1402 async def action(self, nsr_id, nslcmop_id):
1403 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
1404 self.logger.debug(logging_text + "Enter")
1405 # get all needed from database
1406 db_nsr = None
1407 db_nslcmop = None
1408 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1409 db_nslcmop_update = {}
1410 nslcmop_operation_state = None
1411 exc = None
1412 try:
1413 step = "Getting information from database"
1414 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1415 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1416 nsr_deployed = db_nsr["_admin"].get("deployed")
1417 nsr_name = db_nsr["name"]
1418 vnf_index = db_nslcmop["operationParams"]["member_vnf_index"]
1419 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
1420 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
1421 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
1422
1423 # look if previous tasks in process
1424 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
1425 if task_dependency:
1426 step = db_nslcmop_update["detailed-status"] = \
1427 "Waiting for related tasks to be completed: {}".format(task_name)
1428 self.logger.debug(logging_text + step)
1429 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1430 _, pending = await asyncio.wait(task_dependency, timeout=3600)
1431 if pending:
1432 raise LcmException("Timeout waiting related tasks to be completed")
1433
1434 # for backward compatibility
1435 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
1436 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
1437 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
1438 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1439
1440 # TODO check if ns is in a proper status
1441 primitive = db_nslcmop["operationParams"]["primitive"]
1442 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
1443 result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index, vdu_id,
1444 vdu_name, vdu_count_index, primitive,
1445 primitive_params)
1446 db_nslcmop_update["detailed-status"] = result_detail
1447 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
1448 db_nslcmop_update["statusEnteredTime"] = time()
1449 self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
1450 return # database update is called inside finally
1451
1452 except (DbException, LcmException) as e:
1453 self.logger.error(logging_text + "Exit Exception {}".format(e))
1454 exc = e
1455 except asyncio.CancelledError:
1456 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1457 exc = "Operation was cancelled"
1458 except Exception as e:
1459 exc = traceback.format_exc()
1460 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1461 finally:
1462 if exc and db_nslcmop:
1463 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1464 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1465 db_nslcmop_update["statusEnteredTime"] = time()
1466 try:
1467 if db_nslcmop_update:
1468 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1469 if db_nsr:
1470 db_nsr_update["_admin.nslcmop"] = None
1471 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1472 except DbException as e:
1473 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1474 self.logger.debug(logging_text + "Exit")
1475 if nslcmop_operation_state:
1476 try:
1477 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1478 "operationState": nslcmop_operation_state},
1479 loop=self.loop)
1480 except Exception as e:
1481 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1482 self.logger.debug(logging_text + "Exit")
1483 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
1484
1485 async def scale(self, nsr_id, nslcmop_id):
1486 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
1487 self.logger.debug(logging_text + "Enter")
1488 # get all needed from database
1489 db_nsr = None
1490 db_nslcmop = None
1491 db_nslcmop_update = {}
1492 nslcmop_operation_state = None
1493 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1494 exc = None
1495 # in case of error, indicates what part of scale was failed to put nsr at error status
1496 scale_process = None
1497 old_operational_status = ""
1498 old_config_status = ""
1499 vnfr_scaled = False
1500 try:
1501 step = "Getting nslcmop from database"
1502 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1503 step = "Getting nsr from database"
1504 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1505 nsr_name = db_nsr["name"]
1506 old_operational_status = db_nsr["operational-status"]
1507 old_config_status = db_nsr["config-status"]
1508
1509 # look if previous tasks in process
1510 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
1511 if task_dependency:
1512 step = db_nslcmop_update["detailed-status"] = \
1513 "Waiting for related tasks to be completed: {}".format(task_name)
1514 self.logger.debug(logging_text + step)
1515 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1516 _, pending = await asyncio.wait(task_dependency, timeout=3600)
1517 if pending:
1518 raise LcmException("Timeout waiting related tasks to be completed")
1519
1520 step = "Parsing scaling parameters"
1521 db_nsr_update["operational-status"] = "scaling"
1522 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1523 nsr_deployed = db_nsr["_admin"].get("deployed")
1524 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
1525 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
1526 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
1527 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
1528 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
1529
1530 # for backward compatibility
1531 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
1532 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
1533 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
1534 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1535
1536 step = "Getting vnfr from database"
1537 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
1538 step = "Getting vnfd from database"
1539 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
1540 step = "Getting scaling-group-descriptor"
1541 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
1542 if scaling_descriptor["name"] == scaling_group:
1543 break
1544 else:
1545 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
1546 "at vnfd:scaling-group-descriptor".format(scaling_group))
1547 # cooldown_time = 0
1548 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
1549 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
1550 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
1551 # break
1552
1553 # TODO check if ns is in a proper status
1554 step = "Sending scale order to RO"
1555 nb_scale_op = 0
1556 if not db_nsr["_admin"].get("scaling-group"):
1557 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
1558 admin_scale_index = 0
1559 else:
1560 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
1561 if admin_scale_info["name"] == scaling_group:
1562 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
1563 break
1564 else: # not found, set index one plus last element and add new entry with the name
1565 admin_scale_index += 1
1566 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
1567 RO_scaling_info = []
1568 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
1569 if scaling_type == "SCALE_OUT":
1570 # count if max-instance-count is reached
1571 if "max-instance-count" in scaling_descriptor and scaling_descriptor["max-instance-count"] is not None:
1572 max_instance_count = int(scaling_descriptor["max-instance-count"])
1573 if nb_scale_op >= max_instance_count:
1574 raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
1575 " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1576 nb_scale_op = nb_scale_op + 1
1577 vdu_scaling_info["scaling_direction"] = "OUT"
1578 vdu_scaling_info["vdu-create"] = {}
1579 for vdu_scale_info in scaling_descriptor["vdu"]:
1580 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1581 "type": "create", "count": vdu_scale_info.get("count", 1)})
1582 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1583 elif scaling_type == "SCALE_IN":
1584 # count if min-instance-count is reached
1585 min_instance_count = 0
1586 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
1587 min_instance_count = int(scaling_descriptor["min-instance-count"])
1588 if nb_scale_op <= min_instance_count:
1589 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
1590 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1591 nb_scale_op = nb_scale_op - 1
1592 vdu_scaling_info["scaling_direction"] = "IN"
1593 vdu_scaling_info["vdu-delete"] = {}
1594 for vdu_scale_info in scaling_descriptor["vdu"]:
1595 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1596 "type": "delete", "count": vdu_scale_info.get("count", 1)})
1597 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1598
1599 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
1600 vdu_create = vdu_scaling_info.get("vdu-create")
1601 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
1602 if vdu_scaling_info["scaling_direction"] == "IN":
1603 for vdur in reversed(db_vnfr["vdur"]):
1604 if vdu_delete.get(vdur["vdu-id-ref"]):
1605 vdu_delete[vdur["vdu-id-ref"]] -= 1
1606 vdu_scaling_info["vdu"].append({
1607 "name": vdur["name"],
1608 "vdu_id": vdur["vdu-id-ref"],
1609 "interface": []
1610 })
1611 for interface in vdur["interfaces"]:
1612 vdu_scaling_info["vdu"][-1]["interface"].append({
1613 "name": interface["name"],
1614 "ip_address": interface["ip-address"],
1615 "mac_address": interface.get("mac-address"),
1616 })
1617 vdu_delete = vdu_scaling_info.pop("vdu-delete")
1618
1619 # execute primitive service PRE-SCALING
1620 step = "Executing pre-scale vnf-config-primitive"
1621 if scaling_descriptor.get("scaling-config-action"):
1622 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1623 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "pre-scale-in" \
1624 and scaling_type == "SCALE_IN":
1625 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1626 step = db_nslcmop_update["detailed-status"] = \
1627 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
1628 # look for primitive
1629 primitive_params = {}
1630 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1631 if config_primitive["name"] == vnf_config_primitive:
1632 for parameter in config_primitive.get("parameter", ()):
1633 if 'default-value' in parameter and \
1634 parameter['default-value'] == "<VDU_SCALE_INFO>":
1635 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1636 default_flow_style=True,
1637 width=256)
1638 break
1639 else:
1640 raise LcmException(
1641 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
1642 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-cnfiguration:config-"
1643 "primitive".format(scaling_group, config_primitive))
1644 scale_process = "VCA"
1645 db_nsr_update["config-status"] = "configuring pre-scaling"
1646 result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index,
1647 None, None, None, vnf_config_primitive,
1648 primitive_params)
1649 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1650 vnf_config_primitive, result, result_detail))
1651 if result == "FAILED":
1652 raise LcmException(result_detail)
1653 db_nsr_update["config-status"] = old_config_status
1654 scale_process = None
1655
1656 if RO_scaling_info:
1657 scale_process = "RO"
1658 RO = ROclient.ROClient(self.loop, **self.ro_config)
1659 RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
1660 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
1661 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
1662 # wait until ready
1663 RO_nslcmop_id = RO_desc["instance_action_id"]
1664 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
1665
1666 RO_task_done = False
1667 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
1668 detailed_status_old = None
1669 self.logger.debug(logging_text + step)
1670
1671 deployment_timeout = 1 * 3600 # One hour
1672 while deployment_timeout > 0:
1673 if not RO_task_done:
1674 desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
1675 extra_item_id=RO_nslcmop_id)
1676 ns_status, ns_status_info = RO.check_action_status(desc)
1677 if ns_status == "ERROR":
1678 raise ROclient.ROClientException(ns_status_info)
1679 elif ns_status == "BUILD":
1680 detailed_status = step + "; {}".format(ns_status_info)
1681 elif ns_status == "ACTIVE":
1682 RO_task_done = True
1683 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
1684 self.logger.debug(logging_text + step)
1685 else:
1686 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
1687 else:
1688 desc = await RO.show("ns", RO_nsr_id)
1689 ns_status, ns_status_info = RO.check_ns_status(desc)
1690 if ns_status == "ERROR":
1691 raise ROclient.ROClientException(ns_status_info)
1692 elif ns_status == "BUILD":
1693 detailed_status = step + "; {}".format(ns_status_info)
1694 elif ns_status == "ACTIVE":
1695 step = detailed_status = \
1696 "Waiting for management IP address reported by the VIM. Updating VNFRs"
1697 if not vnfr_scaled:
1698 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
1699 vnfr_scaled = True
1700 try:
1701 desc = await RO.show("ns", RO_nsr_id)
1702 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
1703 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
1704 break
1705 except LcmExceptionNoMgmtIP:
1706 pass
1707 else:
1708 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1709 if detailed_status != detailed_status_old:
1710 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
1711 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1712
1713 await asyncio.sleep(5, loop=self.loop)
1714 deployment_timeout -= 5
1715 if deployment_timeout <= 0:
1716 raise ROclient.ROClientException("Timeout waiting ns to be ready")
1717
1718 # update VDU_SCALING_INFO with the obtained ip_addresses
1719 if vdu_scaling_info["scaling_direction"] == "OUT":
1720 for vdur in reversed(db_vnfr["vdur"]):
1721 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
1722 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
1723 vdu_scaling_info["vdu"].append({
1724 "name": vdur["name"],
1725 "vdu_id": vdur["vdu-id-ref"],
1726 "interface": []
1727 })
1728 for interface in vdur["interfaces"]:
1729 vdu_scaling_info["vdu"][-1]["interface"].append({
1730 "name": interface["name"],
1731 "ip_address": interface["ip-address"],
1732 "mac_address": interface.get("mac-address"),
1733 })
1734 del vdu_scaling_info["vdu-create"]
1735
1736 scale_process = None
1737 if db_nsr_update:
1738 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1739
1740 # execute primitive service POST-SCALING
1741 step = "Executing post-scale vnf-config-primitive"
1742 if scaling_descriptor.get("scaling-config-action"):
1743 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1744 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "post-scale-out" \
1745 and scaling_type == "SCALE_OUT":
1746 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1747 step = db_nslcmop_update["detailed-status"] = \
1748 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
1749 # look for primitive
1750 primitive_params = {}
1751 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1752 if config_primitive["name"] == vnf_config_primitive:
1753 for parameter in config_primitive.get("parameter", ()):
1754 if 'default-value' in parameter and \
1755 parameter['default-value'] == "<VDU_SCALE_INFO>":
1756 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1757 default_flow_style=True,
1758 width=256)
1759 break
1760 else:
1761 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
1762 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
1763 "match any vnf-configuration:config-primitive".format(scaling_group,
1764 config_primitive))
1765 scale_process = "VCA"
1766 db_nsr_update["config-status"] = "configuring post-scaling"
1767
1768 result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index,
1769 None, None, None, vnf_config_primitive,
1770 primitive_params)
1771 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1772 vnf_config_primitive, result, result_detail))
1773 if result == "FAILED":
1774 raise LcmException(result_detail)
1775 db_nsr_update["config-status"] = old_config_status
1776 scale_process = None
1777
1778 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1779 db_nslcmop_update["statusEnteredTime"] = time()
1780 db_nslcmop_update["detailed-status"] = "done"
1781 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
1782 db_nsr_update["operational-status"] = old_operational_status
1783 db_nsr_update["config-status"] = old_config_status
1784 return
1785 except (ROclient.ROClientException, DbException, LcmException) as e:
1786 self.logger.error(logging_text + "Exit Exception {}".format(e))
1787 exc = e
1788 except asyncio.CancelledError:
1789 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1790 exc = "Operation was cancelled"
1791 except Exception as e:
1792 exc = traceback.format_exc()
1793 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1794 finally:
1795 if exc:
1796 if db_nslcmop:
1797 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1798 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1799 db_nslcmop_update["statusEnteredTime"] = time()
1800 if db_nsr:
1801 db_nsr_update["operational-status"] = old_operational_status
1802 db_nsr_update["config-status"] = old_config_status
1803 db_nsr_update["detailed-status"] = ""
1804 db_nsr_update["_admin.nslcmop"] = None
1805 if scale_process:
1806 if "VCA" in scale_process:
1807 db_nsr_update["config-status"] = "failed"
1808 if "RO" in scale_process:
1809 db_nsr_update["operational-status"] = "failed"
1810 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
1811 exc)
1812 try:
1813 if db_nslcmop and db_nslcmop_update:
1814 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1815 if db_nsr:
1816 db_nsr_update["_admin.nslcmop"] = None
1817 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1818 except DbException as e:
1819 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1820 if nslcmop_operation_state:
1821 try:
1822 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1823 "operationState": nslcmop_operation_state},
1824 loop=self.loop)
1825 # if cooldown_time:
1826 # await asyncio.sleep(cooldown_time)
1827 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
1828 except Exception as e:
1829 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1830 self.logger.debug(logging_text + "Exit")
1831 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")