1aaf2c56dfa91fac22e71adf89a0c66c384f8965
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import functools
24 import traceback
25
26 import ROclient
27 from lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase
28
29 from osm_common.dbbase import DbException
30 from osm_common.fsbase import FsException
31 from n2vc.vnf import N2VC
32
33 from copy import copy, deepcopy
34 from http import HTTPStatus
35 from time import time
36 from uuid import uuid4
37
38 __author__ = "Alfonso Tierno"
39
40
def get_iterable(in_dict, in_key):
    """
    Read a key from a dictionary in a form that is always safe to iterate over
    :param in_dict: a dictionary
    :param in_key: the key to look for at in_dict
    :return: in_dict[in_key], or an empty tuple when the key is not present or its value is falsy (None, False, [], ...)
    """
    return in_dict.get(in_key) or ()
51
52
def populate_dict(target_dict, key_list, value):
    """
    Update target_dict creating nested dictionaries following key_list. The last key_list item is assigned the value.
    Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
    :param target_dict: dictionary to be changed (modified in place)
    :param key_list: list of keys to insert at target_dict; all but the last one name nested dictionaries
    :param value: content stored under the last key of key_list
    :return: None
    """
    node = target_dict
    # descend through the intermediate keys, creating an empty dictionary for each missing level
    for key in key_list[:-1]:
        node = node.setdefault(key, {})
    node[key_list[-1]] = value
67
68
69 class NsLcm(LcmBase):
70 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
71 total_deploy_timeout = 2 * 3600 # global timeout for deployment
72 timeout_charm_delete = 10 * 60
73
def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
    """
    Init. Stores the shared resources and creates the N2VC client
    :param db: database object, handed over to LcmBase
    :param msg: messaging object, handed over to LcmBase
    :param fs: filesystem storage object, handed over to LcmBase
    :param lcm_tasks: tasks registry, stored at self.lcm_tasks
    :param ro_config: RO configuration, stored at self.ro_config
    :param vca_config: dictionary with the VCA connection data. Keys 'host', 'port', 'user' and 'secret' are read
    :param loop: asyncio event loop, stored at self.loop
    :return: None
    """
    # logging
    self.logger = logging.getLogger('lcm.ns')
    self.loop = loop
    self.lcm_tasks = lcm_tasks

    super().__init__(db, msg, fs, self.logger)

    self.ro_config = ro_config

    n2vc_arguments = {
        "log": self.logger,
        "server": vca_config['host'],
        "port": vca_config['port'],
        "user": vca_config['user'],
        "secret": vca_config['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=vca_config[''],
        "artifacts": None,
    }
    self.n2vc = N2VC(**n2vc_arguments)
101
def vnfd2RO(self, vnfd, new_id=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd. The input vnfd is not modified
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :return: deep copy of vnfd without "_id", "_admin", "vnf-configuration", "monitoring-param" and
        "scaling-group-descriptor", where every vdu "cloud-init-file" is replaced by a "cloud-init" entry holding
        the content of that file
    :raises LcmException: if the storage raises FsException while reading a cloud-init file
    """
    opened_file = None
    try:
        ro_vnfd = deepcopy(vnfd)
        for internal_key in ("_id", "_admin"):
            ro_vnfd.pop(internal_key, None)
        if new_id:
            ro_vnfd["id"] = new_id
        for vdu in ro_vnfd.get("vdu", ()):
            if "cloud-init-file" not in vdu:
                continue
            # "_admin" was removed from the copy, so the storage location is read from the input vnfd
            storage = vnfd["_admin"]["storage"]
            cloud_init_path = "{}/{}/cloud_init/{}".format(
                storage["folder"],
                storage["pkg-dir"],
                vdu["cloud-init-file"]
            )
            opened_file = self.fs.file_open(cloud_init_path, "r")
            # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails
            #  convert to base 64 or similar
            cloud_init_text = opened_file.read()
            opened_file.close()
            opened_file = None
            vdu.pop("cloud-init-file", None)
            vdu["cloud-init"] = cloud_init_text
        # remove what is unused by RO: configuration, monitoring, scaling
        for ro_unused_key in ("vnf-configuration", "monitoring-param", "scaling-group-descriptor"):
            ro_vnfd.pop(ro_unused_key, None)
        return ro_vnfd
    except FsException as e:
        raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
    finally:
        # only still set when reading failed between file_open and close
        if opened_file:
            opened_file.close()
142
def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None):
    """
    Callback both for charm status change and task completion
    :param model_name: Charm model name
    :param application_name: Charm application name
    :param status: Can be
        - blocked: The unit needs manual intervention
        - maintenance: The unit is actively deploying/configuring
        - waiting: The unit is waiting for another charm to be ready
        - active: The unit is deployed, configured, and ready
        - error: The charm has failed and needs attention.
        - terminated: The charm has been destroyed
        - removing,
        - removed
    :param message: detailed message error
    :param n2vc_info: dictionary with information shared with instantiate task. It contains:
        nsr_id:
        nslcmop_id:
        lcmOperationType: currently "instantiate"
        deployed: list of VCA entries. Empty entries are skipped; the others are dictionaries containing at least
            "model" and "application". The matching entry gets "operational-status" and "detailed-status" updated
        db_update: dictionary to be filled with the changes to be written to database with format
            key.key.key: value
        n2vc_event: event used to notify instantiation task that some change has been produced
    :param task: None for charm status change, or task for completion task callback
    :return: None. Any exception is logged and never propagated to the caller
    """
    # Fallback text: the except handler below must be able to log even if n2vc_info is malformed and the
    # complete logging text cannot be composed
    logging_text = "Task [n2vc_callback] application={}".format(application_name)
    try:
        nsr_id = n2vc_info["nsr_id"]
        deployed = n2vc_info["deployed"]
        db_nsr_update = n2vc_info["db_update"]
        nslcmop_id = n2vc_info["nslcmop_id"]
        ns_operation = n2vc_info["lcmOperationType"]
        n2vc_event = n2vc_info["n2vc_event"]
        logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id,
                                                                                 application_name)
        # locate the VCA entry this notification refers to
        for vca_index, vca_deployed in enumerate(deployed):
            if not vca_deployed:
                continue
            if model_name == vca_deployed["model"] and application_name == vca_deployed["application"]:
                break
        else:
            self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA")
            return

        def record_status(operational_status, detailed_status):
            # keep the shared in-memory entry and the pending database changes aligned
            vca_deployed['operational-status'] = operational_status
            db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = operational_status
            vca_deployed['detailed-status'] = detailed_status
            db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = detailed_status

        if task:
            if task.cancelled():
                self.logger.debug(logging_text + " task Cancelled")
                record_status("error", "Task Cancelled")
            elif task.done():
                exc = task.exception()
                if exc:
                    self.logger.error(logging_text + " task Exception={}".format(exc))
                    record_status("error", str(exc))
                else:
                    self.logger.debug(logging_text + " task Done")
                    # task is Done, but callback is still ongoing. So ignore
                    return
        elif status:
            self.logger.debug(logging_text + " Enter status={} message={}".format(status, message))
            if vca_deployed['operational-status'] == status:
                return  # same status, ignore
            record_status(status, str(message))
        else:
            # no exception is being handled here, so no exc_info is attached
            self.logger.critical(logging_text + " Enter with bad parameters")
            return
        # wake up instantiate task
        n2vc_event.set()
    except Exception as e:
        self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True)
220
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params. "vimAccountId" is mandatory; "vduImage", "ssh_keys", "vnf" and "vld"
        are processed when present
    :param nsd: ns descriptor. Its "constituent-vnfd" list is used to map member-vnf-index to vnfd-id-ref
    :param vnfd_dict: dictionary with the vnfd descriptors, indexed by the vnfd id referenced at
        nsd:constituent-vnfd:vnfd-id-ref
    :param n2vc_key_list: value stored as "mgmt_keys" of every vdu that needs to be accessed for configuration.
        Nothing is injected if it is empty
    :return: The RO ns descriptor, or None if ns_params is empty
    :raises LcmException: if a VIM account is not ENABLED or an instantiation parameter references a
        member-vnf-index, connection point or interface not present at the descriptors
    """
    # cache of vim_account id -> RO id, filled by vim_account_2_RO
    vim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    # Translates an OSM vim_account id into the id stored at its _admin.deployed.RO, reading the database only the
    # first time each account is requested. Raises LcmException if the account operationalState is not ENABLED
    def vim_account_2_RO(vim_account):
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    # Returns a copy of ip_profile with keys renamed: "dns-server" -> "dns-address" (a list of entries is flattened
    # to their 'address'), "dhcp-params" -> "dhcp", and ip-version "ipv4"/"ipv6" rewritten as "IPv4"/"IPv6"
    def ip_profile_2_RO(ip_profile):
        RO_ip_profile = deepcopy((ip_profile))
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        # "scenario": ns_params["nsdId"],
    }
    # Inject n2vc_key_list as "mgmt_keys" of the vdus that need to be accessed for configuration
    if n2vc_key_list:
        for vnfd_ref, vnfd in vnfd_dict.items():
            vdu_needed_access = []
            mgmt_cp = None
            # a vnf level configuration is accessed through the mgmt-interface: either an explicit vdu-id, or the
            # vdu owning the management connection point (resolved in the loop below)
            if vnfd.get("vnf-configuration"):
                if vnfd.get("mgmt-interface"):
                    if vnfd["mgmt-interface"].get("vdu-id"):
                        vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                    elif vnfd["mgmt-interface"].get("cp"):
                        mgmt_cp = vnfd["mgmt-interface"]["cp"]

            for vdu in vnfd.get("vdu", ()):
                if vdu.get("vdu-configuration"):
                    vdu_needed_access.append(vdu["id"])
                elif mgmt_cp:
                    # NOTE(review): vdu.get("interface") returns None when the vdu has no "interface" key, which
                    # makes this loop raise TypeError - confirm descriptors always contain it
                    for vdu_interface in vdu.get("interface"):
                        if vdu_interface.get("external-connection-point-ref") and \
                                vdu_interface["external-connection-point-ref"] == mgmt_cp:
                            vdu_needed_access.append(vdu["id"])
                            # only the first vdu attached to the management connection point is taken
                            mgmt_cp = None
                            break

            if vdu_needed_access:
                # the same vnfd can be used by several members of the ns: add the keys to all of them
                # NOTE(review): nsd.get("constituent-vnfd") returns None if the key is missing, which would raise
                # TypeError here - confirm
                for vnf_member in nsd.get("constituent-vnfd"):
                    if vnf_member["vnfd-id-ref"] != vnfd_ref:
                        continue
                    for vdu in vdu_needed_access:
                        populate_dict(RO_ns_params,
                                      ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                      n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    # vnf level instantiation parameters
    for vnf_params in get_iterable(ns_params, "vnf"):
        # look for the vnfd descriptor of this member-vnf-index
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                # iface_found is needed to leave the outer loop with break, so that its else clause (error) is
                # executed only when no vdu interface references this internal connection point
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    # ns level vld instantiation parameters
    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))
        if vld_params.get("vim-network-name"):
            RO_vld_sites = []
            # a dictionary maps vim_account -> network; a plain value applies without a concrete datacenter
            if isinstance(vld_params["vim-network-name"], dict):
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if vld_params.get("vim-network-id"):
            # NOTE(review): this writes the same ("networks", <name>, "sites") entry as the vim-network-name case
            # above, so when both parameters are provided the vim-network-id sites replace the previous ones
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-id"], dict):
                for vim_account, vim_net in vld_params["vim-network-id"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                # after these loops vdu_descriptor and interface_descriptor point to the interface attached to
                # the connection point; match_cp propagates the inner break to the outer loop
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
450
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds or removes vdur entries of a vnfr and writes the resulting "vdur" list to the "vnfrs" database collection
    :param db_vnfr: vnfr content. Its "vdur" list is modified in place
    :param vdu_create: dictionary {vdu-id-ref: number of vdur to add}. The input dictionary is not modified
    :param vdu_delete: dictionary {vdu-id-ref: number of vdur to remove}. The input dictionary is not modified
    :return: None
    :raises LcmException: if some requested creation or deletion cannot be done because there is no vdur left
        with that vdu-id-ref
    """
    # make a copy to do not change the dictionaries of the caller; entries are consumed as they are applied
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # the list is walked backwards: insertions and deletions happen at vdu_index or after it, so the positions
    # still to be visited are not shifted
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            for index in range(0, vdu_create[vdu_id_ref]):
                # every new vdur is cloned from the previous one, with a new _id and the next count-index
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Each direction is checked on its own so that the error reports the
    # operation (and the pending vdus) that really failed
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
489
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Fills the nsr vld entries with the info that RO reports for the created networks
    :param ns_update_nsr: dictionary to be filled with the updated info, using keys "vld.<index>"
    :param db_nsr: content of db_nsr. Its "vld" entries are also modified
    :param nsr_desc_RO: nsr descriptor from RO. Its "nets" are matched by "ns_net_osm_id" against the vld "id"
    :return: Nothing, LcmException is raised if a vld is not found at the RO info
    """
    # (nsr vld key, RO net key) copied from the matching RO net
    copied_fields = (
        ("vim-id", "vim_net_id"),
        ("name", "vim_name"),
        ("status", "status"),
        ("status-detailed", "error_msg"),
    )
    for position, vld in enumerate(get_iterable(db_nsr, "vld")):
        for ro_net in get_iterable(nsr_desc_RO, "nets"):
            if vld["id"] == ro_net.get("ns_net_osm_id"):
                for nsr_key, ro_key in copied_fields:
                    vld[nsr_key] = ro_net.get(ro_key)
                ns_update_nsr["vld.{}".format(position)] = vld
                break
        else:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
511
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO. Its "vnfs" (with their "vms" and "interfaces") and "nets" are read
    :return: Nothing. LcmExceptionNoMgmtIP is raised if a vnf has no ip address neither at RO nor at the vnfr;
        LcmException is raised if a vnf, vdur, interface or vld is not found at the RO info
    """
    # (vnfr key, RO key) pairs copied from the matching RO item, in this order
    vm_fields = (
        ("vim-id", "vim_vm_id"),
        ("ip-address", "ip_address"),
        ("vdu-id-ref", "vdu_osm_id"),
        ("name", "vim_name"),
        ("status", "status"),
        ("status-detailed", "error_msg"),
    )
    net_fields = (
        ("vim-id", "vim_net_id"),
        ("name", "vim_name"),
        ("status", "status"),
        ("status-detailed", "error_msg"),
    )
    for member_index, vnfr in db_vnfrs.items():
        for ro_vnf in nsr_desc_RO["vnfs"]:
            if ro_vnf["member_vnf_index"] != member_index:
                continue
            changes = {}
            if ro_vnf.get("ip_address"):
                mgmt_ip = ro_vnf["ip_address"]
                vnfr["ip-address"] = mgmt_ip
                changes["ip-address"] = mgmt_ip
            elif not vnfr.get("ip-address"):
                raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(member_index))

            for vdur_position, vdur in enumerate(get_iterable(vnfr, "vdur")):
                if vdur.get("pdu-type"):
                    continue
                # number of RO vms of this same vdu skipped so far; the vm wanted is the one found when this
                # counter equals the vdur count-index
                skipped_same_vdu = 0
                for ro_vm in get_iterable(ro_vnf, "vms"):
                    if vdur["vdu-id-ref"] != ro_vm["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != skipped_same_vdu:
                        skipped_same_vdu += 1
                        continue
                    for vnfr_key, ro_key in vm_fields:
                        vdur[vnfr_key] = ro_vm.get(ro_key)
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for ro_iface in get_iterable(ro_vm, "interfaces"):
                            if ifacer["name"] == ro_iface.get("internal_name"):
                                ifacer["ip-address"] = ro_iface.get("ip_address")
                                ifacer["mac-address"] = ro_iface.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "at RO info".format(member_index, vdur["vdu-id-ref"],
                                                                   ifacer["name"]))
                    changes["vdur.{}".format(vdur_position)] = vdur
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} at "
                                       "RO info".format(member_index, vdur["vdu-id-ref"], vdur["count-index"]))

            for vld_position, vld in enumerate(get_iterable(vnfr, "vld")):
                for ro_net in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] == ro_net.get("vnf_net_osm_id"):
                        for vnfr_key, ro_key in net_fields:
                            vld[vnfr_key] = ro_net.get(ro_key)
                        changes["vld.{}".format(vld_position)] = vld
                        break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} at RO info".format(
                        member_index, vld["id"]))

            self.update_db_2("vnfrs", vnfr["_id"], changes)
            break

        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(member_index))
579
580 async def instantiate(self, nsr_id, nslcmop_id):
581 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
582 self.logger.debug(logging_text + "Enter")
583 # get all needed from database
584 start_deploy = time()
585 db_nsr = None
586 db_nslcmop = None
587 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
588 db_nslcmop_update = {}
589 nslcmop_operation_state = None
590 db_vnfrs = {}
591 RO_descriptor_number = 0 # number of descriptors created at RO
592 descriptor_id_2_RO = {} # map between vnfd/nsd id to the id used at RO
593 n2vc_info = {}
594 exc = None
595 try:
596 step = "Getting nslcmop={} from db".format(nslcmop_id)
597 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
598 step = "Getting nsr={} from db".format(nsr_id)
599 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
600 ns_params = db_nslcmop.get("operationParams")
601 nsd = db_nsr["nsd"]
602 nsr_name = db_nsr["name"] # TODO short-name??
603
604 # look if previous tasks in process
605 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
606 if task_dependency:
607 step = db_nslcmop_update["detailed-status"] = \
608 "Waiting for related tasks to be completed: {}".format(task_name)
609 self.logger.debug(logging_text + step)
610 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
611 _, pending = await asyncio.wait(task_dependency, timeout=3600)
612 if pending:
613 raise LcmException("Timeout waiting related tasks to be completed")
614
615 step = "Getting vnfrs from db"
616 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
617 db_vnfds_ref = {}
618 db_vnfds = {}
619 for vnfr in db_vnfrs_list:
620 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
621 vnfd_id = vnfr["vnfd-id"]
622 vnfd_ref = vnfr["vnfd-ref"]
623 if vnfd_id not in db_vnfds:
624 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
625 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
626 db_vnfds_ref[vnfd_ref] = vnfd
627 db_vnfds[vnfd_id] = vnfd
628
629 # Get or generates the _admin.deployed,VCA list
630 vca_deployed_list = None
631 if db_nsr["_admin"].get("deployed"):
632 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
633 if vca_deployed_list is None:
634 vca_deployed_list = []
635 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
636 elif isinstance(vca_deployed_list, dict):
637 # maintain backward compatibility. Change a dict to list at database
638 vca_deployed_list = list(vca_deployed_list.values())
639 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
640
641 db_nsr_update["detailed-status"] = "creating"
642 db_nsr_update["operational-status"] = "init"
643
644 RO = ROclient.ROClient(self.loop, **self.ro_config)
645
646 # set state to INSTANTIATED. When instantiated NBI will not delete directly
647 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
648 self.update_db_2("nsrs", nsr_id, db_nsr_update)
649
650 # get vnfds, instantiate at RO
651 for vnfd_id, vnfd in db_vnfds.items():
652 vnfd_ref = vnfd["id"]
653 step = db_nsr_update["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_ref)
654 # self.logger.debug(logging_text + step)
655 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, vnfd_ref[:23])
656 descriptor_id_2_RO[vnfd_ref] = vnfd_id_RO
657 RO_descriptor_number += 1
658
659 # look if present
660 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
661 if vnfd_list:
662 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = vnfd_list[0]["uuid"]
663 self.logger.debug(logging_text + "vnfd={} exists at RO. Using RO_id={}".format(
664 vnfd_ref, vnfd_list[0]["uuid"]))
665 else:
666 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
667 desc = await RO.create("vnfd", descriptor=vnfd_RO)
668 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = desc["uuid"]
669 self.logger.debug(logging_text + "vnfd={} created at RO. RO_id={}".format(
670 vnfd_ref, desc["uuid"]))
671 self.update_db_2("nsrs", nsr_id, db_nsr_update)
672
673 # create nsd at RO
674 nsd_ref = nsd["id"]
675 step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
676 # self.logger.debug(logging_text + step)
677
678 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
679 descriptor_id_2_RO[nsd_ref] = RO_osm_nsd_id
680 RO_descriptor_number += 1
681 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
682 if nsd_list:
683 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
684 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
685 nsd_ref, RO_nsd_uuid))
686 else:
687 nsd_RO = deepcopy(nsd)
688 nsd_RO["id"] = RO_osm_nsd_id
689 nsd_RO.pop("_id", None)
690 nsd_RO.pop("_admin", None)
691 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
692 vnfd_id = c_vnf["vnfd-id-ref"]
693 c_vnf["vnfd-id-ref"] = descriptor_id_2_RO[vnfd_id]
694 desc = await RO.create("nsd", descriptor=nsd_RO)
695 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
696 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
697 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
698 self.update_db_2("nsrs", nsr_id, db_nsr_update)
699
700 # Crate ns at RO
701 # if present use it unless in error status
702 RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
703 if RO_nsr_id:
704 try:
705 step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
706 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
707 desc = await RO.show("ns", RO_nsr_id)
708 except ROclient.ROClientException as e:
709 if e.http_code != HTTPStatus.NOT_FOUND:
710 raise
711 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
712 if RO_nsr_id:
713 ns_status, ns_status_info = RO.check_ns_status(desc)
714 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
715 if ns_status == "ERROR":
716 step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
717 self.logger.debug(logging_text + step)
718 await RO.delete("ns", RO_nsr_id)
719 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
720 if not RO_nsr_id:
721 step = db_nsr_update["detailed-status"] = "Checking dependencies"
722 # self.logger.debug(logging_text + step)
723
724 # check if VIM is creating and wait look if previous tasks in process
725 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
726 if task_dependency:
727 step = "Waiting for related tasks to be completed: {}".format(task_name)
728 self.logger.debug(logging_text + step)
729 await asyncio.wait(task_dependency, timeout=3600)
730 if ns_params.get("vnf"):
731 for vnf in ns_params["vnf"]:
732 if "vimAccountId" in vnf:
733 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
734 vnf["vimAccountId"])
735 if task_dependency:
736 step = "Waiting for related tasks to be completed: {}".format(task_name)
737 self.logger.debug(logging_text + step)
738 await asyncio.wait(task_dependency, timeout=3600)
739
740 step = db_nsr_update["detailed-status"] = "Checking instantiation parameters"
741
742 # feature 1429. Add n2vc public key to needed VMs
743 n2vc_key = await self.n2vc.GetPublicKey()
744 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, [n2vc_key])
745
746 step = db_nsr_update["detailed-status"] = "Creating ns at RO"
747 desc = await RO.create("ns", descriptor=RO_ns_params,
748 name=db_nsr["name"],
749 scenario=RO_nsd_uuid)
750 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
751 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
752 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
753 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
754 self.update_db_2("nsrs", nsr_id, db_nsr_update)
755
756 # wait until NS is ready
757 step = ns_status_detailed = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
758 detailed_status_old = None
759 self.logger.debug(logging_text + step)
760
761 while time() <= start_deploy + self.total_deploy_timeout:
762 desc = await RO.show("ns", RO_nsr_id)
763 ns_status, ns_status_info = RO.check_ns_status(desc)
764 db_nsr_update["admin.deployed.RO.nsr_status"] = ns_status
765 if ns_status == "ERROR":
766 raise ROclient.ROClientException(ns_status_info)
767 elif ns_status == "BUILD":
768 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
769 elif ns_status == "ACTIVE":
770 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
771 try:
772 self.ns_update_vnfr(db_vnfrs, desc)
773 break
774 except LcmExceptionNoMgmtIP:
775 pass
776 else:
777 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
778 if detailed_status != detailed_status_old:
779 detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
780 self.update_db_2("nsrs", nsr_id, db_nsr_update)
781 await asyncio.sleep(5, loop=self.loop)
782 else: # total_deploy_timeout
783 raise ROclient.ROClientException("Timeout waiting ns to be ready")
784
785 step = "Updating NSR"
786 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
787
788 db_nsr["detailed-status"] = "Configuring vnfr"
789 self.update_db_2("nsrs", nsr_id, db_nsr_update)
790
791 # The parameters we'll need to deploy a charm
792 number_to_configure = 0
793
794 def deploy_charm(vnf_index, vdu_id, vdu_name, vdu_count_index, mgmt_ip_address, n2vc_info,
795 config_primitive=None):
796 """An inner function to deploy the charm from either vnf or vdu
797 vnf_index is mandatory. vdu_id can be None for a vnf configuration or the id for vdu configuration
798 """
799 if not mgmt_ip_address:
800 raise LcmException("vnfd/vdu has not management ip address to configure it")
801 # Login to the VCA.
802 # if number_to_configure == 0:
803 # self.logger.debug("Logging into N2VC...")
804 # task = asyncio.ensure_future(self.n2vc.login())
805 # yield from asyncio.wait_for(task, 30.0)
806 # self.logger.debug("Logged into N2VC!")
807
808 # # await self.n2vc.login()
809
810 # Note: The charm needs to exist on disk at the location
811 # specified by charm_path.
812 base_folder = vnfd["_admin"]["storage"]
813 storage_params = self.fs.get_params()
814 charm_path = "{}{}/{}/charms/{}".format(
815 storage_params["path"],
816 base_folder["folder"],
817 base_folder["pkg-dir"],
818 proxy_charm
819 )
820
821 # Setup the runtime parameters for this VNF
822 params = {'rw_mgmt_ip': mgmt_ip_address}
823 if config_primitive:
824 params["initial-config-primitive"] = config_primitive
825
826 # ns_name will be ignored in the current version of N2VC
827 # but will be implemented for the next point release.
828 model_name = "default" # TODO bug 585 nsr_id
829 if vdu_id:
830 vdu_id_text = vdu_id + "-"
831 else:
832 vdu_id_text = "-"
833 application_name = self.n2vc.FormatApplicationName(nsr_name, vnf_index, vdu_id_text)
834
835 vca_index = len(vca_deployed_list)
836 # trunk name and add two char index at the end to ensure that it is unique. It is assumed no more than
837 # 26*26 charm in the same NS
838 application_name = application_name[0:48]
839 application_name += chr(97 + vca_index // 26) + chr(97 + vca_index % 26)
840 vca_deployed_ = {
841 "member-vnf-index": vnf_index,
842 "vdu_id": vdu_id,
843 "model": model_name,
844 "application": application_name,
845 "operational-status": "init",
846 "detailed-status": "",
847 "vnfd_id": vnfd_id,
848 "vdu_name": vdu_name,
849 "vdu_count_index": vdu_count_index,
850 }
851 vca_deployed_list.append(vca_deployed_)
852 db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = vca_deployed_
853 self.update_db_2("nsrs", nsr_id, db_nsr_update)
854
855 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
856 proxy_charm))
857 if not n2vc_info:
858 n2vc_info["nsr_id"] = nsr_id
859 n2vc_info["nslcmop_id"] = nslcmop_id
860 n2vc_info["n2vc_event"] = asyncio.Event(loop=self.loop)
861 n2vc_info["lcmOperationType"] = "instantiate"
862 n2vc_info["deployed"] = vca_deployed_list
863 n2vc_info["db_update"] = db_nsr_update
864 task = asyncio.ensure_future(
865 self.n2vc.DeployCharms(
866 model_name, # The network service name
867 application_name, # The application name
868 vnfd, # The vnf descriptor
869 charm_path, # Path to charm
870 params, # Runtime params, like mgmt ip
871 {}, # for native charms only
872 self.n2vc_callback, # Callback for status changes
873 n2vc_info, # Callback parameter
874 None, # Callback parameter (task)
875 )
876 )
877 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
878 n2vc_info))
879 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + application_name, task)
880
881 step = "Looking for needed vnfd to configure"
882 self.logger.debug(logging_text + step)
883
884 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
885 vnfd_id = c_vnf["vnfd-id-ref"]
886 vnf_index = str(c_vnf["member-vnf-index"])
887 vnfd = db_vnfds_ref[vnfd_id]
888
889 # Check if this VNF has a charm configuration
890 vnf_config = vnfd.get("vnf-configuration")
891
892 if vnf_config and vnf_config.get("juju"):
893 proxy_charm = vnf_config["juju"]["charm"]
894 config_primitive = None
895
896 if proxy_charm:
897 if 'initial-config-primitive' in vnf_config:
898 config_primitive = vnf_config['initial-config-primitive']
899
900 # Login to the VCA. If there are multiple calls to login(),
901 # subsequent calls will be a nop and return immediately.
902 step = "connecting to N2VC to configure vnf {}".format(vnf_index)
903 await self.n2vc.login()
904 deploy_charm(vnf_index, None, None, None, db_vnfrs[vnf_index]["ip-address"], n2vc_info,
905 config_primitive)
906 number_to_configure += 1
907
908 # Deploy charms for each VDU that supports one.
909 for vdu_index, vdu in enumerate(get_iterable(vnfd, 'vdu')):
910 vdu_config = vdu.get('vdu-configuration')
911 proxy_charm = None
912 config_primitive = None
913
914 if vdu_config and vdu_config.get("juju"):
915 proxy_charm = vdu_config["juju"]["charm"]
916
917 if 'initial-config-primitive' in vdu_config:
918 config_primitive = vdu_config['initial-config-primitive']
919
920 if proxy_charm:
921 step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index)
922 await self.n2vc.login()
923 vdur = db_vnfrs[vnf_index]["vdur"][vdu_index]
924 # TODO for the moment only first vdu_id contains a charm deployed
925 if vdur["vdu-id-ref"] != vdu["id"]:
926 raise LcmException("Mismatch vdur {}, vdu {} at index {} for vnf {}"
927 .format(vdur["vdu-id-ref"], vdu["id"], vdu_index, vnf_index))
928 deploy_charm(vnf_index, vdu["id"], vdur.get("name"), vdur["count-index"],
929 vdur["ip-address"], n2vc_info, config_primitive)
930 number_to_configure += 1
931
932 db_nsr_update["operational-status"] = "running"
933 configuration_failed = False
934 if number_to_configure:
935 old_status = "configuring: init: {}".format(number_to_configure)
936 db_nsr_update["config-status"] = old_status
937 db_nsr_update["detailed-status"] = old_status
938 db_nslcmop_update["detailed-status"] = old_status
939
940 # wait until all are configured.
941 while time() <= start_deploy + self.total_deploy_timeout:
942 if db_nsr_update:
943 self.update_db_2("nsrs", nsr_id, db_nsr_update)
944 if db_nslcmop_update:
945 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
946 # TODO add a fake tast that set n2vc_event after some time
947 await n2vc_info["n2vc_event"].wait()
948 n2vc_info["n2vc_event"].clear()
949 all_active = True
950 status_map = {}
951 n2vc_error_text = [] # contain text error list. If empty no one is in error status
952 now = time()
953 for vca_deployed in vca_deployed_list:
954 vca_status = vca_deployed["operational-status"]
955 if vca_status not in status_map:
956 # Initialize it
957 status_map[vca_status] = 0
958 status_map[vca_status] += 1
959
960 if vca_status == "active":
961 vca_deployed.pop("time_first_error", None)
962 vca_deployed.pop("status_first_error", None)
963 continue
964
965 all_active = False
966 if vca_status in ("error", "blocked"):
967 vca_deployed["detailed-status-error"] = vca_deployed["detailed-status"]
968 # if not first time in this status error
969 if not vca_deployed.get("time_first_error"):
970 vca_deployed["time_first_error"] = now
971 continue
972 if vca_deployed.get("time_first_error") and \
973 now <= vca_deployed["time_first_error"] + self.timeout_vca_on_error:
974 n2vc_error_text.append("member_vnf_index={} vdu_id={} {}: {}"
975 .format(vca_deployed["member-vnf-index"],
976 vca_deployed["vdu_id"], vca_status,
977 vca_deployed["detailed-status-error"]))
978
979 if all_active:
980 break
981 elif n2vc_error_text:
982 db_nsr_update["config-status"] = "failed"
983 error_text = "fail configuring " + ";".join(n2vc_error_text)
984 db_nsr_update["detailed-status"] = error_text
985 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
986 db_nslcmop_update["detailed-status"] = error_text
987 db_nslcmop_update["statusEnteredTime"] = time()
988 configuration_failed = True
989 break
990 else:
991 cs = "configuring: "
992 separator = ""
993 for status, num in status_map.items():
994 cs += separator + "{}: {}".format(status, num)
995 separator = ", "
996 if old_status != cs:
997 db_nsr_update["config-status"] = cs
998 db_nsr_update["detailed-status"] = cs
999 db_nslcmop_update["detailed-status"] = cs
1000 old_status = cs
1001 else: # total_deploy_timeout
1002 raise LcmException("Timeout waiting ns to be configured")
1003
1004 if not configuration_failed:
1005 # all is done
1006 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1007 db_nslcmop_update["statusEnteredTime"] = time()
1008 db_nslcmop_update["detailed-status"] = "done"
1009 db_nsr_update["config-status"] = "configured"
1010 db_nsr_update["detailed-status"] = "done"
1011
1012 return
1013
1014 except (ROclient.ROClientException, DbException, LcmException) as e:
1015 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1016 exc = e
1017 except asyncio.CancelledError:
1018 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1019 exc = "Operation was cancelled"
1020 except Exception as e:
1021 exc = traceback.format_exc()
1022 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1023 exc_info=True)
1024 finally:
1025 if exc:
1026 if db_nsr:
1027 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1028 db_nsr_update["operational-status"] = "failed"
1029 if db_nslcmop:
1030 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1031 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1032 db_nslcmop_update["statusEnteredTime"] = time()
1033 try:
1034 if db_nsr:
1035 db_nsr_update["_admin.nslcmop"] = None
1036 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1037 if db_nslcmop_update:
1038 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1039 except DbException as e:
1040 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1041 if nslcmop_operation_state:
1042 try:
1043 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1044 "operationState": nslcmop_operation_state})
1045 except Exception as e:
1046 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1047
1048 self.logger.debug(logging_text + "Exit")
1049 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1050
async def _destroy_charm(self, model, application):
    """
    Ask N2VC to remove a charm application, unless it is already gone
    :param model: model name, passed as it is to N2VC
    :param application: application name, passed as it is to N2VC
    :return: True if the application is not present (nothing is ordered). False if it is present and its removal
        has been ordered
    """
    still_deployed = await self.n2vc.HasApplication(model, application)
    if still_deployed:
        await self.n2vc.RemoveCharms(model, application)
        return False
    # Already removed, nothing to do
    return True
1062
async def _wait_charm_destroyed(self, model, application, timeout):
    """
    Wait until charm does not exist, asking N2VC every 10 seconds at most
    :param model: model name, passed as it is to N2VC
    :param application: application name, passed as it is to N2VC
    :param timeout: maximum number of seconds to sleep between checks. Zero or negative means that the existence
        is checked only once
    :return: True if not exist, False if timeout
    """
    poll_interval = 10
    while True:
        if not await self.n2vc.HasApplication(model, application):
            return True
        if timeout <= 0:
            return False
        # never sleep more than the remaining time, so that the timeout is not exceeded
        wait_time = min(poll_interval, timeout)
        await asyncio.sleep(wait_time)
        timeout -= wait_time
1078
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a network service instance. It orders the removal of the configuration charms, deletes the ns, the
    nsd and the vnfds at RO, waits until the charms are removed and finally updates the database entries (or
    deletes them when the operation parameter 'autoremove' is set). When there is a final operation state, it is
    notified by writing an "ns" / "terminated" message.
    :param nsr_id: database _id of the nsr to terminate
    :param nslcmop_id: database _id of the nslcmop (operation) that is being executed
    :return: None. Errors raised while terminating are caught and annotated at nslcmop 'detailed-status'
    """
    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    exc = None
    failed_detail = []   # annotates all failed error messages
    vca_time_destroy = None   # time of where destroy charm order
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    try:
        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # nsd = db_nsr["nsd"]
        # work over a copy, because the entries of removed charms are cleared below
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return
        # #TODO check if VIM is creating and wait
        # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"

        if nsr_deployed and nsr_deployed.get("VCA"):
            # best effort: a failure removing charms is only logged and the termination continues at RO
            try:
                step = "Scheduling configuration charms removing"
                db_nsr_update["detailed-status"] = "Deleting charms"
                self.logger.debug(logging_text + step)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # for backward compatibility
                if isinstance(nsr_deployed["VCA"], dict):
                    nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
                    db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)

                for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
                    if vca_deployed:
                        if await self._destroy_charm(vca_deployed['model'], vca_deployed["application"]):
                            vca_deployed.clear()
                            # charm does not exist; annotate it at the pending database changes (and not at
                            # the in-memory copy db_nsr, which is never written back)
                            db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None
                        else:
                            vca_time_destroy = time()
            except Exception as e:
                self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO_fail = False
        RO = ROclient.ROClient(self.loop, **self.ro_config)

        # Delete ns
        RO_nsr_id = RO_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
            RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if RO_nsr_id:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = "Deleting ns at RO"
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self.logger.debug(logging_text + step)
                desc = await RO.delete("ns", RO_nsr_id)
                RO_delete_action = desc["action_id"]
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if RO_delete_action:
                # wait until NS is deleted from VIM
                step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
                    format(RO_nsr_id, RO_delete_action)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                delete_timeout = 20 * 60   # 20 minutes
                while delete_timeout > 0:
                    desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                         extra_item_id=RO_delete_action)
                    ns_status, ns_status_info = RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    # write to database only when the reported status text changes
                    if detailed_status != detailed_status_old:
                        detailed_status_old = db_nslcmop_update["detailed-status"] = \
                            db_nsr_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

        except ROclient.ROClientException as e:
            if e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
            elif e.http_code == 409:   # conflict
                failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
                RO_fail = True
            else:
                failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                self.logger.error(logging_text + failed_detail[-1])
                RO_fail = True

        # Delete nsd
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
            RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                    "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                    RO_fail = True
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])
                    RO_fail = True

        # Delete vnfds. A failure here is annotated, but it does not stop deleting the rest of vnfds
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd_id"):
            for vnf_id, RO_vnfd_id in nsr_deployed["RO"]["vnfd_id"].items():
                if not RO_vnfd_id:
                    continue
                try:
                    step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                        "Deleting vnfd={} at RO".format(vnf_id)
                    await RO.delete("vnfd", RO_vnfd_id)
                    self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
                        self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                    elif e.http_code == 409:   # conflict
                        failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        # wait until charm deleted
        if vca_time_destroy:
            db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = step = \
                "Waiting for deletion of configuration charms"
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
                if not vca_deployed:
                    continue
                step = "Waiting for deletion of charm application_name={}".format(vca_deployed["application"])
                # discount the time elapsed since the last destroy order
                timeout = self.timeout_charm_delete - int(time() - vca_time_destroy)
                if not await self._wait_charm_destroyed(vca_deployed['model'], vca_deployed["application"],
                                                        timeout):
                    failed_detail.append("VCA[application_name={}] Deletion timeout".format(
                        vca_deployed["application"]))
                else:
                    # charm removed; annotate it at the pending database changes (not at db_nsr)
                    db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None

        if failed_detail:
            self.logger.error(logging_text + " ;".join(failed_detail))
            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
            db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        elif db_nslcmop["operationParams"].get("autoremove"):
            # entries are deleted; pending updates are discarded so that nothing is written at 'finally'
            self.db.del_one("nsrs", {"_id": nsr_id})
            db_nsr = None
            db_nsr_update.clear()
            self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
            db_nslcmop = None
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update.clear()
            self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
            self.db.set_list("pdus", {"_admin.usage.nsr_id": nsr_id},
                             {"_admin.usageState": "NOT_IN_USE", "_admin.usage": None})
            self.logger.debug(logging_text + "Delete from database")
        else:
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                # the operation is over, so the nsr is not longer bound to it
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
1312
async def _ns_execute_primitive(self, db_deployed, nsr_name, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
                                primitive, primitive_params):
    """
    Look for the deployed charm that matches the vnf/vdu selection and execute a primitive over it, waiting up to
    600 seconds for completion
    :param db_deployed: content of nsr '_admin.deployed'. Its "VCA" list is used to look for the charm
    :param nsr_name: name of the nsr. Not used by this method
    :param member_vnf_index: member-vnf-index of the charm to look for
    :param vdu_id: vdu id of the charm to look for. It must match exactly, so None only matches a charm stored
        with vdu_id None
    :param vdu_name: vdu name of the charm to look for. If None or empty, it is not used for the selection
    :param vdu_count_index: vdu count index of the charm to look for. If None, it is not used for the selection
    :param primitive: name of the primitive to execute
    :param primitive_params: dictionary with the parameters of the primitive
    :return: tuple (result, result_detail), where result is "COMPLETED" or "FAILED"
    :raise LcmException: if there is not a matching charm, if it has not model or application name, or if its
        operational status is not 'active'
    """
    # tolerate a ns without deployment info or without VCA, reporting it as charm not deployed
    vca_list = db_deployed.get("VCA") if db_deployed else None
    for vca_deployed in vca_list or ():
        if not vca_deployed:
            continue
        if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
            continue
        if vdu_name and vdu_name != vca_deployed["vdu_name"]:
            continue
        # compare against None, because 0 is a valid count index that must be used for the selection
        if vdu_count_index is not None and vdu_count_index != vca_deployed["vdu_count_index"]:
            continue
        break
    else:
        raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not deployed"
                           .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
    model_name = vca_deployed.get("model")
    application_name = vca_deployed.get("application")
    if not model_name or not application_name:
        raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not model "
                           "or application name" .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
    if vca_deployed["operational-status"] != "active":
        raise LcmException("charm for member_vnf_index={} vdu_id={} operational_status={} not 'active'".format(
            member_vnf_index, vdu_id, vca_deployed["operational-status"]))
    callback = None  # self.n2vc_callback
    callback_args = ()  # [db_nsr, db_nslcmop, member_vnf_index, None]
    await self.n2vc.login()
    task = asyncio.ensure_future(
        self.n2vc.ExecutePrimitive(
            model_name,
            application_name,
            primitive, callback,
            *callback_args,
            **primitive_params
        )
    )
    # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
    #                                          db_nsr, db_nslcmop, member_vnf_index))
    # self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "action:" + primitive, task)
    # wait until completed with timeout
    await asyncio.wait((task,), timeout=600)

    result = "FAILED"  # by default
    result_detail = ""
    if task.cancelled():
        result_detail = "Task has been cancelled"
    elif task.done():
        exc = task.exception()
        if exc:
            result_detail = str(exc)
        else:
            # TODO revise with Adam if action is finished and ok when task is done or callback is needed
            result = "COMPLETED"
            result_detail = "Done"
    else:  # timeout
        # TODO Should it be cancelled?!!
        task.cancel()
        result_detail = "timeout"
    return result, result_detail
1372
async def action(self, nsr_id, nslcmop_id):
    """
    Execute over a deployed charm the primitive requested by a nslcmop (operation). The result is stored at
    nslcmop 'operationState' and 'detailed-status' and, when there is a final operation state, it is notified by
    writing an "ns" / "actioned" message.
    :param nsr_id: database _id of the nsr
    :param nslcmop_id: database _id of the nslcmop that contains at 'operationParams' the mandatory
        'member_vnf_index', 'primitive' and 'primitive_params', and the optional 'vdu_id', 'vdu_count_index' and
        'vdu_name'
    :return: None. Errors raised while processing are caught and annotated at nslcmop 'detailed-status'
    """
    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    exc = None
    try:
        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_deployed = db_nsr["_admin"].get("deployed")
        nsr_name = db_nsr["name"]
        vnf_index = db_nslcmop["operationParams"]["member_vnf_index"]
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        vdu_name = db_nslcmop["operationParams"].get("vdu_name")

        # look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
        if task_dependency:
            step = db_nslcmop_update["detailed-status"] = \
                "Waiting for related tasks to be completed: {}".format(task_name)
            self.logger.debug(logging_text + step)
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            _, pending = await asyncio.wait(task_dependency, timeout=3600)
            if pending:
                raise LcmException("Timeout waiting related tasks to be completed")

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # TODO check if ns is in a proper status
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index, vdu_id,
                                                                 vdu_name, vdu_count_index, primitive,
                                                                 primitive_params)
        db_nslcmop_update["detailed-status"] = result_detail
        db_nslcmop_update["operationState"] = nslcmop_operation_state = result
        db_nslcmop_update["statusEnteredTime"] = time()
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                # the operation is over, so the nsr is not longer bound to it
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        # "Exit" is logged only once, after the notification has been sent
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
1454
1455 async def scale(self, nsr_id, nslcmop_id):
1456 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
1457 self.logger.debug(logging_text + "Enter")
1458 # get all needed from database
1459 db_nsr = None
1460 db_nslcmop = None
1461 db_nslcmop_update = {}
1462 nslcmop_operation_state = None
1463 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1464 exc = None
1465 # in case of error, indicates what part of scale was failed to put nsr at error status
1466 scale_process = None
1467 old_operational_status = ""
1468 old_config_status = ""
1469 vnfr_scaled = False
1470 try:
1471 step = "Getting nslcmop from database"
1472 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1473 step = "Getting nsr from database"
1474 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1475 nsr_name = db_nsr["name"]
1476 old_operational_status = db_nsr["operational-status"]
1477 old_config_status = db_nsr["config-status"]
1478
1479 # look if previous tasks in process
1480 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
1481 if task_dependency:
1482 step = db_nslcmop_update["detailed-status"] = \
1483 "Waiting for related tasks to be completed: {}".format(task_name)
1484 self.logger.debug(logging_text + step)
1485 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1486 _, pending = await asyncio.wait(task_dependency, timeout=3600)
1487 if pending:
1488 raise LcmException("Timeout waiting related tasks to be completed")
1489
1490 step = "Parsing scaling parameters"
1491 db_nsr_update["operational-status"] = "scaling"
1492 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1493 nsr_deployed = db_nsr["_admin"].get("deployed")
1494 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
1495 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
1496 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
1497 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
1498 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
1499
1500 # for backward compatibility
1501 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
1502 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
1503 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
1504 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1505
1506 step = "Getting vnfr from database"
1507 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
1508 step = "Getting vnfd from database"
1509 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
1510 step = "Getting scaling-group-descriptor"
1511 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
1512 if scaling_descriptor["name"] == scaling_group:
1513 break
1514 else:
1515 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
1516 "at vnfd:scaling-group-descriptor".format(scaling_group))
1517 # cooldown_time = 0
1518 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
1519 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
1520 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
1521 # break
1522
1523 # TODO check if ns is in a proper status
1524 step = "Sending scale order to RO"
1525 nb_scale_op = 0
1526 if not db_nsr["_admin"].get("scaling-group"):
1527 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
1528 admin_scale_index = 0
1529 else:
1530 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
1531 if admin_scale_info["name"] == scaling_group:
1532 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
1533 break
1534 else: # not found, set index one plus last element and add new entry with the name
1535 admin_scale_index += 1
1536 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
1537 RO_scaling_info = []
1538 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
1539 if scaling_type == "SCALE_OUT":
1540 # count if max-instance-count is reached
1541 if "max-instance-count" in scaling_descriptor and scaling_descriptor["max-instance-count"] is not None:
1542 max_instance_count = int(scaling_descriptor["max-instance-count"])
1543 if nb_scale_op >= max_instance_count:
1544 raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
1545 " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1546 nb_scale_op = nb_scale_op + 1
1547 vdu_scaling_info["scaling_direction"] = "OUT"
1548 vdu_scaling_info["vdu-create"] = {}
1549 for vdu_scale_info in scaling_descriptor["vdu"]:
1550 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1551 "type": "create", "count": vdu_scale_info.get("count", 1)})
1552 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1553 elif scaling_type == "SCALE_IN":
1554 # count if min-instance-count is reached
1555 min_instance_count = 0
1556 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
1557 min_instance_count = int(scaling_descriptor["min-instance-count"])
1558 if nb_scale_op <= min_instance_count:
1559 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
1560 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1561 nb_scale_op = nb_scale_op - 1
1562 vdu_scaling_info["scaling_direction"] = "IN"
1563 vdu_scaling_info["vdu-delete"] = {}
1564 for vdu_scale_info in scaling_descriptor["vdu"]:
1565 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1566 "type": "delete", "count": vdu_scale_info.get("count", 1)})
1567 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1568
1569 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
1570 vdu_create = vdu_scaling_info.get("vdu-create")
1571 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
1572 if vdu_scaling_info["scaling_direction"] == "IN":
1573 for vdur in reversed(db_vnfr["vdur"]):
1574 if vdu_delete.get(vdur["vdu-id-ref"]):
1575 vdu_delete[vdur["vdu-id-ref"]] -= 1
1576 vdu_scaling_info["vdu"].append({
1577 "name": vdur["name"],
1578 "vdu_id": vdur["vdu-id-ref"],
1579 "interface": []
1580 })
1581 for interface in vdur["interfaces"]:
1582 vdu_scaling_info["vdu"][-1]["interface"].append({
1583 "name": interface["name"],
1584 "ip_address": interface["ip-address"],
1585 "mac_address": interface.get("mac-address"),
1586 })
1587 vdu_delete = vdu_scaling_info.pop("vdu-delete")
1588
1589 # execute primitive service PRE-SCALING
1590 step = "Executing pre-scale vnf-config-primitive"
1591 if scaling_descriptor.get("scaling-config-action"):
1592 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1593 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "pre-scale-in" \
1594 and scaling_type == "SCALE_IN":
1595 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1596 step = db_nslcmop_update["detailed-status"] = \
1597 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
1598 # look for primitive
1599 primitive_params = {}
1600 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1601 if config_primitive["name"] == vnf_config_primitive:
1602 for parameter in config_primitive.get("parameter", ()):
1603 if 'default-value' in parameter and \
1604 parameter['default-value'] == "<VDU_SCALE_INFO>":
1605 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1606 default_flow_style=True,
1607 width=256)
1608 break
1609 else:
1610 raise LcmException(
1611 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
1612 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-cnfiguration:config-"
1613 "primitive".format(scaling_group, config_primitive))
1614 scale_process = "VCA"
1615 db_nsr_update["config-status"] = "configuring pre-scaling"
1616 result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index,
1617 None, None, None, vnf_config_primitive,
1618 primitive_params)
1619 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1620 vnf_config_primitive, result, result_detail))
1621 if result == "FAILED":
1622 raise LcmException(result_detail)
1623 db_nsr_update["config-status"] = old_config_status
1624 scale_process = None
1625
1626 if RO_scaling_info:
1627 scale_process = "RO"
1628 RO = ROclient.ROClient(self.loop, **self.ro_config)
1629 RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
1630 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
1631 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
1632 # wait until ready
1633 RO_nslcmop_id = RO_desc["instance_action_id"]
1634 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
1635
1636 RO_task_done = False
1637 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
1638 detailed_status_old = None
1639 self.logger.debug(logging_text + step)
1640
1641 deployment_timeout = 1 * 3600 # One hour
1642 while deployment_timeout > 0:
1643 if not RO_task_done:
1644 desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
1645 extra_item_id=RO_nslcmop_id)
1646 ns_status, ns_status_info = RO.check_action_status(desc)
1647 if ns_status == "ERROR":
1648 raise ROclient.ROClientException(ns_status_info)
1649 elif ns_status == "BUILD":
1650 detailed_status = step + "; {}".format(ns_status_info)
1651 elif ns_status == "ACTIVE":
1652 RO_task_done = True
1653 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
1654 self.logger.debug(logging_text + step)
1655 else:
1656 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
1657 else:
1658 desc = await RO.show("ns", RO_nsr_id)
1659 ns_status, ns_status_info = RO.check_ns_status(desc)
1660 if ns_status == "ERROR":
1661 raise ROclient.ROClientException(ns_status_info)
1662 elif ns_status == "BUILD":
1663 detailed_status = step + "; {}".format(ns_status_info)
1664 elif ns_status == "ACTIVE":
1665 step = detailed_status = \
1666 "Waiting for management IP address reported by the VIM. Updating VNFRs"
1667 if not vnfr_scaled:
1668 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
1669 vnfr_scaled = True
1670 try:
1671 desc = await RO.show("ns", RO_nsr_id)
1672 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
1673 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
1674 break
1675 except LcmExceptionNoMgmtIP:
1676 pass
1677 else:
1678 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1679 if detailed_status != detailed_status_old:
1680 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
1681 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1682
1683 await asyncio.sleep(5, loop=self.loop)
1684 deployment_timeout -= 5
1685 if deployment_timeout <= 0:
1686 raise ROclient.ROClientException("Timeout waiting ns to be ready")
1687
1688 # update VDU_SCALING_INFO with the obtained ip_addresses
1689 if vdu_scaling_info["scaling_direction"] == "OUT":
1690 for vdur in reversed(db_vnfr["vdur"]):
1691 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
1692 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
1693 vdu_scaling_info["vdu"].append({
1694 "name": vdur["name"],
1695 "vdu_id": vdur["vdu-id-ref"],
1696 "interface": []
1697 })
1698 for interface in vdur["interfaces"]:
1699 vdu_scaling_info["vdu"][-1]["interface"].append({
1700 "name": interface["name"],
1701 "ip_address": interface["ip-address"],
1702 "mac_address": interface.get("mac-address"),
1703 })
1704 del vdu_scaling_info["vdu-create"]
1705
1706 scale_process = None
1707 if db_nsr_update:
1708 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1709
1710 # execute primitive service POST-SCALING
1711 step = "Executing post-scale vnf-config-primitive"
1712 if scaling_descriptor.get("scaling-config-action"):
1713 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1714 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "post-scale-out" \
1715 and scaling_type == "SCALE_OUT":
1716 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1717 step = db_nslcmop_update["detailed-status"] = \
1718 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
1719 # look for primitive
1720 primitive_params = {}
1721 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1722 if config_primitive["name"] == vnf_config_primitive:
1723 for parameter in config_primitive.get("parameter", ()):
1724 if 'default-value' in parameter and \
1725 parameter['default-value'] == "<VDU_SCALE_INFO>":
1726 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1727 default_flow_style=True,
1728 width=256)
1729 break
1730 else:
1731 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
1732 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
1733 "match any vnf-configuration:config-primitive".format(scaling_group,
1734 config_primitive))
1735 scale_process = "VCA"
1736 db_nsr_update["config-status"] = "configuring post-scaling"
1737
1738 result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index,
1739 None, None, None, vnf_config_primitive,
1740 primitive_params)
1741 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1742 vnf_config_primitive, result, result_detail))
1743 if result == "FAILED":
1744 raise LcmException(result_detail)
1745 db_nsr_update["config-status"] = old_config_status
1746 scale_process = None
1747
1748 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1749 db_nslcmop_update["statusEnteredTime"] = time()
1750 db_nslcmop_update["detailed-status"] = "done"
1751 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
1752 db_nsr_update["operational-status"] = old_operational_status
1753 db_nsr_update["config-status"] = old_config_status
1754 return
1755 except (ROclient.ROClientException, DbException, LcmException) as e:
1756 self.logger.error(logging_text + "Exit Exception {}".format(e))
1757 exc = e
1758 except asyncio.CancelledError:
1759 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1760 exc = "Operation was cancelled"
1761 except Exception as e:
1762 exc = traceback.format_exc()
1763 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1764 finally:
1765 if exc:
1766 if db_nslcmop:
1767 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1768 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1769 db_nslcmop_update["statusEnteredTime"] = time()
1770 if db_nsr:
1771 db_nsr_update["operational-status"] = old_operational_status
1772 db_nsr_update["config-status"] = old_config_status
1773 db_nsr_update["detailed-status"] = ""
1774 db_nsr_update["_admin.nslcmop"] = None
1775 if scale_process:
1776 if "VCA" in scale_process:
1777 db_nsr_update["config-status"] = "failed"
1778 if "RO" in scale_process:
1779 db_nsr_update["operational-status"] = "failed"
1780 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
1781 exc)
1782 try:
1783 if db_nslcmop and db_nslcmop_update:
1784 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1785 if db_nsr:
1786 db_nsr_update["_admin.nslcmop"] = None
1787 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1788 except DbException as e:
1789 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1790 if nslcmop_operation_state:
1791 try:
1792 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1793 "operationState": nslcmop_operation_state})
1794 # if cooldown_time:
1795 # await asyncio.sleep(cooldown_time)
1796 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
1797 except Exception as e:
1798 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1799 self.logger.debug(logging_text + "Exit")
1800 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")