bug 601: wait until charms are removed.
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import functools
24 import traceback
25
26 import ROclient
27 from lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase
28
29 from osm_common.dbbase import DbException
30 from osm_common.fsbase import FsException
31 from n2vc.vnf import N2VC
32
33 from copy import copy, deepcopy
34 from http import HTTPStatus
35 from time import time
36 from uuid import uuid4
37
38 __author__ = "Alfonso Tierno"
39
40
def get_iterable(in_dict, in_key):
    """
    Safe lookup for values that are going to be iterated.
    Works like <dict>.get(), but any false value (None, False, empty list, missing key, ...) is replaced by
    an empty tuple, so the result can always be used in a 'for' loop.
    :param in_dict: a dictionary
    :param in_key: the key to look for at in_dict
    :return: in_dict[in_key], or () if it is not present or its value is false
    """
    found = in_dict.get(in_key)
    return found if found else ()
51
52
def populate_dict(target_dict, key_list, value):
    """
    Update target_dict in place, creating the nested dictionaries named by key_list as needed. The last item of
    key_list is the key that is assigned the value.
    Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
    :param target_dict: dictionary to be changed
    :param key_list: list (or tuple) of keys that form the path inside target_dict
    :param value: content to store at the end of the path
    :return: None
    """
    node = target_dict
    # descend through every key but the last one, creating missing levels on the way
    for path_key in key_list[:-1]:
        node = node.setdefault(path_key, {})
    node[key_list[-1]] = value
67
68
69 class NsLcm(LcmBase):
70 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
71 total_deploy_timeout = 2 * 3600 # global timeout for deployment
72 timeout_charm_delete = 10 * 60
73
def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
    """
    Store the shared resources and create the N2VC client used for charms
    :param db: database object, handed over to the base class
    :param msg: messaging object, handed over to the base class
    :param fs: filesystem storage object, handed over to the base class
    :param lcm_tasks: task registry, stored at self.lcm_tasks
    :param ro_config: RO configuration, stored at self.ro_config
    :param vca_config: dictionary from where 'host', 'port', 'user' and 'secret' are read to create N2VC
    :param loop: asyncio event loop, stored at self.loop
    :return: None
    """
    # logging
    ns_logger = logging.getLogger('lcm.ns')
    self.logger = ns_logger
    self.loop = loop
    self.lcm_tasks = lcm_tasks

    super().__init__(db, msg, fs, self.logger)

    self.ro_config = ro_config

    n2vc_arguments = {
        "log": self.logger,
        "server": vca_config['host'],
        "port": vca_config['port'],
        "user": vca_config['user'],
        "secret": vca_config['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=vca_config[''],
        "artifacts": None,
    }
    self.n2vc = N2VC(**n2vc_arguments)
101
def vnfd2RO(self, vnfd, new_id=None):
    """
    Builds the vnfd descriptor to be sent to RO starting from an OSM IM vnfd. The input vnfd is not modified
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :return: a deep copy of vnfd without "_id", "_admin", "vnf-configuration", "monitoring-param" and
        "scaling-group-descriptor"; every vdu "cloud-init-file" is replaced by a "cloud-init" entry with the
        content of that file
    :raises LcmException: if the storage reports an FsException while reading a cloud-init file
    """
    opened_file = None
    try:
        ro_descriptor = deepcopy(vnfd)
        ro_descriptor.pop("_id", None)
        ro_descriptor.pop("_admin", None)
        if new_id:
            ro_descriptor["id"] = new_id
        for vdu_item in ro_descriptor.get("vdu", ()):
            if "cloud-init-file" not in vdu_item:
                continue
            # the storage location is taken from the original vnfd; "_admin" was already removed from the copy
            storage = vnfd["_admin"]["storage"]
            cloud_init_path = "{}/{}/cloud_init/{}".format(
                storage["folder"],
                storage["pkg-dir"],
                vdu_item["cloud-init-file"]
            )
            opened_file = self.fs.file_open(cloud_init_path, "r")
            # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails
            # convert to base 64 or similar
            cloud_init_text = opened_file.read()
            opened_file.close()
            opened_file = None
            vdu_item.pop("cloud-init-file", None)
            vdu_item["cloud-init"] = cloud_init_text
        # remove what RO does not use: configuration, monitoring, scaling
        for unused_key in ("vnf-configuration", "monitoring-param", "scaling-group-descriptor"):
            ro_descriptor.pop(unused_key, None)
        return ro_descriptor
    except FsException as e:
        raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
    finally:
        # only set here when something failed between opening and closing the file
        if opened_file:
            opened_file.close()
142
def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None):
    """
    Callback both for charm status change and task completion
    :param model_name: Charm model name
    :param application_name: Charm application name
    :param status: Can be
        - blocked: The unit needs manual intervention
        - maintenance: The unit is actively deploying/configuring
        - waiting: The unit is waiting for another charm to be ready
        - active: The unit is deployed, configured, and ready
        - error: The charm has failed and needs attention.
        - terminated: The charm has been destroyed
        - removing,
        - removed
    :param message: detailed message error
    :param n2vc_info: dictionary with information shared with instantiate task. It contains:
        nsr_id:
        nslcmop_id:
        lcmOperationType: currently "instantiate"
        deployed: list of VCA entries (an entry can be None). Each entry is a dictionary with "model",
            "application", "operational-status" and "detailed-status". The position in the list is the index
            used for the "_admin.deployed.VCA.<index>" database keys
        db_update: dictionary to be filled with the changes to be wrote to database with format key.key.key: value
        n2vc_event: event used to notify instantiation task that some change has been produced
    :param task: None for charm status change, or task for completion task callback
    :return: None. Exceptions are not propagated; they are logged as critical
    """
    # Assigned before the 'try' so that the exception handler below can always use it, even when the failure
    # happens while reading n2vc_info (e.g. a missing key)
    logging_text = "Task [n2vc_callback] application={}".format(application_name)
    try:
        nsr_id = n2vc_info["nsr_id"]
        deployed = n2vc_info["deployed"]
        db_nsr_update = n2vc_info["db_update"]
        nslcmop_id = n2vc_info["nslcmop_id"]
        ns_operation = n2vc_info["lcmOperationType"]
        n2vc_event = n2vc_info["n2vc_event"]
        logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id,
                                                                                application_name)
        # look for the entry of this model/application; vca_index and vca_deployed are used after the loop
        for vca_index, vca_deployed in enumerate(deployed):
            if not vca_deployed:
                continue
            if model_name == vca_deployed["model"] and application_name == vca_deployed["application"]:
                break
        else:
            self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA")
            return
        if task:
            if task.cancelled():
                self.logger.debug(logging_text + " task Cancelled")
                vca_deployed['operational-status'] = "error"
                db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
                vca_deployed['detailed-status'] = "Task Cancelled"
                db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = "Task Cancelled"

            elif task.done():
                exc = task.exception()
                if exc:
                    self.logger.error(logging_text + " task Exception={}".format(exc))
                    vca_deployed['operational-status'] = "error"
                    db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
                    vca_deployed['detailed-status'] = str(exc)
                    db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(exc)
                else:
                    self.logger.debug(logging_text + " task Done")
                    # task is Done, but callback is still ongoing. So ignore
                    return
        elif status:
            self.logger.debug(logging_text + " Enter status={} message={}".format(status, message))
            if vca_deployed['operational-status'] == status:
                return  # same status, ignore
            vca_deployed['operational-status'] = status
            db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = status
            vca_deployed['detailed-status'] = str(message)
            db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(message)
        else:
            self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
            return
        # wake up instantiate task
        n2vc_event.set()
    except Exception as e:
        self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True)
220
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params. The keys read here are "vimAccountId", "vduImage", "ssh_keys",
        "vnf" and "vld"
    :param nsd: ns descriptor. Its "constituent-vnfd" list is used to map member-vnf-index to vnfd-id-ref
    :param vnfd_dict: dictionary with the vnf descriptors, indexed by the value used at nsd "vnfd-id-ref"
    :param n2vc_key_list: if not empty, it is stored as "mgmt_keys" of every vdu that needs to be accessed for
        configuration
    :return: The RO ns descriptor, or None if ns_params is empty
    :raises LcmException: if a VIM account is not ENABLED, or if a member-vnf-index, internal connection point
        or connection point of the parameters is not found at the descriptors
    """
    # cache of vim_account id -> RO vim id, filled by vim_account_2_RO
    vim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # Returns the content of _admin.deployed.RO of the vim account, reading the database only the first time
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def ip_profile_2_RO(ip_profile):
        # Returns a copy of ip_profile with the key names and ip-version values changed: "dns-server" becomes
        # "dns-address", "dhcp-params" becomes "dhcp", "ipv4"/"ipv6" become "IPv4"/"IPv6"
        RO_ip_profile = deepcopy((ip_profile))
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                # a list of dictionaries is flattened to a list with their 'address'
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        # "scenario": ns_params["nsdId"],
    }
    if n2vc_key_list:
        # find out the vdus that need to be accessed, and insert the keys on them
        for vnfd_ref, vnfd in vnfd_dict.items():
            vdu_needed_access = []
            mgmt_cp = None
            if vnfd.get("vnf-configuration"):
                # the vnf is configured through its mgmt-interface, given as a vdu-id or as a connection point
                if vnfd.get("mgmt-interface"):
                    if vnfd["mgmt-interface"].get("vdu-id"):
                        vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                    elif vnfd["mgmt-interface"].get("cp"):
                        mgmt_cp = vnfd["mgmt-interface"]["cp"]

            for vdu in vnfd.get("vdu", ()):
                if vdu.get("vdu-configuration"):
                    vdu_needed_access.append(vdu["id"])
                elif mgmt_cp:
                    # NOTE(review): vdu.get("interface") is None for a vdu without "interface", and iterating it
                    # would raise TypeError - confirm that the descriptor always provides it
                    for vdu_interface in vdu.get("interface"):
                        if vdu_interface.get("external-connection-point-ref") and \
                                vdu_interface["external-connection-point-ref"] == mgmt_cp:
                            vdu_needed_access.append(vdu["id"])
                            # management connection point located; do not look for it at the following vdus
                            mgmt_cp = None
                            break

            if vdu_needed_access:
                # the keys are added for every member of the ns that uses this vnfd
                for vnf_member in nsd.get("constituent-vnfd"):
                    if vnf_member["vnfd-id-ref"] != vnfd_ref:
                        continue
                    for vdu in vdu_needed_access:
                        populate_dict(RO_ns_params,
                                      ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                      n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    for vnf_params in get_iterable(ns_params, "vnf"):
        # get the descriptor of this member; vnf_descriptor is used below for the internal connection points
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    # leave also the vdu loop, so that the 'else' below is only reached when nothing matches
                    if iface_found:
                        break
                else:
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))
        if vld_params.get("vim-network-name"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                # dictionary with vim_account: network name; one site is generated per vim account
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                # vdu_descriptor and interface_descriptor keep the matching items once the loops are left
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
434
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr, and writes the resulting "vdur" list to the database
    :param db_vnfr: vnfr content. Its "vdur" list is modified
    :param vdu_create: dictionary with vdu-id-ref: number of vdur entries to add. The input is not modified
    :param vdu_delete: dictionary with vdu-id-ref: number of vdur entries to remove. The input is not modified
    :return: None
    :raises LcmException: if some of the requested creations or deletions cannot be applied because there is
        not enough vdur with that vdu-id-ref
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # the list is processed from the last item to the first one
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            # every new entry is a copy of the previous one with a new _id and the next count-index
            for index in range(0, vdu_create[vdu_id_ref]):
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Each direction is reported with its own message and pending content
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
473
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies to the nsr the information that RO reports for every ns vld
    :param ns_update_nsr: dictionary to be filled with the updated info, with one "vld.<index>" key per vld
    :param db_nsr: content of db_nsr. Its "vld" items are also modified
    :param nsr_desc_RO: nsr descriptor from RO. Its "nets" are matched by "ns_net_osm_id" against the vld "id"
    :return: Nothing, LcmException is raised if a vld is not found at the RO info
    """
    for position, ns_vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net that belongs to this vld, if any
        ro_net = next((candidate for candidate in get_iterable(nsr_desc_RO, "nets")
                       if ns_vld["id"] == candidate.get("ns_net_osm_id")), None)
        if ro_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(ns_vld["id"]))
        ns_vld["vim-id"] = ro_net.get("vim_net_id")
        ns_vld["name"] = ro_net.get("vim_name")
        ns_vld["status"] = ro_net.get("status")
        ns_vld["status-detailed"] = ro_net.get("error_msg")
        ns_update_nsr["vld.{}".format(position)] = ns_vld
495
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO. Its "vnfs" (with their "vms" and "interfaces") and "nets" are read
    :return: Nothing, LcmException is raised on errors
    :raises LcmExceptionNoMgmtIP: if RO does not report an ip_address for a vnf and the vnfr has not one yet
    :raises LcmException: if a vnf, vdur, interface or vld of the vnfr is not found at the RO info
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # look for the RO vnf with the same member index
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            # changes to be written to the database for this vnfr
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"]
            elif not db_vnfr.get("ip-address"):
                raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # number of RO vms with the same vdu id that have been skipped while looking for this vdur
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    # among the vms of the same vdu, the one at the position given by count-index is taken
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    vdur["ip-address"] = vdur_RO.get("ip_address")
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    for ifacer in get_iterable(vdur, "interfaces"):
                        # RO interfaces are matched by their "internal_name"
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "at RO info".format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} at "
                                       "RO info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                # RO nets are matched by their "vnf_net_osm_id"
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} at RO info".format(
                        vnf_index, vld["id"]))

            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(vnf_index))
563
564 async def instantiate(self, nsr_id, nslcmop_id):
565 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
566 self.logger.debug(logging_text + "Enter")
567 # get all needed from database
568 start_deploy = time()
569 db_nsr = None
570 db_nslcmop = None
571 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
572 db_nslcmop_update = {}
573 nslcmop_operation_state = None
574 db_vnfrs = {}
575 RO_descriptor_number = 0 # number of descriptors created at RO
576 descriptor_id_2_RO = {} # map between vnfd/nsd id to the id used at RO
577 n2vc_info = {}
578 exc = None
579 try:
580 step = "Getting nslcmop={} from db".format(nslcmop_id)
581 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
582 step = "Getting nsr={} from db".format(nsr_id)
583 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
584 ns_params = db_nslcmop.get("operationParams")
585 nsd = db_nsr["nsd"]
586 nsr_name = db_nsr["name"] # TODO short-name??
587
588 # look if previous tasks in process
589 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
590 if task_dependency:
591 step = db_nslcmop_update["detailed-status"] = \
592 "Waiting for related tasks to be completed: {}".format(task_name)
593 self.logger.debug(logging_text + step)
594 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
595 _, pending = await asyncio.wait(task_dependency, timeout=3600)
596 if pending:
597 raise LcmException("Timeout waiting related tasks to be completed")
598
599 step = "Getting vnfrs from db"
600 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
601 db_vnfds_ref = {}
602 db_vnfds = {}
603 for vnfr in db_vnfrs_list:
604 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
605 vnfd_id = vnfr["vnfd-id"]
606 vnfd_ref = vnfr["vnfd-ref"]
607 if vnfd_id not in db_vnfds:
608 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
609 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
610 db_vnfds_ref[vnfd_ref] = vnfd
611 db_vnfds[vnfd_id] = vnfd
612
613 # Get or generates the _admin.deployed,VCA list
614 vca_deployed_list = None
615 if db_nsr["_admin"].get("deployed"):
616 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
617 if vca_deployed_list is None:
618 vca_deployed_list = []
619 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
620 elif isinstance(vca_deployed_list, dict):
621 # maintain backward compatibility. Change a dict to list at database
622 vca_deployed_list = list(vca_deployed_list.values())
623 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
624
625 db_nsr_update["detailed-status"] = "creating"
626 db_nsr_update["operational-status"] = "init"
627
628 RO = ROclient.ROClient(self.loop, **self.ro_config)
629
630 # set state to INSTANTIATED. When instantiated NBI will not delete directly
631 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
632 self.update_db_2("nsrs", nsr_id, db_nsr_update)
633
634 # get vnfds, instantiate at RO
635 for vnfd_id, vnfd in db_vnfds.items():
636 vnfd_ref = vnfd["id"]
637 step = db_nsr_update["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_ref)
638 # self.logger.debug(logging_text + step)
639 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, vnfd_ref[:23])
640 descriptor_id_2_RO[vnfd_ref] = vnfd_id_RO
641 RO_descriptor_number += 1
642
643 # look if present
644 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
645 if vnfd_list:
646 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = vnfd_list[0]["uuid"]
647 self.logger.debug(logging_text + "vnfd={} exists at RO. Using RO_id={}".format(
648 vnfd_ref, vnfd_list[0]["uuid"]))
649 else:
650 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
651 desc = await RO.create("vnfd", descriptor=vnfd_RO)
652 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = desc["uuid"]
653 self.logger.debug(logging_text + "vnfd={} created at RO. RO_id={}".format(
654 vnfd_ref, desc["uuid"]))
655 self.update_db_2("nsrs", nsr_id, db_nsr_update)
656
657 # create nsd at RO
658 nsd_ref = nsd["id"]
659 step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
660 # self.logger.debug(logging_text + step)
661
662 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
663 descriptor_id_2_RO[nsd_ref] = RO_osm_nsd_id
664 RO_descriptor_number += 1
665 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
666 if nsd_list:
667 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
668 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
669 nsd_ref, RO_nsd_uuid))
670 else:
671 nsd_RO = deepcopy(nsd)
672 nsd_RO["id"] = RO_osm_nsd_id
673 nsd_RO.pop("_id", None)
674 nsd_RO.pop("_admin", None)
675 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
676 vnfd_id = c_vnf["vnfd-id-ref"]
677 c_vnf["vnfd-id-ref"] = descriptor_id_2_RO[vnfd_id]
678 desc = await RO.create("nsd", descriptor=nsd_RO)
679 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
680 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
681 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
682 self.update_db_2("nsrs", nsr_id, db_nsr_update)
683
684 # Crate ns at RO
685 # if present use it unless in error status
686 RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
687 if RO_nsr_id:
688 try:
689 step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
690 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
691 desc = await RO.show("ns", RO_nsr_id)
692 except ROclient.ROClientException as e:
693 if e.http_code != HTTPStatus.NOT_FOUND:
694 raise
695 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
696 if RO_nsr_id:
697 ns_status, ns_status_info = RO.check_ns_status(desc)
698 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
699 if ns_status == "ERROR":
700 step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
701 self.logger.debug(logging_text + step)
702 await RO.delete("ns", RO_nsr_id)
703 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
704 if not RO_nsr_id:
705 step = db_nsr_update["detailed-status"] = "Checking dependencies"
706 # self.logger.debug(logging_text + step)
707
708 # check if VIM is creating and wait look if previous tasks in process
709 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
710 if task_dependency:
711 step = "Waiting for related tasks to be completed: {}".format(task_name)
712 self.logger.debug(logging_text + step)
713 await asyncio.wait(task_dependency, timeout=3600)
714 if ns_params.get("vnf"):
715 for vnf in ns_params["vnf"]:
716 if "vimAccountId" in vnf:
717 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
718 vnf["vimAccountId"])
719 if task_dependency:
720 step = "Waiting for related tasks to be completed: {}".format(task_name)
721 self.logger.debug(logging_text + step)
722 await asyncio.wait(task_dependency, timeout=3600)
723
724 step = db_nsr_update["detailed-status"] = "Checking instantiation parameters"
725
726 # feature 1429. Add n2vc public key to needed VMs
727 n2vc_key = await self.n2vc.GetPublicKey()
728 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, [n2vc_key])
729
730 step = db_nsr_update["detailed-status"] = "Creating ns at RO"
731 desc = await RO.create("ns", descriptor=RO_ns_params,
732 name=db_nsr["name"],
733 scenario=RO_nsd_uuid)
734 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
735 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
736 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
737 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
738 self.update_db_2("nsrs", nsr_id, db_nsr_update)
739
740 # wait until NS is ready
741 step = ns_status_detailed = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
742 detailed_status_old = None
743 self.logger.debug(logging_text + step)
744
745 while time() <= start_deploy + self.total_deploy_timeout:
746 desc = await RO.show("ns", RO_nsr_id)
747 ns_status, ns_status_info = RO.check_ns_status(desc)
748 db_nsr_update["admin.deployed.RO.nsr_status"] = ns_status
749 if ns_status == "ERROR":
750 raise ROclient.ROClientException(ns_status_info)
751 elif ns_status == "BUILD":
752 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
753 elif ns_status == "ACTIVE":
754 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
755 try:
756 self.ns_update_vnfr(db_vnfrs, desc)
757 break
758 except LcmExceptionNoMgmtIP:
759 pass
760 else:
761 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
762 if detailed_status != detailed_status_old:
763 detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
764 self.update_db_2("nsrs", nsr_id, db_nsr_update)
765 await asyncio.sleep(5, loop=self.loop)
766 else: # total_deploy_timeout
767 raise ROclient.ROClientException("Timeout waiting ns to be ready")
768
769 step = "Updating NSR"
770 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
771
772 db_nsr["detailed-status"] = "Configuring vnfr"
773 self.update_db_2("nsrs", nsr_id, db_nsr_update)
774
775 # The parameters we'll need to deploy a charm
776 number_to_configure = 0
777
778 def deploy_charm(vnf_index, vdu_id, vdu_name, vdu_count_index, mgmt_ip_address, n2vc_info,
779 config_primitive=None):
780 """An inner function to deploy the charm from either vnf or vdu
781 vnf_index is mandatory. vdu_id can be None for a vnf configuration or the id for vdu configuration
782 """
783 if not mgmt_ip_address:
784 raise LcmException("vnfd/vdu has not management ip address to configure it")
785 # Login to the VCA.
786 # if number_to_configure == 0:
787 # self.logger.debug("Logging into N2VC...")
788 # task = asyncio.ensure_future(self.n2vc.login())
789 # yield from asyncio.wait_for(task, 30.0)
790 # self.logger.debug("Logged into N2VC!")
791
792 # # await self.n2vc.login()
793
794 # Note: The charm needs to exist on disk at the location
795 # specified by charm_path.
796 base_folder = vnfd["_admin"]["storage"]
797 storage_params = self.fs.get_params()
798 charm_path = "{}{}/{}/charms/{}".format(
799 storage_params["path"],
800 base_folder["folder"],
801 base_folder["pkg-dir"],
802 proxy_charm
803 )
804
805 # Setup the runtime parameters for this VNF
806 params = {'rw_mgmt_ip': mgmt_ip_address}
807 if config_primitive:
808 params["initial-config-primitive"] = config_primitive
809
810 # ns_name will be ignored in the current version of N2VC
811 # but will be implemented for the next point release.
812 model_name = "default" # TODO bug 585 nsr_id
813 if vdu_id:
814 vdu_id_text = vdu_id + "-"
815 else:
816 vdu_id_text = "-"
817 application_name = self.n2vc.FormatApplicationName(nsr_name, vnf_index, vdu_id_text)
818
819 vca_index = len(vca_deployed_list)
820 # trunk name and add two char index at the end to ensure that it is unique. It is assumed no more than
821 # 26*26 charm in the same NS
822 application_name = application_name[0:48]
823 application_name += chr(97 + vca_index // 26) + chr(97 + vca_index % 26)
824 vca_deployed_ = {
825 "member-vnf-index": vnf_index,
826 "vdu_id": vdu_id,
827 "model": model_name,
828 "application": application_name,
829 "operational-status": "init",
830 "detailed-status": "",
831 "vnfd_id": vnfd_id,
832 "vdu_name": vdu_name,
833 "vdu_count_index": vdu_count_index,
834 }
835 vca_deployed_list.append(vca_deployed_)
836 db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = vca_deployed_
837 self.update_db_2("nsrs", nsr_id, db_nsr_update)
838
839 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
840 proxy_charm))
841 if not n2vc_info:
842 n2vc_info["nsr_id"] = nsr_id
843 n2vc_info["nslcmop_id"] = nslcmop_id
844 n2vc_info["n2vc_event"] = asyncio.Event(loop=self.loop)
845 n2vc_info["lcmOperationType"] = "instantiate"
846 n2vc_info["deployed"] = vca_deployed_list
847 n2vc_info["db_update"] = db_nsr_update
848 task = asyncio.ensure_future(
849 self.n2vc.DeployCharms(
850 model_name, # The network service name
851 application_name, # The application name
852 vnfd, # The vnf descriptor
853 charm_path, # Path to charm
854 params, # Runtime params, like mgmt ip
855 {}, # for native charms only
856 self.n2vc_callback, # Callback for status changes
857 n2vc_info, # Callback parameter
858 None, # Callback parameter (task)
859 )
860 )
861 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
862 n2vc_info))
863 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + application_name, task)
864
865 step = "Looking for needed vnfd to configure"
866 self.logger.debug(logging_text + step)
867
868 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
869 vnfd_id = c_vnf["vnfd-id-ref"]
870 vnf_index = str(c_vnf["member-vnf-index"])
871 vnfd = db_vnfds_ref[vnfd_id]
872
873 # Check if this VNF has a charm configuration
874 vnf_config = vnfd.get("vnf-configuration")
875
876 if vnf_config and vnf_config.get("juju"):
877 proxy_charm = vnf_config["juju"]["charm"]
878 config_primitive = None
879
880 if proxy_charm:
881 if 'initial-config-primitive' in vnf_config:
882 config_primitive = vnf_config['initial-config-primitive']
883
884 # Login to the VCA. If there are multiple calls to login(),
885 # subsequent calls will be a nop and return immediately.
886 step = "connecting to N2VC to configure vnf {}".format(vnf_index)
887 await self.n2vc.login()
888 deploy_charm(vnf_index, None, None, None, db_vnfrs[vnf_index]["ip-address"], n2vc_info,
889 config_primitive)
890 number_to_configure += 1
891
892 # Deploy charms for each VDU that supports one.
893 for vdu_index, vdu in enumerate(get_iterable(vnfd, 'vdu')):
894 vdu_config = vdu.get('vdu-configuration')
895 proxy_charm = None
896 config_primitive = None
897
898 if vdu_config and vdu_config.get("juju"):
899 proxy_charm = vdu_config["juju"]["charm"]
900
901 if 'initial-config-primitive' in vdu_config:
902 config_primitive = vdu_config['initial-config-primitive']
903
904 if proxy_charm:
905 step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index)
906 await self.n2vc.login()
907 vdur = db_vnfrs[vnf_index]["vdur"][vdu_index]
908 # TODO for the moment only first vdu_id contains a charm deployed
909 if vdur["vdu-id-ref"] != vdu["id"]:
910 raise LcmException("Mismatch vdur {}, vdu {} at index {} for vnf {}"
911 .format(vdur["vdu-id-ref"], vdu["id"], vdu_index, vnf_index))
912 deploy_charm(vnf_index, vdu["id"], vdur.get("name"), vdur["count-index"],
913 vdur["ip-address"], n2vc_info, config_primitive)
914 number_to_configure += 1
915
916 db_nsr_update["operational-status"] = "running"
917 configuration_failed = False
918 if number_to_configure:
919 old_status = "configuring: init: {}".format(number_to_configure)
920 db_nsr_update["config-status"] = old_status
921 db_nsr_update["detailed-status"] = old_status
922 db_nslcmop_update["detailed-status"] = old_status
923
924 # wait until all are configured.
925 while time() <= start_deploy + self.total_deploy_timeout:
926 if db_nsr_update:
927 self.update_db_2("nsrs", nsr_id, db_nsr_update)
928 if db_nslcmop_update:
929 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
930 # TODO add a fake tast that set n2vc_event after some time
931 await n2vc_info["n2vc_event"].wait()
932 n2vc_info["n2vc_event"].clear()
933 all_active = True
934 status_map = {}
935 n2vc_error_text = [] # contain text error list. If empty no one is in error status
936 now = time()
937 for vca_deployed in vca_deployed_list:
938 vca_status = vca_deployed["operational-status"]
939 if vca_status not in status_map:
940 # Initialize it
941 status_map[vca_status] = 0
942 status_map[vca_status] += 1
943
944 if vca_status == "active":
945 vca_deployed.pop("time_first_error", None)
946 vca_deployed.pop("status_first_error", None)
947 continue
948
949 all_active = False
950 if vca_status in ("error", "blocked"):
951 vca_deployed["detailed-status-error"] = vca_deployed["detailed-status"]
952 # if not first time in this status error
953 if not vca_deployed.get("time_first_error"):
954 vca_deployed["time_first_error"] = now
955 continue
956 if vca_deployed.get("time_first_error") and \
957 now <= vca_deployed["time_first_error"] + self.timeout_vca_on_error:
958 n2vc_error_text.append("member_vnf_index={} vdu_id={} {}: {}"
959 .format(vca_deployed["member-vnf-index"],
960 vca_deployed["vdu_id"], vca_status,
961 vca_deployed["detailed-status-error"]))
962
963 if all_active:
964 break
965 elif n2vc_error_text:
966 db_nsr_update["config-status"] = "failed"
967 error_text = "fail configuring " + ";".join(n2vc_error_text)
968 db_nsr_update["detailed-status"] = error_text
969 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
970 db_nslcmop_update["detailed-status"] = error_text
971 db_nslcmop_update["statusEnteredTime"] = time()
972 configuration_failed = True
973 break
974 else:
975 cs = "configuring: "
976 separator = ""
977 for status, num in status_map.items():
978 cs += separator + "{}: {}".format(status, num)
979 separator = ", "
980 if old_status != cs:
981 db_nsr_update["config-status"] = cs
982 db_nsr_update["detailed-status"] = cs
983 db_nslcmop_update["detailed-status"] = cs
984 old_status = cs
985 else: # total_deploy_timeout
986 raise LcmException("Timeout waiting ns to be configured")
987
988 if not configuration_failed:
989 # all is done
990 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
991 db_nslcmop_update["statusEnteredTime"] = time()
992 db_nslcmop_update["detailed-status"] = "done"
993 db_nsr_update["config-status"] = "configured"
994 db_nsr_update["detailed-status"] = "done"
995
996 return
997
998 except (ROclient.ROClientException, DbException, LcmException) as e:
999 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1000 exc = e
1001 except asyncio.CancelledError:
1002 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1003 exc = "Operation was cancelled"
1004 except Exception as e:
1005 exc = traceback.format_exc()
1006 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1007 exc_info=True)
1008 finally:
1009 if exc:
1010 if db_nsr:
1011 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1012 db_nsr_update["operational-status"] = "failed"
1013 if db_nslcmop:
1014 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1015 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1016 db_nslcmop_update["statusEnteredTime"] = time()
1017 try:
1018 if db_nsr:
1019 db_nsr_update["_admin.nslcmop"] = None
1020 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1021 if db_nslcmop_update:
1022 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1023 except DbException as e:
1024 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1025 if nslcmop_operation_state:
1026 try:
1027 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1028 "operationState": nslcmop_operation_state})
1029 except Exception as e:
1030 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1031
1032 self.logger.debug(logging_text + "Exit")
1033 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1034
async def _destroy_charm(self, model, application):
    """
    Ask N2VC to remove a charm application unless it is already gone
    :param model: name of the model that holds the application
    :param application: name of the charm application to remove
    :return: True when the application was not found (nothing to remove). False when it was found and its
        removal has been ordered; the existence is not checked again after the order
    """
    application_exists = await self.n2vc.HasApplication(model, application)
    if application_exists:
        await self.n2vc.RemoveCharms(model, application)
    return not application_exists
1046
async def _wait_charm_destroyed(self, model, application, timeout):
    """
    Poll N2VC until the charm application does not exist any more or the timeout is exhausted
    :param model: name of the model that holds the application
    :param application: name of the charm application being removed
    :param timeout: maximum time to wait, in seconds. Zero or negative means a single check without waiting
    :return: True if the application does not exist, False if it still exists once timeout is exhausted
    """
    poll_interval = 10  # seconds between two consecutive checks
    while True:
        if not await self.n2vc.HasApplication(model, application):
            return True
        if timeout <= 0:
            return False
        # never sleep beyond the deadline, so that the total wait is bounded by the requested timeout
        pause = min(poll_interval, timeout)
        await asyncio.sleep(pause)
        timeout -= pause
1062
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a network service instance.
    Steps, in order:
      1. read the nslcmop and nsr records; nothing else is done if the nsr is already NOT_INSTANTIATED
      2. order the removal of every deployed VCA charm (errors here are logged and ignored)
      3. delete the ns at RO and poll the RO delete action (up to 20 minutes)
      4. delete the nsd and the vnfds at RO, unless a previous RO deletion failed
      5. wait for the charms whose removal is still in progress
      6. record the result: failure details, database removal when 'autoremove' is set, or 'terminated'
    In all cases the pending database updates are written, a "terminated" message is sent through self.msg when
    an operation state has been set, and the task is removed from self.lcm_tasks.
    :param nsr_id: _id of the nsrs database record
    :param nslcmop_id: _id of the nslcmops database record of this terminate operation
    :return: None. Errors are caught here and reported at the database and the notification message
    """
    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    exc = None
    failed_detail = []   # annotates all failed error messages
    vca_time_destroy = None   # time of the last order to destroy a charm that still existed
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    try:
        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # work over a copy: entries are cleared below to mark the charms already removed
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return
        # #TODO check if VIM is creating and wait
        # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"

        if nsr_deployed and nsr_deployed.get("VCA"):
            try:
                step = "Scheduling configuration charms removing"
                db_nsr_update["detailed-status"] = "Deleting charms"
                self.logger.debug(logging_text + step)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # for backward compatibility
                if isinstance(nsr_deployed["VCA"], dict):
                    nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
                    db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    # The whole list has just been sent to the database. Drop the key so that a later update
                    # never carries both "_admin.deployed.VCA" and "_admin.deployed.VCA.<index>" together.
                    # NOTE(review): harmless if update_db_2 already empties the dictionary - confirm
                    db_nsr_update.pop("_admin.deployed.VCA", None)

                for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
                    if vca_deployed:
                        if await self._destroy_charm(vca_deployed['model'], vca_deployed["application"]):
                            # charm does not exist: empty the entry so that it is not waited for below, and
                            # annotate it at the pending database update (not at the read-only db_nsr copy)
                            vca_deployed.clear()
                            db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None
                        else:
                            vca_time_destroy = time()
            except Exception as e:
                # best effort: a failure removing charms must not prevent the deletion at RO
                self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO_fail = False
        RO = ROclient.ROClient(self.loop, **self.ro_config)

        # Delete ns
        RO_nsr_id = RO_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
            RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if RO_nsr_id:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = "Deleting ns at RO"
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self.logger.debug(logging_text + step)
                desc = await RO.delete("ns", RO_nsr_id)
                RO_delete_action = desc["action_id"]
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if RO_delete_action:
                # wait until NS is deleted from VIM
                step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
                    format(RO_nsr_id, RO_delete_action)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                delete_timeout = 20 * 60   # 20 minutes
                while delete_timeout > 0:
                    desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                         extra_item_id=RO_delete_action)
                    ns_status, ns_status_info = RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        # raised explicitly (same exception type as an assert) so it is not stripped with -O
                        raise AssertionError("ROclient.check_action_status returns unknown {}".format(ns_status))
                    if detailed_status != detailed_status_old:
                        detailed_status_old = db_nslcmop_update["detailed-status"] = \
                            db_nsr_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

        except ROclient.ROClientException as e:
            if e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
            elif e.http_code == 409:   # conflict
                failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
                RO_fail = True
            else:
                failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                self.logger.error(logging_text + failed_detail[-1])
                RO_fail = True

        # Delete nsd
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
            RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                    "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                    RO_fail = True
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])
                    RO_fail = True

        # Delete vnfds. Every vnfd is attempted; a failure is annotated but does not stop the remaining ones
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd_id"):
            for vnf_id, RO_vnfd_id in nsr_deployed["RO"]["vnfd_id"].items():
                if not RO_vnfd_id:
                    continue
                try:
                    step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                        "Deleting vnfd={} at RO".format(vnf_id)
                    await RO.delete("vnfd", RO_vnfd_id)
                    self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
                        self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                    elif e.http_code == 409:   # conflict
                        failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        # wait until charm deleted
        if vca_time_destroy:
            db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = step = \
                "Waiting for deletion of configuration charms"
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
                if not vca_deployed:
                    continue   # already removed (entry cleared above) or never deployed
                step = "Waiting for deletion of charm application_name={}".format(vca_deployed["application"])
                # the time already spent (e.g. deleting at RO) is discounted from the charm deletion timeout
                timeout = self.timeout_charm_delete - int(time() - vca_time_destroy)
                if not await self._wait_charm_destroyed(vca_deployed['model'], vca_deployed["application"],
                                                        timeout):
                    failed_detail.append("VCA[application_name={}] Deletion timeout".format(
                        vca_deployed["application"]))
                else:
                    db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None

        if failed_detail:
            self.logger.error(logging_text + "; ".join(failed_detail))
            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
            db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        elif db_nslcmop["operationParams"].get("autoremove"):
            # records are deleted, so the pending updates are dropped and nothing is written at finally
            self.db.del_one("nsrs", {"_id": nsr_id})
            db_nsr = None
            db_nsr_update.clear()
            self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
            db_nslcmop = None
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update.clear()
            self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
            # NOTE(review): "usageSate" looks like a typo of "usageState"; kept as it is because the pdus
            # schema is not visible from here - confirm before changing
            self.db.set_list("pdus", {"_admin.usage.nsr_id": nsr_id},
                             {"_admin.usageSate": "NOT_IN_USE", "_admin.usage": None})
            self.logger.debug(logging_text + "Delete from database")
        else:
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                # release the nsr from this operation
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
1296
async def _ns_execute_primitive(self, db_deployed, nsr_name, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
                                primitive, primitive_params):
    """
    Look for the deployed charm that matches the vnf/vdu and execute a primitive on it through N2VC
    :param db_deployed: content of nsr "_admin.deployed"; its "VCA" list is searched. None or without "VCA"
        is treated as nothing deployed
    :param nsr_name: name of the ns (currently not used here)
    :param member_vnf_index: member-vnf-index of the target vnf. Must match
    :param vdu_id: id of the target vdu, or None for a vnf level charm. Must match
    :param vdu_name: optional vdu name. Checked only when provided
    :param vdu_count_index: optional replica index of the vdu. Checked when it is not None (0 is a valid
        index) and the deployed entry stores an index; entries that store None are not filtered by it
    :param primitive: name of the primitive to execute
    :param primitive_params: dictionary with the primitive parameters, passed as keyword arguments. Can be None
    :return: tuple (result, result_detail). result is "COMPLETED" or "FAILED"; result_detail is a text with
        "Done", the exception text, "Task has been cancelled" or "timeout"
    :raise LcmException: if there is no matching charm, it lacks model or application name, or it is not active
    """
    deployed_vcas = (db_deployed or {}).get("VCA") or ()
    for vca_deployed in deployed_vcas:
        if not vca_deployed:
            continue   # removed or empty entry
        if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
            continue
        if vdu_name and vdu_name != vca_deployed["vdu_name"]:
            continue
        stored_count_index = vca_deployed.get("vdu_count_index")
        # compare against None and not by truthiness: index 0 must select the replica 0 and not any replica
        if vdu_count_index is not None and stored_count_index is not None and \
                vdu_count_index != stored_count_index:
            continue
        break
    else:
        raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not deployed"
                           .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
    model_name = vca_deployed.get("model")
    application_name = vca_deployed.get("application")
    if not model_name or not application_name:
        raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not model "
                           "or application name" .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
    if vca_deployed["operational-status"] != "active":
        raise LcmException("charm for member_vnf_index={} vdu_id={} operational_status={} not 'active'".format(
            member_vnf_index, vdu_id, vca_deployed["operational-status"]))
    callback = None  # self.n2vc_callback
    callback_args = ()  # [db_nsr, db_nslcmop, member_vnf_index, None]
    await self.n2vc.login()
    task = asyncio.ensure_future(
        self.n2vc.ExecutePrimitive(
            model_name,
            application_name,
            primitive, callback,
            *callback_args,
            **(primitive_params or {})
        )
    )
    # wait until completed with timeout
    await asyncio.wait((task,), timeout=600)

    result = "FAILED"  # by default
    result_detail = ""
    if task.cancelled():
        result_detail = "Task has been cancelled"
    elif task.done():
        exc = task.exception()
        if exc:
            result_detail = str(exc)
        else:
            # TODO revise with Adam if action is finished and ok when task is done or callback is needed
            result = "COMPLETED"
            result_detail = "Done"
    else:  # timeout
        # TODO Should it be cancelled?!!
        task.cancel()
        result_detail = "timeout"
    return result, result_detail
1356
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive requested by an nslcmop 'action' operation over a deployed charm.
    Reads the operation parameters from the nslcmop record, waits for related tasks in progress (up to one
    hour), executes the primitive with self._ns_execute_primitive and stores the result at the nslcmop record.
    In all cases the pending database updates are written, an "actioned" message is sent through self.msg when
    an operation state has been set, and the task is removed from self.lcm_tasks.
    :param nsr_id: _id of the nsrs database record
    :param nslcmop_id: _id of the nslcmops database record of this action operation
    :return: None. Errors are caught here and reported at the database and the notification message
    """
    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    exc = None
    try:
        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_deployed = db_nsr["_admin"].get("deployed")
        nsr_name = db_nsr["name"]
        vnf_index = db_nslcmop["operationParams"]["member_vnf_index"]
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        vdu_name = db_nslcmop["operationParams"].get("vdu_name")

        # look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
        if task_dependency:
            step = db_nslcmop_update["detailed-status"] = \
                "Waiting for related tasks to be completed: {}".format(task_name)
            self.logger.debug(logging_text + step)
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            _, pending = await asyncio.wait(task_dependency, timeout=3600)
            if pending:
                raise LcmException("Timeout waiting related tasks to be completed")

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # TODO check if ns is in a proper status
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index, vdu_id,
                                                                 vdu_name, vdu_count_index, primitive,
                                                                 primitive_params)
        db_nslcmop_update["detailed-status"] = result_detail
        db_nslcmop_update["operationState"] = nslcmop_operation_state = result
        db_nslcmop_update["statusEnteredTime"] = time()
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                # release the nsr from this operation
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        # logged only once, after the notification has been attempted
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
1438
1439 async def scale(self, nsr_id, nslcmop_id):
1440 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
1441 self.logger.debug(logging_text + "Enter")
1442 # get all needed from database
1443 db_nsr = None
1444 db_nslcmop = None
1445 db_nslcmop_update = {}
1446 nslcmop_operation_state = None
1447 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1448 exc = None
1449 # in case of error, indicates what part of scale was failed to put nsr at error status
1450 scale_process = None
1451 old_operational_status = ""
1452 old_config_status = ""
1453 vnfr_scaled = False
1454 try:
1455 step = "Getting nslcmop from database"
1456 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1457 step = "Getting nsr from database"
1458 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1459 nsr_name = db_nsr["name"]
1460 old_operational_status = db_nsr["operational-status"]
1461 old_config_status = db_nsr["config-status"]
1462
1463 # look if previous tasks in process
1464 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
1465 if task_dependency:
1466 step = db_nslcmop_update["detailed-status"] = \
1467 "Waiting for related tasks to be completed: {}".format(task_name)
1468 self.logger.debug(logging_text + step)
1469 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1470 _, pending = await asyncio.wait(task_dependency, timeout=3600)
1471 if pending:
1472 raise LcmException("Timeout waiting related tasks to be completed")
1473
1474 step = "Parsing scaling parameters"
1475 db_nsr_update["operational-status"] = "scaling"
1476 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1477 nsr_deployed = db_nsr["_admin"].get("deployed")
1478 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
1479 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
1480 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
1481 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
1482 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
1483
1484 # for backward compatibility
1485 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
1486 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
1487 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
1488 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1489
1490 step = "Getting vnfr from database"
1491 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
1492 step = "Getting vnfd from database"
1493 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
1494 step = "Getting scaling-group-descriptor"
1495 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
1496 if scaling_descriptor["name"] == scaling_group:
1497 break
1498 else:
1499 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
1500 "at vnfd:scaling-group-descriptor".format(scaling_group))
1501 # cooldown_time = 0
1502 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
1503 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
1504 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
1505 # break
1506
1507 # TODO check if ns is in a proper status
1508 step = "Sending scale order to RO"
1509 nb_scale_op = 0
1510 if not db_nsr["_admin"].get("scaling-group"):
1511 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
1512 admin_scale_index = 0
1513 else:
1514 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
1515 if admin_scale_info["name"] == scaling_group:
1516 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
1517 break
1518 else: # not found, set index one plus last element and add new entry with the name
1519 admin_scale_index += 1
1520 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
1521 RO_scaling_info = []
1522 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
1523 if scaling_type == "SCALE_OUT":
1524 # count if max-instance-count is reached
1525 if "max-instance-count" in scaling_descriptor and scaling_descriptor["max-instance-count"] is not None:
1526 max_instance_count = int(scaling_descriptor["max-instance-count"])
1527 if nb_scale_op >= max_instance_count:
1528 raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
1529 " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1530 nb_scale_op = nb_scale_op + 1
1531 vdu_scaling_info["scaling_direction"] = "OUT"
1532 vdu_scaling_info["vdu-create"] = {}
1533 for vdu_scale_info in scaling_descriptor["vdu"]:
1534 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1535 "type": "create", "count": vdu_scale_info.get("count", 1)})
1536 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1537 elif scaling_type == "SCALE_IN":
1538 # count if min-instance-count is reached
1539 min_instance_count = 0
1540 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
1541 min_instance_count = int(scaling_descriptor["min-instance-count"])
1542 if nb_scale_op <= min_instance_count:
1543 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
1544 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1545 nb_scale_op = nb_scale_op - 1
1546 vdu_scaling_info["scaling_direction"] = "IN"
1547 vdu_scaling_info["vdu-delete"] = {}
1548 for vdu_scale_info in scaling_descriptor["vdu"]:
1549 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1550 "type": "delete", "count": vdu_scale_info.get("count", 1)})
1551 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1552
1553 # update VDU_SCALING_INFO with the ip_addresses of the VDUs that are going to be deleted
1554 vdu_create = vdu_scaling_info.get("vdu-create")
1555 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
1556 if vdu_scaling_info["scaling_direction"] == "IN":
1557 for vdur in reversed(db_vnfr["vdur"]):
1558 if vdu_delete.get(vdur["vdu-id-ref"]):
1559 vdu_delete[vdur["vdu-id-ref"]] -= 1
1560 vdu_scaling_info["vdu"].append({
1561 "name": vdur["name"],
1562 "vdu_id": vdur["vdu-id-ref"],
1563 "interface": []
1564 })
1565 for interface in vdur["interfaces"]:
1566 vdu_scaling_info["vdu"][-1]["interface"].append({
1567 "name": interface["name"],
1568 "ip_address": interface["ip-address"],
1569 "mac_address": interface.get("mac-address"),
1570 })
1571 vdu_delete = vdu_scaling_info.pop("vdu-delete")
1572
1573 # execute primitive service PRE-SCALING
1574 step = "Executing pre-scale vnf-config-primitive"
1575 if scaling_descriptor.get("scaling-config-action"):
1576 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1577 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "pre-scale-in" \
1578 and scaling_type == "SCALE_IN":
1579 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1580 step = db_nslcmop_update["detailed-status"] = \
1581 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
1582 # look for primitive
1583 primitive_params = {}
1584 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1585 if config_primitive["name"] == vnf_config_primitive:
1586 for parameter in config_primitive.get("parameter", ()):
1587 if 'default-value' in parameter and \
1588 parameter['default-value'] == "<VDU_SCALE_INFO>":
1589 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1590 default_flow_style=True,
1591 width=256)
1592 break
1593 else:
1594 raise LcmException(
1595 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
1596 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-cnfiguration:config-"
1597 "primitive".format(scaling_group, config_primitive))
1598 scale_process = "VCA"
1599 db_nsr_update["config-status"] = "configuring pre-scaling"
1600 result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index,
1601 None, None, None, vnf_config_primitive,
1602 primitive_params)
1603 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1604 vnf_config_primitive, result, result_detail))
1605 if result == "FAILED":
1606 raise LcmException(result_detail)
1607 db_nsr_update["config-status"] = old_config_status
1608 scale_process = None
1609
1610 if RO_scaling_info:
1611 scale_process = "RO"
1612 RO = ROclient.ROClient(self.loop, **self.ro_config)
1613 RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
1614 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
1615 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
1616 # wait until ready
1617 RO_nslcmop_id = RO_desc["instance_action_id"]
1618 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
1619
1620 RO_task_done = False
1621 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
1622 detailed_status_old = None
1623 self.logger.debug(logging_text + step)
1624
1625 deployment_timeout = 1 * 3600 # One hour
1626 while deployment_timeout > 0:
1627 if not RO_task_done:
1628 desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
1629 extra_item_id=RO_nslcmop_id)
1630 ns_status, ns_status_info = RO.check_action_status(desc)
1631 if ns_status == "ERROR":
1632 raise ROclient.ROClientException(ns_status_info)
1633 elif ns_status == "BUILD":
1634 detailed_status = step + "; {}".format(ns_status_info)
1635 elif ns_status == "ACTIVE":
1636 RO_task_done = True
1637 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
1638 self.logger.debug(logging_text + step)
1639 else:
1640 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
1641 else:
1642 desc = await RO.show("ns", RO_nsr_id)
1643 ns_status, ns_status_info = RO.check_ns_status(desc)
1644 if ns_status == "ERROR":
1645 raise ROclient.ROClientException(ns_status_info)
1646 elif ns_status == "BUILD":
1647 detailed_status = step + "; {}".format(ns_status_info)
1648 elif ns_status == "ACTIVE":
1649 step = detailed_status = \
1650 "Waiting for management IP address reported by the VIM. Updating VNFRs"
1651 if not vnfr_scaled:
1652 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
1653 vnfr_scaled = True
1654 try:
1655 desc = await RO.show("ns", RO_nsr_id)
1656 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
1657 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
1658 break
1659 except LcmExceptionNoMgmtIP:
1660 pass
1661 else:
1662 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1663 if detailed_status != detailed_status_old:
1664 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
1665 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1666
1667 await asyncio.sleep(5, loop=self.loop)
1668 deployment_timeout -= 5
1669 if deployment_timeout <= 0:
1670 raise ROclient.ROClientException("Timeout waiting ns to be ready")
1671
1672 # update VDU_SCALING_INFO with the obtained ip_addresses
1673 if vdu_scaling_info["scaling_direction"] == "OUT":
1674 for vdur in reversed(db_vnfr["vdur"]):
1675 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
1676 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
1677 vdu_scaling_info["vdu"].append({
1678 "name": vdur["name"],
1679 "vdu_id": vdur["vdu-id-ref"],
1680 "interface": []
1681 })
1682 for interface in vdur["interfaces"]:
1683 vdu_scaling_info["vdu"][-1]["interface"].append({
1684 "name": interface["name"],
1685 "ip_address": interface["ip-address"],
1686 "mac_address": interface.get("mac-address"),
1687 })
1688 del vdu_scaling_info["vdu-create"]
1689
1690 scale_process = None
1691 if db_nsr_update:
1692 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1693
1694 # execute primitive service POST-SCALING
1695 step = "Executing post-scale vnf-config-primitive"
1696 if scaling_descriptor.get("scaling-config-action"):
1697 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1698 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "post-scale-out" \
1699 and scaling_type == "SCALE_OUT":
1700 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1701 step = db_nslcmop_update["detailed-status"] = \
1702 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
1703 # look for primitive
1704 primitive_params = {}
1705 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1706 if config_primitive["name"] == vnf_config_primitive:
1707 for parameter in config_primitive.get("parameter", ()):
1708 if 'default-value' in parameter and \
1709 parameter['default-value'] == "<VDU_SCALE_INFO>":
1710 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1711 default_flow_style=True,
1712 width=256)
1713 break
1714 else:
1715 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
1716 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
1717 "match any vnf-configuration:config-primitive".format(scaling_group,
1718 config_primitive))
1719 scale_process = "VCA"
1720 db_nsr_update["config-status"] = "configuring post-scaling"
1721
1722 result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index,
1723 None, None, None, vnf_config_primitive,
1724 primitive_params)
1725 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1726 vnf_config_primitive, result, result_detail))
1727 if result == "FAILED":
1728 raise LcmException(result_detail)
1729 db_nsr_update["config-status"] = old_config_status
1730 scale_process = None
1731
1732 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1733 db_nslcmop_update["statusEnteredTime"] = time()
1734 db_nslcmop_update["detailed-status"] = "done"
1735 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
1736 db_nsr_update["operational-status"] = old_operational_status
1737 db_nsr_update["config-status"] = old_config_status
1738 return
1739 except (ROclient.ROClientException, DbException, LcmException) as e:
1740 self.logger.error(logging_text + "Exit Exception {}".format(e))
1741 exc = e
1742 except asyncio.CancelledError:
1743 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1744 exc = "Operation was cancelled"
1745 except Exception as e:
1746 exc = traceback.format_exc()
1747 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1748 finally:
1749 if exc:
1750 if db_nslcmop:
1751 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1752 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1753 db_nslcmop_update["statusEnteredTime"] = time()
1754 if db_nsr:
1755 db_nsr_update["operational-status"] = old_operational_status
1756 db_nsr_update["config-status"] = old_config_status
1757 db_nsr_update["detailed-status"] = ""
1758 db_nsr_update["_admin.nslcmop"] = None
1759 if scale_process:
1760 if "VCA" in scale_process:
1761 db_nsr_update["config-status"] = "failed"
1762 if "RO" in scale_process:
1763 db_nsr_update["operational-status"] = "failed"
1764 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
1765 exc)
1766 try:
1767 if db_nslcmop and db_nslcmop_update:
1768 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1769 if db_nsr:
1770 db_nsr_update["_admin.nslcmop"] = None
1771 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1772 except DbException as e:
1773 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1774 if nslcmop_operation_state:
1775 try:
1776 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1777 "operationState": nslcmop_operation_state})
1778 # if cooldown_time:
1779 # await asyncio.sleep(cooldown_time)
1780 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
1781 except Exception as e:
1782 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1783 self.logger.debug(logging_text + "Exit")
1784 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")