Feature 1417 Fixing PDUs.
[osm/LCM.git] / osm_lcm / ns.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import logging
7 import logging.handlers
8 import functools
9 import traceback
10
11 import ROclient
12 from lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase
13
14 from osm_common.dbbase import DbException
15 from osm_common.fsbase import FsException
16 from n2vc.vnf import N2VC
17
18 from copy import copy, deepcopy
19 from http import HTTPStatus
20 from time import time
21 from uuid import uuid4
22
23 __author__ = "Alfonso Tierno"
24
25
26 def get_iterable(in_dict, in_key):
27 """
28 Similar to <dict>.get(), but if value is None, False, ..., An empty tuple is returned instead
29 :param in_dict: a dictionary
30 :param in_key: the key to look for at in_dict
31 :return: in_dict[in_var] or () if it is None or not present
32 """
33 if not in_dict.get(in_key):
34 return ()
35 return in_dict[in_key]
36
37
38 def populate_dict(target_dict, key_list, value):
39 """
40 Upate target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value.
41 Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
42 :param target_dict: dictionary to be changed
43 :param key_list: list of keys to insert at target_dict
44 :param value:
45 :return: None
46 """
47 for key in key_list[0:-1]:
48 if key not in target_dict:
49 target_dict[key] = {}
50 target_dict = target_dict[key]
51 target_dict[key_list[-1]] = value
52
53
54 class NsLcm(LcmBase):
55 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
56 total_deploy_timeout = 2 * 3600 # global timeout for deployment
57
58 def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
59 """
60 Init, Connect to database, filesystem storage, and messaging
61 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
62 :return: None
63 """
64 # logging
65 self.logger = logging.getLogger('lcm.ns')
66 self.loop = loop
67 self.lcm_tasks = lcm_tasks
68
69 super().__init__(db, msg, fs, self.logger)
70
71 self.ro_config = ro_config
72
73 self.n2vc = N2VC(
74 log=self.logger,
75 server=vca_config['host'],
76 port=vca_config['port'],
77 user=vca_config['user'],
78 secret=vca_config['secret'],
79 # TODO: This should point to the base folder where charms are stored,
80 # if there is a common one (like object storage). Otherwise, leave
81 # it unset and pass it via DeployCharms
82 # artifacts=vca_config[''],
83 artifacts=None,
84 )
85
86 def vnfd2RO(self, vnfd, new_id=None):
87 """
88 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
89 :param vnfd: input vnfd
90 :param new_id: overrides vnf id if provided
91 :return: copy of vnfd
92 """
93 ci_file = None
94 try:
95 vnfd_RO = deepcopy(vnfd)
96 vnfd_RO.pop("_id", None)
97 vnfd_RO.pop("_admin", None)
98 if new_id:
99 vnfd_RO["id"] = new_id
100 for vdu in vnfd_RO.get("vdu", ()):
101 if "cloud-init-file" in vdu:
102 base_folder = vnfd["_admin"]["storage"]
103 clout_init_file = "{}/{}/cloud_init/{}".format(
104 base_folder["folder"],
105 base_folder["pkg-dir"],
106 vdu["cloud-init-file"]
107 )
108 ci_file = self.fs.file_open(clout_init_file, "r")
109 # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails
110 # convert to base 64 or similar
111 clout_init_content = ci_file.read()
112 ci_file.close()
113 ci_file = None
114 vdu.pop("cloud-init-file", None)
115 vdu["cloud-init"] = clout_init_content
116 # remnove unused by RO configuration, monitoring, scaling
117 vnfd_RO.pop("vnf-configuration", None)
118 vnfd_RO.pop("monitoring-param", None)
119 vnfd_RO.pop("scaling-group-descriptor", None)
120 return vnfd_RO
121 except FsException as e:
122 raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
123 finally:
124 if ci_file:
125 ci_file.close()
126
127 def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None):
128 """
129 Callback both for charm status change and task completion
130 :param model_name: Charm model name
131 :param application_name: Charm application name
132 :param status: Can be
133 - blocked: The unit needs manual intervention
134 - maintenance: The unit is actively deploying/configuring
135 - waiting: The unit is waiting for another charm to be ready
136 - active: The unit is deployed, configured, and ready
137 - error: The charm has failed and needs attention.
138 - terminated: The charm has been destroyed
139 - removing,
140 - removed
141 :param message: detailed message error
142 :param n2vc_info: dictionary with information shared with instantiate task. It contains:
143 nsr_id:
144 nslcmop_id:
145 lcmOperationType: currently "instantiate"
146 deployed: dictionary with {<application>: {operational-status: <status>, detailed-status: <text>}}
147 db_update: dictionary to be filled with the changes to be wrote to database with format key.key.key: value
148 n2vc_event: event used to notify instantiation task that some change has been produced
149 :param task: None for charm status change, or task for completion task callback
150 :return:
151 """
152 try:
153 nsr_id = n2vc_info["nsr_id"]
154 deployed = n2vc_info["deployed"]
155 db_nsr_update = n2vc_info["db_update"]
156 nslcmop_id = n2vc_info["nslcmop_id"]
157 ns_operation = n2vc_info["lcmOperationType"]
158 n2vc_event = n2vc_info["n2vc_event"]
159 logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id,
160 application_name)
161 vca_deployed = deployed.get(application_name)
162 if not vca_deployed:
163 self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA")
164 return
165
166 if task:
167 if task.cancelled():
168 self.logger.debug(logging_text + " task Cancelled")
169 vca_deployed['operational-status'] = "error"
170 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(application_name)] = "error"
171 vca_deployed['detailed-status'] = "Task Cancelled"
172 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(application_name)] = "Task Cancelled"
173
174 elif task.done():
175 exc = task.exception()
176 if exc:
177 self.logger.error(logging_text + " task Exception={}".format(exc))
178 vca_deployed['operational-status'] = "error"
179 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(application_name)] = "error"
180 vca_deployed['detailed-status'] = str(exc)
181 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(application_name)] = str(exc)
182 else:
183 self.logger.debug(logging_text + " task Done")
184 # task is Done, but callback is still ongoing. So ignore
185 return
186 elif status:
187 self.logger.debug(logging_text + " Enter status={} message={}".format(status, message))
188 if vca_deployed['operational-status'] == status:
189 return # same status, ignore
190 vca_deployed['operational-status'] = status
191 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(application_name)] = status
192 vca_deployed['detailed-status'] = str(message)
193 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(application_name)] = str(message)
194 else:
195 self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
196 return
197 # wake up instantiate task
198 n2vc_event.set()
199 except Exception as e:
200 self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True)
201
202 def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
203 """
204 Creates a RO ns descriptor from OSM ns_instantiate params
205 :param ns_params: OSM instantiate params
206 :return: The RO ns descriptor
207 """
208 vim_2_RO = {}
209 # TODO feature 1417: Check that no instantiation is set over PDU
210 # check if PDU forces a concrete vim-network-id and add it
211 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
212
213 def vim_account_2_RO(vim_account):
214 if vim_account in vim_2_RO:
215 return vim_2_RO[vim_account]
216
217 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
218 if db_vim["_admin"]["operationalState"] != "ENABLED":
219 raise LcmException("VIM={} is not available. operationalState={}".format(
220 vim_account, db_vim["_admin"]["operationalState"]))
221 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
222 vim_2_RO[vim_account] = RO_vim_id
223 return RO_vim_id
224
225 def ip_profile_2_RO(ip_profile):
226 RO_ip_profile = deepcopy((ip_profile))
227 if "dns-server" in RO_ip_profile:
228 if isinstance(RO_ip_profile["dns-server"], list):
229 RO_ip_profile["dns-address"] = []
230 for ds in RO_ip_profile.pop("dns-server"):
231 RO_ip_profile["dns-address"].append(ds['address'])
232 else:
233 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
234 if RO_ip_profile.get("ip-version") == "ipv4":
235 RO_ip_profile["ip-version"] = "IPv4"
236 if RO_ip_profile.get("ip-version") == "ipv6":
237 RO_ip_profile["ip-version"] = "IPv6"
238 if "dhcp-params" in RO_ip_profile:
239 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
240 return RO_ip_profile
241
242 if not ns_params:
243 return None
244 RO_ns_params = {
245 # "name": ns_params["nsName"],
246 # "description": ns_params.get("nsDescription"),
247 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
248 # "scenario": ns_params["nsdId"],
249 }
250 if n2vc_key_list:
251 for vnfd_ref, vnfd in vnfd_dict.items():
252 vdu_needed_access = []
253 mgmt_cp = None
254 if vnfd.get("vnf-configuration"):
255 if vnfd.get("mgmt-interface"):
256 if vnfd["mgmt-interface"].get("vdu-id"):
257 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
258 elif vnfd["mgmt-interface"].get("cp"):
259 mgmt_cp = vnfd["mgmt-interface"]["cp"]
260
261 for vdu in vnfd.get("vdu", ()):
262 if vdu.get("vdu-configuration"):
263 vdu_needed_access.append(vdu["id"])
264 elif mgmt_cp:
265 for vdu_interface in vdu.get("interface"):
266 if vdu_interface.get("external-connection-point-ref") and \
267 vdu_interface["external-connection-point-ref"] == mgmt_cp:
268 vdu_needed_access.append(vdu["id"])
269 mgmt_cp = None
270 break
271
272 if vdu_needed_access:
273 for vnf_member in nsd.get("constituent-vnfd"):
274 if vnf_member["vnfd-id-ref"] != vnfd_ref:
275 continue
276 for vdu in vdu_needed_access:
277 populate_dict(RO_ns_params,
278 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
279 n2vc_key_list)
280
281 if ns_params.get("vduImage"):
282 RO_ns_params["vduImage"] = ns_params["vduImage"]
283
284 if ns_params.get("ssh_keys"):
285 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
286 for vnf_params in get_iterable(ns_params, "vnf"):
287 for constituent_vnfd in nsd["constituent-vnfd"]:
288 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
289 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
290 break
291 else:
292 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
293 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
294 if vnf_params.get("vimAccountId"):
295 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
296 vim_account_2_RO(vnf_params["vimAccountId"]))
297
298 for vdu_params in get_iterable(vnf_params, "vdu"):
299 # TODO feature 1417: check that this VDU exist and it is not a PDU
300 if vdu_params.get("volume"):
301 for volume_params in vdu_params["volume"]:
302 if volume_params.get("vim-volume-id"):
303 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
304 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
305 volume_params["vim-volume-id"])
306 if vdu_params.get("interface"):
307 for interface_params in vdu_params["interface"]:
308 if interface_params.get("ip-address"):
309 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
310 vdu_params["id"], "interfaces", interface_params["name"],
311 "ip_address"),
312 interface_params["ip-address"])
313 if interface_params.get("mac-address"):
314 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
315 vdu_params["id"], "interfaces", interface_params["name"],
316 "mac_address"),
317 interface_params["mac-address"])
318 if interface_params.get("floating-ip-required"):
319 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
320 vdu_params["id"], "interfaces", interface_params["name"],
321 "floating-ip"),
322 interface_params["floating-ip-required"])
323
324 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
325 if internal_vld_params.get("vim-network-name"):
326 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
327 internal_vld_params["name"], "vim-network-name"),
328 internal_vld_params["vim-network-name"])
329 if internal_vld_params.get("ip-profile"):
330 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
331 internal_vld_params["name"], "ip-profile"),
332 ip_profile_2_RO(internal_vld_params["ip-profile"]))
333
334 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
335 # look for interface
336 iface_found = False
337 for vdu_descriptor in vnf_descriptor["vdu"]:
338 for vdu_interface in vdu_descriptor["interface"]:
339 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
340 if icp_params.get("ip-address"):
341 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
342 vdu_descriptor["id"], "interfaces",
343 vdu_interface["name"], "ip_address"),
344 icp_params["ip-address"])
345
346 if icp_params.get("mac-address"):
347 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
348 vdu_descriptor["id"], "interfaces",
349 vdu_interface["name"], "mac_address"),
350 icp_params["mac-address"])
351 iface_found = True
352 break
353 if iface_found:
354 break
355 else:
356 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
357 "internal-vld:id-ref={} is not present at vnfd:internal-"
358 "connection-point".format(vnf_params["member-vnf-index"],
359 icp_params["id-ref"]))
360
361 for vld_params in get_iterable(ns_params, "vld"):
362 if "ip-profile" in vld_params:
363 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
364 ip_profile_2_RO(vld_params["ip-profile"]))
365 if vld_params.get("vim-network-name"):
366 RO_vld_sites = []
367 if isinstance(vld_params["vim-network-name"], dict):
368 for vim_account, vim_net in vld_params["vim-network-name"].items():
369 RO_vld_sites.append({
370 "netmap-use": vim_net,
371 "datacenter": vim_account_2_RO(vim_account)
372 })
373 else: # isinstance str
374 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
375 if RO_vld_sites:
376 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
377 if "vnfd-connection-point-ref" in vld_params:
378 for cp_params in vld_params["vnfd-connection-point-ref"]:
379 # look for interface
380 for constituent_vnfd in nsd["constituent-vnfd"]:
381 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
382 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
383 break
384 else:
385 raise LcmException(
386 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
387 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
388 match_cp = False
389 for vdu_descriptor in vnf_descriptor["vdu"]:
390 for interface_descriptor in vdu_descriptor["interface"]:
391 if interface_descriptor.get("external-connection-point-ref") == \
392 cp_params["vnfd-connection-point-ref"]:
393 match_cp = True
394 break
395 if match_cp:
396 break
397 else:
398 raise LcmException(
399 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
400 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
401 cp_params["member-vnf-index-ref"],
402 cp_params["vnfd-connection-point-ref"],
403 vnf_descriptor["id"]))
404 if cp_params.get("ip-address"):
405 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
406 vdu_descriptor["id"], "interfaces",
407 interface_descriptor["name"], "ip_address"),
408 cp_params["ip-address"])
409 if cp_params.get("mac-address"):
410 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
411 vdu_descriptor["id"], "interfaces",
412 interface_descriptor["name"], "mac_address"),
413 cp_params["mac-address"])
414 return RO_ns_params
415
416 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
417 # make a copy to do not change
418 vdu_create = copy(vdu_create)
419 vdu_delete = copy(vdu_delete)
420
421 vdurs = db_vnfr.get("vdur")
422 if vdurs is None:
423 vdurs = []
424 vdu_index = len(vdurs)
425 while vdu_index:
426 vdu_index -= 1
427 vdur = vdurs[vdu_index]
428 if vdur.get("pdu-type"):
429 continue
430 vdu_id_ref = vdur["vdu-id-ref"]
431 if vdu_create and vdu_create.get(vdu_id_ref):
432 for index in range(0, vdu_create[vdu_id_ref]):
433 vdur = deepcopy(vdur)
434 vdur["_id"] = str(uuid4())
435 vdur["count-index"] += 1
436 vdurs.insert(vdu_index+1+index, vdur)
437 del vdu_create[vdu_id_ref]
438 if vdu_delete and vdu_delete.get(vdu_id_ref):
439 del vdurs[vdu_index]
440 vdu_delete[vdu_id_ref] -= 1
441 if not vdu_delete[vdu_id_ref]:
442 del vdu_delete[vdu_id_ref]
443 # check all operations are done
444 if vdu_create or vdu_delete:
445 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
446 vdu_create))
447 if vdu_delete:
448 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
449 vdu_delete))
450
451 vnfr_update = {"vdur": vdurs}
452 db_vnfr["vdur"] = vdurs
453 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
454
455 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
456 """
457 Updates database nsr with the RO info for the created vld
458 :param ns_update_nsr: dictionary to be filled with the updated info
459 :param db_nsr: content of db_nsr. This is also modified
460 :param nsr_desc_RO: nsr descriptor from RO
461 :return: Nothing, LcmException is raised on errors
462 """
463
464 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
465 for net_RO in get_iterable(nsr_desc_RO, "nets"):
466 if vld["id"] != net_RO.get("ns_net_osm_id"):
467 continue
468 vld["vim-id"] = net_RO.get("vim_net_id")
469 vld["name"] = net_RO.get("vim_name")
470 vld["status"] = net_RO.get("status")
471 vld["status-detailed"] = net_RO.get("error_msg")
472 ns_update_nsr["vld.{}".format(vld_index)] = vld
473 break
474 else:
475 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
476
477 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
478 """
479 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
480 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
481 :param nsr_desc_RO: nsr descriptor from RO
482 :return: Nothing, LcmException is raised on errors
483 """
484 for vnf_index, db_vnfr in db_vnfrs.items():
485 for vnf_RO in nsr_desc_RO["vnfs"]:
486 if vnf_RO["member_vnf_index"] != vnf_index:
487 continue
488 vnfr_update = {}
489 if vnf_RO.get("ip_address"):
490 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"]
491 elif not db_vnfr.get("ip-address"):
492 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
493
494 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
495 vdur_RO_count_index = 0
496 if vdur.get("pdu-type"):
497 continue
498 for vdur_RO in get_iterable(vnf_RO, "vms"):
499 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
500 continue
501 if vdur["count-index"] != vdur_RO_count_index:
502 vdur_RO_count_index += 1
503 continue
504 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
505 vdur["ip-address"] = vdur_RO.get("ip_address")
506 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
507 vdur["name"] = vdur_RO.get("vim_name")
508 vdur["status"] = vdur_RO.get("status")
509 vdur["status-detailed"] = vdur_RO.get("error_msg")
510 for ifacer in get_iterable(vdur, "interfaces"):
511 for interface_RO in get_iterable(vdur_RO, "interfaces"):
512 if ifacer["name"] == interface_RO.get("internal_name"):
513 ifacer["ip-address"] = interface_RO.get("ip_address")
514 ifacer["mac-address"] = interface_RO.get("mac_address")
515 break
516 else:
517 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
518 "at RO info".format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
519 vnfr_update["vdur.{}".format(vdu_index)] = vdur
520 break
521 else:
522 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} at "
523 "RO info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
524
525 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
526 for net_RO in get_iterable(nsr_desc_RO, "nets"):
527 if vld["id"] != net_RO.get("vnf_net_osm_id"):
528 continue
529 vld["vim-id"] = net_RO.get("vim_net_id")
530 vld["name"] = net_RO.get("vim_name")
531 vld["status"] = net_RO.get("status")
532 vld["status-detailed"] = net_RO.get("error_msg")
533 vnfr_update["vld.{}".format(vld_index)] = vld
534 break
535 else:
536 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} at RO info".format(
537 vnf_index, vld["id"]))
538
539 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
540 break
541
542 else:
543 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(vnf_index))
544
545 async def instantiate(self, nsr_id, nslcmop_id):
546 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
547 self.logger.debug(logging_text + "Enter")
548 # get all needed from database
549 start_deploy = time()
550 db_nsr = None
551 db_nslcmop = None
552 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
553 db_nslcmop_update = {}
554 nslcmop_operation_state = None
555 db_vnfrs = {}
556 RO_descriptor_number = 0 # number of descriptors created at RO
557 descriptor_id_2_RO = {} # map between vnfd/nsd id to the id used at RO
558 n2vc_info = {}
559 exc = None
560 try:
561 step = "Getting nslcmop={} from db".format(nslcmop_id)
562 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
563 step = "Getting nsr={} from db".format(nsr_id)
564 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
565 ns_params = db_nslcmop.get("operationParams")
566 nsd = db_nsr["nsd"]
567 nsr_name = db_nsr["name"] # TODO short-name??
568
569 # look if previous tasks in process
570 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
571 if task_dependency:
572 step = db_nslcmop_update["detailed-status"] = \
573 "Waiting for related tasks to be completed: {}".format(task_name)
574 self.logger.debug(logging_text + step)
575 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
576 _, pending = await asyncio.wait(task_dependency, timeout=3600)
577 if pending:
578 raise LcmException("Timeout waiting related tasks to be completed")
579
580 step = "Getting vnfrs from db"
581 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
582 db_vnfds_ref = {}
583 db_vnfds = {}
584 for vnfr in db_vnfrs_list:
585 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
586 vnfd_id = vnfr["vnfd-id"]
587 vnfd_ref = vnfr["vnfd-ref"]
588 if vnfd_id not in db_vnfds:
589 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
590 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
591 db_vnfds_ref[vnfd_ref] = vnfd
592 db_vnfds[vnfd_id] = vnfd
593
594 nsr_lcm = db_nsr["_admin"].get("deployed")
595 if not nsr_lcm:
596 nsr_lcm = db_nsr["_admin"]["deployed"] = {
597 "id": nsr_id,
598 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
599 # "nsr_ip": {},
600 "VCA": {},
601 }
602 db_nsr_update["detailed-status"] = "creating"
603 db_nsr_update["operational-status"] = "init"
604
605 RO = ROclient.ROClient(self.loop, **self.ro_config)
606
607 # get vnfds, instantiate at RO
608 for vnfd_id, vnfd in db_vnfds.items():
609 vnfd_ref = vnfd["id"]
610 step = db_nsr_update["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_ref)
611 # self.logger.debug(logging_text + step)
612 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, vnfd_ref[:23])
613 descriptor_id_2_RO[vnfd_ref] = vnfd_id_RO
614 RO_descriptor_number += 1
615
616 # look if present
617 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
618 if vnfd_list:
619 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = vnfd_list[0]["uuid"]
620 self.logger.debug(logging_text + "vnfd={} exists at RO. Using RO_id={}".format(
621 vnfd_ref, vnfd_list[0]["uuid"]))
622 else:
623 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
624 desc = await RO.create("vnfd", descriptor=vnfd_RO)
625 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = desc["uuid"]
626 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
627 self.logger.debug(logging_text + "vnfd={} created at RO. RO_id={}".format(
628 vnfd_ref, desc["uuid"]))
629 self.update_db_2("nsrs", nsr_id, db_nsr_update)
630
631 # create nsd at RO
632 nsd_ref = nsd["id"]
633 step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
634 # self.logger.debug(logging_text + step)
635
636 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
637 descriptor_id_2_RO[nsd_ref] = RO_osm_nsd_id
638 RO_descriptor_number += 1
639 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
640 if nsd_list:
641 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
642 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
643 nsd_ref, RO_nsd_uuid))
644 else:
645 nsd_RO = deepcopy(nsd)
646 nsd_RO["id"] = RO_osm_nsd_id
647 nsd_RO.pop("_id", None)
648 nsd_RO.pop("_admin", None)
649 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
650 vnfd_id = c_vnf["vnfd-id-ref"]
651 c_vnf["vnfd-id-ref"] = descriptor_id_2_RO[vnfd_id]
652 desc = await RO.create("nsd", descriptor=nsd_RO)
653 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
654 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
655 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
656 self.update_db_2("nsrs", nsr_id, db_nsr_update)
657
658 # Crate ns at RO
659 # if present use it unless in error status
660 RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
661 if RO_nsr_id:
662 try:
663 step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
664 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
665 desc = await RO.show("ns", RO_nsr_id)
666 except ROclient.ROClientException as e:
667 if e.http_code != HTTPStatus.NOT_FOUND:
668 raise
669 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
670 if RO_nsr_id:
671 ns_status, ns_status_info = RO.check_ns_status(desc)
672 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
673 if ns_status == "ERROR":
674 step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
675 self.logger.debug(logging_text + step)
676 await RO.delete("ns", RO_nsr_id)
677 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
678 if not RO_nsr_id:
679 step = db_nsr_update["detailed-status"] = "Checking dependencies"
680 # self.logger.debug(logging_text + step)
681
682 # check if VIM is creating and wait look if previous tasks in process
683 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
684 if task_dependency:
685 step = "Waiting for related tasks to be completed: {}".format(task_name)
686 self.logger.debug(logging_text + step)
687 await asyncio.wait(task_dependency, timeout=3600)
688 if ns_params.get("vnf"):
689 for vnf in ns_params["vnf"]:
690 if "vimAccountId" in vnf:
691 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
692 vnf["vimAccountId"])
693 if task_dependency:
694 step = "Waiting for related tasks to be completed: {}".format(task_name)
695 self.logger.debug(logging_text + step)
696 await asyncio.wait(task_dependency, timeout=3600)
697
698 step = db_nsr_update["detailed-status"] = "Checking instantiation parameters"
699
700 # feature 1429. Add n2vc public key to needed VMs
701 n2vc_key = await self.n2vc.GetPublicKey()
702 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, [n2vc_key])
703
704 step = db_nsr_update["detailed-status"] = "Creating ns at RO"
705 desc = await RO.create("ns", descriptor=RO_ns_params,
706 name=db_nsr["name"],
707 scenario=RO_nsd_uuid)
708 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
709 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
710 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
711 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
712 self.update_db_2("nsrs", nsr_id, db_nsr_update)
713
714 # wait until NS is ready
715 step = ns_status_detailed = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
716 detailed_status_old = None
717 self.logger.debug(logging_text + step)
718
719 while time() <= start_deploy + self.total_deploy_timeout:
720 desc = await RO.show("ns", RO_nsr_id)
721 ns_status, ns_status_info = RO.check_ns_status(desc)
722 db_nsr_update["admin.deployed.RO.nsr_status"] = ns_status
723 if ns_status == "ERROR":
724 raise ROclient.ROClientException(ns_status_info)
725 elif ns_status == "BUILD":
726 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
727 elif ns_status == "ACTIVE":
728 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
729 try:
730 # nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc)
731 self.ns_update_vnfr(db_vnfrs, desc)
732 break
733 except LcmExceptionNoMgmtIP:
734 pass
735 else:
736 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
737 if detailed_status != detailed_status_old:
738 detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
739 self.update_db_2("nsrs", nsr_id, db_nsr_update)
740 await asyncio.sleep(5, loop=self.loop)
741 else: # total_deploy_timeout
742 raise ROclient.ROClientException("Timeout waiting ns to be ready")
743
744 step = "Updating NSR"
745 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
746
747 db_nsr["detailed-status"] = "Configuring vnfr"
748 self.update_db_2("nsrs", nsr_id, db_nsr_update)
749
750 # The parameters we'll need to deploy a charm
751 number_to_configure = 0
752
753 def deploy(vnf_index, vdu_id, mgmt_ip_address, n2vc_info, config_primitive=None):
754 """An inner function to deploy the charm from either vnf or vdu
755 vnf_index is mandatory. vdu_id can be None for a vnf configuration or the id for vdu configuration
756 """
757 if not mgmt_ip_address:
758 raise LcmException("vnfd/vdu has not management ip address to configure it")
759 # Login to the VCA.
760 # if number_to_configure == 0:
761 # self.logger.debug("Logging into N2VC...")
762 # task = asyncio.ensure_future(self.n2vc.login())
763 # yield from asyncio.wait_for(task, 30.0)
764 # self.logger.debug("Logged into N2VC!")
765
766 # # await self.n2vc.login()
767
768 # Note: The charm needs to exist on disk at the location
769 # specified by charm_path.
770 base_folder = vnfd["_admin"]["storage"]
771 storage_params = self.fs.get_params()
772 charm_path = "{}{}/{}/charms/{}".format(
773 storage_params["path"],
774 base_folder["folder"],
775 base_folder["pkg-dir"],
776 proxy_charm
777 )
778
779 # Setup the runtime parameters for this VNF
780 params = {'rw_mgmt_ip': mgmt_ip_address}
781 if config_primitive:
782 params["initial-config-primitive"] = config_primitive
783
784 # ns_name will be ignored in the current version of N2VC
785 # but will be implemented for the next point release.
786 model_name = 'default'
787 vdu_id_text = "vnfd"
788 if vdu_id:
789 vdu_id_text = vdu_id
790 application_name = self.n2vc.FormatApplicationName(
791 nsr_name,
792 vnf_index,
793 vdu_id_text
794 )
795 if not nsr_lcm.get("VCA"):
796 nsr_lcm["VCA"] = {}
797 nsr_lcm["VCA"][application_name] = db_nsr_update["_admin.deployed.VCA.{}".format(application_name)] = {
798 "member-vnf-index": vnf_index,
799 "vdu_id": vdu_id,
800 "model": model_name,
801 "application": application_name,
802 "operational-status": "init",
803 "detailed-status": "",
804 "vnfd_id": vnfd_id,
805 }
806 self.update_db_2("nsrs", nsr_id, db_nsr_update)
807
808 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
809 proxy_charm))
810 if not n2vc_info:
811 n2vc_info["nsr_id"] = nsr_id
812 n2vc_info["nslcmop_id"] = nslcmop_id
813 n2vc_info["n2vc_event"] = asyncio.Event(loop=self.loop)
814 n2vc_info["lcmOperationType"] = "instantiate"
815 n2vc_info["deployed"] = nsr_lcm["VCA"]
816 n2vc_info["db_update"] = db_nsr_update
817 task = asyncio.ensure_future(
818 self.n2vc.DeployCharms(
819 model_name, # The network service name
820 application_name, # The application name
821 vnfd, # The vnf descriptor
822 charm_path, # Path to charm
823 params, # Runtime params, like mgmt ip
824 {}, # for native charms only
825 self.n2vc_callback, # Callback for status changes
826 n2vc_info, # Callback parameter
827 None, # Callback parameter (task)
828 )
829 )
830 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
831 n2vc_info))
832 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + application_name, task)
833
834 step = "Looking for needed vnfd to configure"
835 self.logger.debug(logging_text + step)
836
837 for c_vnf in nsd.get("constituent-vnfd", ()):
838 vnfd_id = c_vnf["vnfd-id-ref"]
839 vnf_index = str(c_vnf["member-vnf-index"])
840 vnfd = db_vnfds_ref[vnfd_id]
841
842 # Check if this VNF has a charm configuration
843 vnf_config = vnfd.get("vnf-configuration")
844
845 if vnf_config and vnf_config.get("juju"):
846 proxy_charm = vnf_config["juju"]["charm"]
847 config_primitive = None
848
849 if proxy_charm:
850 if 'initial-config-primitive' in vnf_config:
851 config_primitive = vnf_config['initial-config-primitive']
852
853 # Login to the VCA. If there are multiple calls to login(),
854 # subsequent calls will be a nop and return immediately.
855 step = "connecting to N2VC to configure vnf {}".format(vnf_index)
856 await self.n2vc.login()
857 deploy(vnf_index, None, db_vnfrs[vnf_index]["ip-address"], n2vc_info, config_primitive)
858 number_to_configure += 1
859
860 # Deploy charms for each VDU that supports one.
861 vdu_index = 0
862 for vdu in vnfd.get('vdu', ()):
863 vdu_config = vdu.get('vdu-configuration')
864 proxy_charm = None
865 config_primitive = None
866
867 if vdu_config and vdu_config.get("juju"):
868 proxy_charm = vdu_config["juju"]["charm"]
869
870 if 'initial-config-primitive' in vdu_config:
871 config_primitive = vdu_config['initial-config-primitive']
872
873 if proxy_charm:
874 step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index)
875 await self.n2vc.login()
876 deploy(vnf_index, vdu["id"], db_vnfrs[vnf_index]["vdur"][vdu_index]["ip-address"],
877 n2vc_info, config_primitive)
878 number_to_configure += 1
879 vdu_index += 1
880
881 db_nsr_update["operational-status"] = "running"
882 configuration_failed = False
883 if number_to_configure:
884 old_status = "configuring: init: {}".format(number_to_configure)
885 db_nsr_update["config-status"] = old_status
886 db_nsr_update["detailed-status"] = old_status
887 db_nslcmop_update["detailed-status"] = old_status
888
889 # wait until all are configured.
890 while time() <= start_deploy + self.total_deploy_timeout:
891 if db_nsr_update:
892 self.update_db_2("nsrs", nsr_id, db_nsr_update)
893 if db_nslcmop_update:
894 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
895 # TODO add a fake tast that set n2vc_event after some time
896 await n2vc_info["n2vc_event"].wait()
897 n2vc_info["n2vc_event"].clear()
898 all_active = True
899 status_map = {}
900 n2vc_error_text = [] # contain text error list. If empty no one is in error status
901 now = time()
902 for _, vca_info in nsr_lcm["VCA"].items():
903 vca_status = vca_info["operational-status"]
904 if vca_status not in status_map:
905 # Initialize it
906 status_map[vca_status] = 0
907 status_map[vca_status] += 1
908
909 if vca_status == "active":
910 vca_info.pop("time_first_error", None)
911 vca_info.pop("status_first_error", None)
912 continue
913
914 all_active = False
915 if vca_status in ("error", "blocked"):
916 vca_info["detailed-status-error"] = vca_info["detailed-status"]
917 # if not first time in this status error
918 if not vca_info.get("time_first_error"):
919 vca_info["time_first_error"] = now
920 continue
921 if vca_info.get("time_first_error") and \
922 now <= vca_info["time_first_error"] + self.timeout_vca_on_error:
923 n2vc_error_text.append("member_vnf_index={} vdu_id={} {}: {}"
924 .format(vca_info["member-vnf-index"],
925 vca_info["vdu_id"], vca_status,
926 vca_info["detailed-status-error"]))
927
928 if all_active:
929 break
930 elif n2vc_error_text:
931 db_nsr_update["config-status"] = "failed"
932 error_text = "fail configuring " + ";".join(n2vc_error_text)
933 db_nsr_update["detailed-status"] = error_text
934 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
935 db_nslcmop_update["detailed-status"] = error_text
936 db_nslcmop_update["statusEnteredTime"] = time()
937 configuration_failed = True
938 break
939 else:
940 cs = "configuring: "
941 separator = ""
942 for status, num in status_map.items():
943 cs += separator + "{}: {}".format(status, num)
944 separator = ", "
945 if old_status != cs:
946 db_nsr_update["config-status"] = cs
947 db_nsr_update["detailed-status"] = cs
948 db_nslcmop_update["detailed-status"] = cs
949 old_status = cs
950 else: # total_deploy_timeout
951 raise LcmException("Timeout waiting ns to be configured")
952
953 if not configuration_failed:
954 # all is done
955 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
956 db_nslcmop_update["statusEnteredTime"] = time()
957 db_nslcmop_update["detailed-status"] = "done"
958 db_nsr_update["config-status"] = "configured"
959 db_nsr_update["detailed-status"] = "done"
960
961 return
962
963 except (ROclient.ROClientException, DbException, LcmException) as e:
964 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
965 exc = e
966 except asyncio.CancelledError:
967 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
968 exc = "Operation was cancelled"
969 except Exception as e:
970 exc = traceback.format_exc()
971 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
972 exc_info=True)
973 finally:
974 if exc:
975 if db_nsr:
976 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
977 db_nsr_update["operational-status"] = "failed"
978 if db_nslcmop:
979 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
980 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
981 db_nslcmop_update["statusEnteredTime"] = time()
982 if db_nsr:
983 db_nsr_update["_admin.nslcmop"] = None
984 self.update_db_2("nsrs", nsr_id, db_nsr_update)
985 if db_nslcmop_update:
986 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
987 if nslcmop_operation_state:
988 try:
989 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
990 "operationState": nslcmop_operation_state})
991 except Exception as e:
992 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
993
994 self.logger.debug(logging_text + "Exit")
995 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
996
997 async def terminate(self, nsr_id, nslcmop_id):
998 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
999 self.logger.debug(logging_text + "Enter")
1000 db_nsr = None
1001 db_nslcmop = None
1002 exc = None
1003 failed_detail = [] # annotates all failed error messages
1004 vca_task_list = []
1005 vca_task_dict = {}
1006 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1007 db_nslcmop_update = {}
1008 nslcmop_operation_state = None
1009 try:
1010 step = "Getting nslcmop={} from db".format(nslcmop_id)
1011 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1012 step = "Getting nsr={} from db".format(nsr_id)
1013 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1014 # nsd = db_nsr["nsd"]
1015 nsr_lcm = deepcopy(db_nsr["_admin"].get("deployed"))
1016 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
1017 return
1018 # TODO ALF remove
1019 # db_vim = self.db.get_one("vim_accounts", {"_id": db_nsr["datacenter"]})
1020 # #TODO check if VIM is creating and wait
1021 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
1022
1023 db_nsr_update["operational-status"] = "terminating"
1024 db_nsr_update["config-status"] = "terminating"
1025
1026 if nsr_lcm and nsr_lcm.get("VCA"):
1027 try:
1028 step = "Scheduling configuration charms removing"
1029 db_nsr_update["detailed-status"] = "Deleting charms"
1030 self.logger.debug(logging_text + step)
1031 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1032 for application_name, deploy_info in nsr_lcm["VCA"].items():
1033 if deploy_info: # TODO it would be desirable having a and deploy_info.get("deployed"):
1034 task = asyncio.ensure_future(
1035 self.n2vc.RemoveCharms(
1036 deploy_info['model'],
1037 application_name,
1038 # self.n2vc_callback,
1039 # db_nsr,
1040 # db_nslcmop,
1041 )
1042 )
1043 vca_task_list.append(task)
1044 vca_task_dict[application_name] = task
1045 # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
1046 # deploy_info['application'], None, db_nsr,
1047 # db_nslcmop, vnf_index))
1048 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "delete_charm:" + application_name, task)
1049 except Exception as e:
1050 self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
1051
1052 # remove from RO
1053 RO_fail = False
1054 RO = ROclient.ROClient(self.loop, **self.ro_config)
1055
1056 # Delete ns
1057 RO_nsr_id = RO_delete_action = None
1058 if nsr_lcm and nsr_lcm.get("RO"):
1059 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
1060 RO_delete_action = nsr_lcm["RO"].get("nsr_delete_action_id")
1061 try:
1062 if RO_nsr_id:
1063 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = "Deleting ns at RO"
1064 self.logger.debug(logging_text + step)
1065 desc = await RO.delete("ns", RO_nsr_id)
1066 RO_delete_action = desc["action_id"]
1067 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
1068 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1069 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1070 if RO_delete_action:
1071 # wait until NS is deleted from VIM
1072 step = detailed_status = "Waiting ns deleted from VIM. RO_id={}".format(RO_nsr_id)
1073 detailed_status_old = None
1074 self.logger.debug(logging_text + step)
1075
1076 delete_timeout = 20 * 60 # 20 minutes
1077 while delete_timeout > 0:
1078 desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
1079 extra_item_id=RO_delete_action)
1080 ns_status, ns_status_info = RO.check_action_status(desc)
1081 if ns_status == "ERROR":
1082 raise ROclient.ROClientException(ns_status_info)
1083 elif ns_status == "BUILD":
1084 detailed_status = step + "; {}".format(ns_status_info)
1085 elif ns_status == "ACTIVE":
1086 break
1087 else:
1088 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
1089 await asyncio.sleep(5, loop=self.loop)
1090 delete_timeout -= 5
1091 if detailed_status != detailed_status_old:
1092 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
1093 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1094 else: # delete_timeout <= 0:
1095 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
1096
1097 except ROclient.ROClientException as e:
1098 if e.http_code == 404: # not found
1099 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1100 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1101 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
1102 elif e.http_code == 409: # conflict
1103 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
1104 self.logger.debug(logging_text + failed_detail[-1])
1105 RO_fail = True
1106 else:
1107 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
1108 self.logger.error(logging_text + failed_detail[-1])
1109 RO_fail = True
1110
1111 # Delete nsd
1112 if not RO_fail and nsr_lcm and nsr_lcm.get("RO") and nsr_lcm["RO"].get("nsd_id"):
1113 RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
1114 try:
1115 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
1116 "Deleting nsd at RO"
1117 await RO.delete("nsd", RO_nsd_id)
1118 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
1119 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
1120 except ROclient.ROClientException as e:
1121 if e.http_code == 404: # not found
1122 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
1123 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
1124 elif e.http_code == 409: # conflict
1125 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
1126 self.logger.debug(logging_text + failed_detail[-1])
1127 RO_fail = True
1128 else:
1129 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
1130 self.logger.error(logging_text + failed_detail[-1])
1131 RO_fail = True
1132
1133 if not RO_fail and nsr_lcm and nsr_lcm.get("RO") and nsr_lcm["RO"].get("vnfd_id"):
1134 for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
1135 if not RO_vnfd_id:
1136 continue
1137 try:
1138 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
1139 "Deleting vnfd={} at RO".format(vnf_id)
1140 await RO.delete("vnfd", RO_vnfd_id)
1141 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
1142 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
1143 except ROclient.ROClientException as e:
1144 if e.http_code == 404: # not found
1145 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
1146 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
1147 elif e.http_code == 409: # conflict
1148 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
1149 self.logger.debug(logging_text + failed_detail[-1])
1150 else:
1151 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
1152 self.logger.error(logging_text + failed_detail[-1])
1153
1154 if vca_task_list:
1155 db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
1156 "Waiting for deletion of configuration charms"
1157 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1158 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1159 await asyncio.wait(vca_task_list, timeout=300)
1160 for application_name, task in vca_task_dict.items():
1161 if task.cancelled():
1162 failed_detail.append("VCA[{}] Deletion has been cancelled".format(application_name))
1163 elif task.done():
1164 exc = task.exception()
1165 if exc:
1166 failed_detail.append("VCA[{}] Deletion exception: {}".format(application_name, exc))
1167 else:
1168 db_nsr_update["_admin.deployed.VCA.{}".format(application_name)] = None
1169 else: # timeout
1170 # TODO Should it be cancelled?!!
1171 task.cancel()
1172 failed_detail.append("VCA[{}] Deletion timeout".format(application_name))
1173
1174 if failed_detail:
1175 self.logger.error(logging_text + " ;".join(failed_detail))
1176 db_nsr_update["operational-status"] = "failed"
1177 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
1178 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
1179 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1180 db_nslcmop_update["statusEnteredTime"] = time()
1181 elif db_nslcmop["operationParams"].get("autoremove"):
1182 self.db.del_one("nsrs", {"_id": nsr_id})
1183 db_nsr_update.clear()
1184 self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
1185 nslcmop_operation_state = "COMPLETED"
1186 db_nslcmop_update.clear()
1187 self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
1188 self.db.set_list("pdus", {"_admin.usage.nsr_id": nsr_id},
1189 {"_admin.usageSate": "NOT_IN_USE", "_admin.usage": None})
1190 self.logger.debug(logging_text + "Delete from database")
1191 else:
1192 db_nsr_update["operational-status"] = "terminated"
1193 db_nsr_update["detailed-status"] = "Done"
1194 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
1195 db_nslcmop_update["detailed-status"] = "Done"
1196 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1197 db_nslcmop_update["statusEnteredTime"] = time()
1198
1199 except (ROclient.ROClientException, DbException) as e:
1200 self.logger.error(logging_text + "Exit Exception {}".format(e))
1201 exc = e
1202 except asyncio.CancelledError:
1203 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1204 exc = "Operation was cancelled"
1205 except Exception as e:
1206 exc = traceback.format_exc()
1207 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1208 finally:
1209 if exc and db_nslcmop:
1210 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1211 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1212 db_nslcmop_update["statusEnteredTime"] = time()
1213 if db_nslcmop_update:
1214 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1215 if db_nsr:
1216 db_nsr_update["_admin.nslcmop"] = None
1217 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1218 if nslcmop_operation_state:
1219 try:
1220 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1221 "operationState": nslcmop_operation_state})
1222 except Exception as e:
1223 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1224 self.logger.debug(logging_text + "Exit")
1225 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
1226
1227 async def _ns_execute_primitive(self, db_deployed, nsr_name, member_vnf_index, vdu_id, primitive, primitive_params):
1228
1229 vdu_id_text = "vnfd"
1230 if vdu_id:
1231 vdu_id_text = vdu_id
1232 application_name = self.n2vc.FormatApplicationName(
1233 nsr_name,
1234 member_vnf_index,
1235 vdu_id_text
1236 )
1237 vca_deployed = db_deployed["VCA"].get(application_name)
1238 if not vca_deployed:
1239 raise LcmException("charm for member_vnf_index={} vdu_id={} is not deployed".format(member_vnf_index,
1240 vdu_id))
1241 model_name = vca_deployed.get("model")
1242 application_name = vca_deployed.get("application")
1243 if not model_name or not application_name:
1244 raise LcmException("charm for member_vnf_index={} is not properly deployed".format(member_vnf_index))
1245 if vca_deployed["operational-status"] != "active":
1246 raise LcmException("charm for member_vnf_index={} operational_status={} not 'active'".format(
1247 member_vnf_index, vca_deployed["operational-status"]))
1248 callback = None # self.n2vc_callback
1249 callback_args = () # [db_nsr, db_nslcmop, member_vnf_index, None]
1250 await self.n2vc.login()
1251 task = asyncio.ensure_future(
1252 self.n2vc.ExecutePrimitive(
1253 model_name,
1254 application_name,
1255 primitive, callback,
1256 *callback_args,
1257 **primitive_params
1258 )
1259 )
1260 # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
1261 # db_nsr, db_nslcmop, member_vnf_index))
1262 # self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "action:" + primitive, task)
1263 # wait until completed with timeout
1264 await asyncio.wait((task,), timeout=600)
1265
1266 result = "FAILED" # by default
1267 result_detail = ""
1268 if task.cancelled():
1269 result_detail = "Task has been cancelled"
1270 elif task.done():
1271 exc = task.exception()
1272 if exc:
1273 result_detail = str(exc)
1274 else:
1275 # TODO revise with Adam if action is finished and ok when task is done or callback is needed
1276 result = "COMPLETED"
1277 result_detail = "Done"
1278 else: # timeout
1279 # TODO Should it be cancelled?!!
1280 task.cancel()
1281 result_detail = "timeout"
1282 return result, result_detail
1283
1284 async def action(self, nsr_id, nslcmop_id):
1285 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
1286 self.logger.debug(logging_text + "Enter")
1287 # get all needed from database
1288 db_nsr = None
1289 db_nslcmop = None
1290 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1291 db_nslcmop_update = {}
1292 nslcmop_operation_state = None
1293 exc = None
1294 try:
1295 step = "Getting information from database"
1296 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1297 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1298 nsr_lcm = db_nsr["_admin"].get("deployed")
1299 nsr_name = db_nsr["name"]
1300 vnf_index = db_nslcmop["operationParams"]["member_vnf_index"]
1301 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
1302
1303 # look if previous tasks in process
1304 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
1305 if task_dependency:
1306 step = db_nslcmop_update["detailed-status"] = \
1307 "Waiting for related tasks to be completed: {}".format(task_name)
1308 self.logger.debug(logging_text + step)
1309 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1310 _, pending = await asyncio.wait(task_dependency, timeout=3600)
1311 if pending:
1312 raise LcmException("Timeout waiting related tasks to be completed")
1313
1314 # TODO check if ns is in a proper status
1315 primitive = db_nslcmop["operationParams"]["primitive"]
1316 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
1317 result, result_detail = await self._ns_execute_primitive(nsr_lcm, nsr_name, vnf_index, vdu_id, primitive,
1318 primitive_params)
1319 db_nslcmop_update["detailed-status"] = result_detail
1320 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
1321 db_nslcmop_update["statusEnteredTime"] = time()
1322 self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
1323 return # database update is called inside finally
1324
1325 except (DbException, LcmException) as e:
1326 self.logger.error(logging_text + "Exit Exception {}".format(e))
1327 exc = e
1328 except asyncio.CancelledError:
1329 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1330 exc = "Operation was cancelled"
1331 except Exception as e:
1332 exc = traceback.format_exc()
1333 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1334 finally:
1335 if exc and db_nslcmop:
1336 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1337 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1338 db_nslcmop_update["statusEnteredTime"] = time()
1339 if db_nslcmop_update:
1340 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1341 if db_nsr:
1342 db_nsr_update["_admin.nslcmop"] = None
1343 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1344 self.logger.debug(logging_text + "Exit")
1345 if nslcmop_operation_state:
1346 try:
1347 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1348 "operationState": nslcmop_operation_state})
1349 except Exception as e:
1350 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1351 self.logger.debug(logging_text + "Exit")
1352 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
1353
1354 async def scale(self, nsr_id, nslcmop_id):
1355 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
1356 self.logger.debug(logging_text + "Enter")
1357 # get all needed from database
1358 db_nsr = None
1359 db_nslcmop = None
1360 db_nslcmop_update = {}
1361 nslcmop_operation_state = None
1362 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1363 exc = None
1364 # in case of error, indicates what part of scale was failed to put nsr at error status
1365 scale_process = None
1366 old_operational_status = ""
1367 old_config_status = ""
1368 vnfr_scaled = False
1369 try:
1370 step = "Getting nslcmop from database"
1371 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1372 step = "Getting nsr from database"
1373 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1374 old_operational_status = db_nsr["operational-status"]
1375 old_config_status = db_nsr["config-status"]
1376
1377 # look if previous tasks in process
1378 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
1379 if task_dependency:
1380 step = db_nslcmop_update["detailed-status"] = \
1381 "Waiting for related tasks to be completed: {}".format(task_name)
1382 self.logger.debug(logging_text + step)
1383 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1384 _, pending = await asyncio.wait(task_dependency, timeout=3600)
1385 if pending:
1386 raise LcmException("Timeout waiting related tasks to be completed")
1387
1388 step = "Parsing scaling parameters"
1389 db_nsr_update["operational-status"] = "scaling"
1390 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1391 nsr_lcm = db_nsr["_admin"].get("deployed")
1392 RO_nsr_id = nsr_lcm["RO"]["nsr_id"]
1393 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
1394 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
1395 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
1396 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
1397
1398 step = "Getting vnfr from database"
1399 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
1400 step = "Getting vnfd from database"
1401 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
1402 step = "Getting scaling-group-descriptor"
1403 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
1404 if scaling_descriptor["name"] == scaling_group:
1405 break
1406 else:
1407 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
1408 "at vnfd:scaling-group-descriptor".format(scaling_group))
1409 # cooldown_time = 0
1410 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
1411 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
1412 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
1413 # break
1414
1415 # TODO check if ns is in a proper status
1416 step = "Sending scale order to RO"
1417 nb_scale_op = 0
1418 if not db_nsr["_admin"].get("scaling-group"):
1419 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
1420 admin_scale_index = 0
1421 else:
1422 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
1423 if admin_scale_info["name"] == scaling_group:
1424 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
1425 break
1426 else: # not found, set index one plus last element and add new entry with the name
1427 admin_scale_index += 1
1428 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
1429 RO_scaling_info = []
1430 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
1431 if scaling_type == "SCALE_OUT":
1432 # count if max-instance-count is reached
1433 if "max-instance-count" in scaling_descriptor and scaling_descriptor["max-instance-count"] is not None:
1434 max_instance_count = int(scaling_descriptor["max-instance-count"])
1435 if nb_scale_op >= max_instance_count:
1436 raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
1437 " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1438 nb_scale_op = nb_scale_op + 1
1439 vdu_scaling_info["scaling_direction"] = "OUT"
1440 vdu_scaling_info["vdu-create"] = {}
1441 for vdu_scale_info in scaling_descriptor["vdu"]:
1442 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1443 "type": "create", "count": vdu_scale_info.get("count", 1)})
1444 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1445 elif scaling_type == "SCALE_IN":
1446 # count if min-instance-count is reached
1447 min_instance_count = 0
1448 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
1449 min_instance_count = int(scaling_descriptor["min-instance-count"])
1450 if nb_scale_op <= min_instance_count:
1451 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
1452 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1453 nb_scale_op = nb_scale_op - 1
1454 vdu_scaling_info["scaling_direction"] = "IN"
1455 vdu_scaling_info["vdu-delete"] = {}
1456 for vdu_scale_info in scaling_descriptor["vdu"]:
1457 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1458 "type": "delete", "count": vdu_scale_info.get("count", 1)})
1459 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1460
1461 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
1462 vdu_create = vdu_scaling_info.get("vdu-create")
1463 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
1464 if vdu_scaling_info["scaling_direction"] == "IN":
1465 for vdur in reversed(db_vnfr["vdur"]):
1466 if vdu_delete.get(vdur["vdu-id-ref"]):
1467 vdu_delete[vdur["vdu-id-ref"]] -= 1
1468 vdu_scaling_info["vdu"].append({
1469 "name": vdur["name"],
1470 "vdu_id": vdur["vdu-id-ref"],
1471 "interface": []
1472 })
1473 for interface in vdur["interfaces"]:
1474 vdu_scaling_info["vdu"][-1]["interface"].append({
1475 "name": interface["name"],
1476 "ip_address": interface["ip-address"],
1477 "mac_address": interface.get("mac-address"),
1478 })
1479 vdu_delete = vdu_scaling_info.pop("vdu-delete")
1480
1481 # execute primitive service PRE-SCALING
1482 step = "Executing pre-scale vnf-config-primitive"
1483 if scaling_descriptor.get("scaling-config-action"):
1484 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1485 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "pre-scale-in" \
1486 and scaling_type == "SCALE_IN":
1487 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1488 step = db_nslcmop_update["detailed-status"] = \
1489 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
1490 # look for primitive
1491 primitive_params = {}
1492 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1493 if config_primitive["name"] == vnf_config_primitive:
1494 for parameter in config_primitive.get("parameter", ()):
1495 if 'default-value' in parameter and \
1496 parameter['default-value'] == "<VDU_SCALE_INFO>":
1497 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1498 default_flow_style=True,
1499 width=256)
1500 break
1501 else:
1502 raise LcmException(
1503 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
1504 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-cnfiguration:config-"
1505 "primitive".format(scaling_group, config_primitive))
1506 scale_process = "VCA"
1507 db_nsr_update["config-status"] = "configuring pre-scaling"
1508 result, result_detail = await self._ns_execute_primitive(nsr_lcm, vnf_index,
1509 vnf_config_primitive, primitive_params)
1510 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1511 vnf_config_primitive, result, result_detail))
1512 if result == "FAILED":
1513 raise LcmException(result_detail)
1514 db_nsr_update["config-status"] = old_config_status
1515 scale_process = None
1516
1517 if RO_scaling_info:
1518 scale_process = "RO"
1519 RO = ROclient.ROClient(self.loop, **self.ro_config)
1520 RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
1521 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
1522 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
1523 # wait until ready
1524 RO_nslcmop_id = RO_desc["instance_action_id"]
1525 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
1526
1527 RO_task_done = False
1528 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
1529 detailed_status_old = None
1530 self.logger.debug(logging_text + step)
1531
1532 deployment_timeout = 1 * 3600 # One hour
1533 while deployment_timeout > 0:
1534 if not RO_task_done:
1535 desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
1536 extra_item_id=RO_nslcmop_id)
1537 ns_status, ns_status_info = RO.check_action_status(desc)
1538 if ns_status == "ERROR":
1539 raise ROclient.ROClientException(ns_status_info)
1540 elif ns_status == "BUILD":
1541 detailed_status = step + "; {}".format(ns_status_info)
1542 elif ns_status == "ACTIVE":
1543 RO_task_done = True
1544 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
1545 self.logger.debug(logging_text + step)
1546 else:
1547 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
1548 else:
1549 desc = await RO.show("ns", RO_nsr_id)
1550 ns_status, ns_status_info = RO.check_ns_status(desc)
1551 if ns_status == "ERROR":
1552 raise ROclient.ROClientException(ns_status_info)
1553 elif ns_status == "BUILD":
1554 detailed_status = step + "; {}".format(ns_status_info)
1555 elif ns_status == "ACTIVE":
1556 step = detailed_status = \
1557 "Waiting for management IP address reported by the VIM. Updating VNFRs"
1558 if not vnfr_scaled:
1559 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
1560 vnfr_scaled = True
1561 try:
1562 desc = await RO.show("ns", RO_nsr_id)
1563 # nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc)
1564 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
1565 break
1566 except LcmExceptionNoMgmtIP:
1567 pass
1568 else:
1569 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1570 if detailed_status != detailed_status_old:
1571 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
1572 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1573
1574 await asyncio.sleep(5, loop=self.loop)
1575 deployment_timeout -= 5
1576 if deployment_timeout <= 0:
1577 raise ROclient.ROClientException("Timeout waiting ns to be ready")
1578
1579 # update VDU_SCALING_INFO with the obtained ip_addresses
1580 if vdu_scaling_info["scaling_direction"] == "OUT":
1581 for vdur in reversed(db_vnfr["vdur"]):
1582 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
1583 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
1584 vdu_scaling_info["vdu"].append({
1585 "name": vdur["name"],
1586 "vdu_id": vdur["vdu-id-ref"],
1587 "interface": []
1588 })
1589 for interface in vdur["interfaces"]:
1590 vdu_scaling_info["vdu"][-1]["interface"].append({
1591 "name": interface["name"],
1592 "ip_address": interface["ip-address"],
1593 "mac_address": interface.get("mac-address"),
1594 })
1595 del vdu_scaling_info["vdu-create"]
1596
1597 scale_process = None
1598 if db_nsr_update:
1599 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1600
1601 # execute primitive service POST-SCALING
1602 step = "Executing post-scale vnf-config-primitive"
1603 if scaling_descriptor.get("scaling-config-action"):
1604 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1605 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "post-scale-out" \
1606 and scaling_type == "SCALE_OUT":
1607 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1608 step = db_nslcmop_update["detailed-status"] = \
1609 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
1610 # look for primitive
1611 primitive_params = {}
1612 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1613 if config_primitive["name"] == vnf_config_primitive:
1614 for parameter in config_primitive.get("parameter", ()):
1615 if 'default-value' in parameter and \
1616 parameter['default-value'] == "<VDU_SCALE_INFO>":
1617 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1618 default_flow_style=True,
1619 width=256)
1620 break
1621 else:
1622 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
1623 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
1624 "match any vnf-configuration:config-primitive".format(scaling_group,
1625 config_primitive))
1626 scale_process = "VCA"
1627 db_nsr_update["config-status"] = "configuring post-scaling"
1628
1629 result, result_detail = await self._ns_execute_primitive(nsr_lcm, vnf_index,
1630 vnf_config_primitive, primitive_params)
1631 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1632 vnf_config_primitive, result, result_detail))
1633 if result == "FAILED":
1634 raise LcmException(result_detail)
1635 db_nsr_update["config-status"] = old_config_status
1636 scale_process = None
1637
1638 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1639 db_nslcmop_update["statusEnteredTime"] = time()
1640 db_nslcmop_update["detailed-status"] = "done"
1641 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
1642 db_nsr_update["operational-status"] = old_operational_status
1643 db_nsr_update["config-status"] = old_config_status
1644 return
1645 except (ROclient.ROClientException, DbException, LcmException) as e:
1646 self.logger.error(logging_text + "Exit Exception {}".format(e))
1647 exc = e
1648 except asyncio.CancelledError:
1649 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1650 exc = "Operation was cancelled"
1651 except Exception as e:
1652 exc = traceback.format_exc()
1653 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1654 finally:
1655 if exc:
1656 if db_nslcmop:
1657 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1658 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1659 db_nslcmop_update["statusEnteredTime"] = time()
1660 if db_nsr:
1661 db_nsr_update["operational-status"] = old_operational_status
1662 db_nsr_update["config-status"] = old_config_status
1663 db_nsr_update["detailed-status"] = ""
1664 db_nsr_update["_admin.nslcmop"] = None
1665 if scale_process:
1666 if "VCA" in scale_process:
1667 db_nsr_update["config-status"] = "failed"
1668 if "RO" in scale_process:
1669 db_nsr_update["operational-status"] = "failed"
1670 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
1671 exc)
1672 if db_nslcmop_update:
1673 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1674 if db_nsr:
1675 db_nsr_update["_admin.nslcmop"] = None
1676 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1677 if nslcmop_operation_state:
1678 try:
1679 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1680 "operationState": nslcmop_operation_state})
1681 # if cooldown_time:
1682 # await asyncio.sleep(cooldown_time)
1683 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
1684 except Exception as e:
1685 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1686 self.logger.debug(logging_text + "Exit")
1687 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")