VNFs with only VLDs compatibility
[osm/LCM.git] / osm_lcm / ns.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import logging
7 import logging.handlers
8 import functools
9 import traceback
10
11 import ROclient
12 from lcm_utils import LcmException, LcmBase
13
14 from osm_common.dbbase import DbException
15 from osm_common.fsbase import FsException
16 from n2vc.vnf import N2VC
17
18 from copy import copy, deepcopy
19 from http import HTTPStatus
20 from time import time
21 from uuid import uuid4
22
23 __author__ = "Alfonso Tierno"
24
25
26 def get_iterable(in_dict, in_key):
27 """
28 Similar to <dict>.get(), but if value is None, False, ..., An empty tuple is returned instead
29 :param in_dict: a dictionary
30 :param in_key: the key to look for at in_dict
31 :return: in_dict[in_var] or () if it is None or not present
32 """
33 if not in_dict.get(in_key):
34 return ()
35 return in_dict[in_key]
36
37
38 def populate_dict(target_dict, key_list, value):
39 """
40 Upate target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value.
41 Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
42 :param target_dict: dictionary to be changed
43 :param key_list: list of keys to insert at target_dict
44 :param value:
45 :return: None
46 """
47 for key in key_list[0:-1]:
48 if key not in target_dict:
49 target_dict[key] = {}
50 target_dict = target_dict[key]
51 target_dict[key_list[-1]] = value
52
53
54 class NsLcm(LcmBase):
55 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
56 total_deploy_timeout = 2 * 3600 # global timeout for deployment
57
58 def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
59 """
60 Init, Connect to database, filesystem storage, and messaging
61 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
62 :return: None
63 """
64 # logging
65 self.logger = logging.getLogger('lcm.ns')
66 self.loop = loop
67 self.lcm_tasks = lcm_tasks
68
69 super().__init__(db, msg, fs, self.logger)
70
71 self.ro_config = ro_config
72
73 self.n2vc = N2VC(
74 log=self.logger,
75 server=vca_config['host'],
76 port=vca_config['port'],
77 user=vca_config['user'],
78 secret=vca_config['secret'],
79 # TODO: This should point to the base folder where charms are stored,
80 # if there is a common one (like object storage). Otherwise, leave
81 # it unset and pass it via DeployCharms
82 # artifacts=vca_config[''],
83 artifacts=None,
84 )
85
86 def _look_for_pdu(self, vdur, vnfr, vim_account): # TODO feature 1417: project_id
87 """
88 Look for a free PDU in the catalog matching vdur type and interfaces. Fills vdur with ip_address information
89 :param vdur: vnfr:vdur descriptor. It is modified with pdu interface info if pdu is found
90 :param member_vnf_index: used just for logging. Target vnfd of nsd
91 :param vim_account:
92 :return: vder_update: dictionary to update vnfr:vdur with pdu info. In addition it modified choosen pdu to set
93 at status IN_USE
94 """
95 pdu_type = vdur.get("pdu-type")
96 assert pdu_type
97 pdu_filter = {
98 "vim.vim_accounts": vim_account,
99 "type": pdu_type,
100 "_admin.operationalState": "ENABLED",
101 "_admin.usageSate": "NOT_IN_USE",
102 # TODO feature 1417: "shared": True,
103 # TODO feature 1417: "_admin.projects_read.cont": ["ANY", project_id],
104 }
105 available_pdus = self.db.get_list("pdus", pdu_filter)
106 for pdu in available_pdus:
107 # step 1 check if this pdu contains needed interfaces:
108 match_interfaces = True
109 for vdur_interface in vdur["interfaces"]:
110 for pdu_interface in pdu["interfaces"]:
111 if pdu_interface["name"] == vdur_interface["name"]:
112 # TODO feature 1417: match per mgmt type
113 break
114 else: # no interface found
115 match_interfaces = False
116 break
117 if not match_interfaces:
118 continue
119
120 # step 2. Update pdu
121 self.update_db_2("pdus", pdu["_id"], {"_admin.usageSate": "IN_USE",
122 "_admin.usage.vnfr_id": vnfr["_id"],
123 "_admin.usage.nsr_id": vnfr["nsr-id-ref"],
124 "_admin.usage.vdur": vdur["vdu-id-ref"],
125 })
126 # step 3. Fill vnfr info by filling vdur
127 vdur["pdu-id"] = pdu["_id"]
128 for vdur_interface in vdur["interfaces"]:
129 for pdu_interface in pdu["interfaces"]:
130 if pdu_interface["name"] == vdur_interface["name"]:
131 vdur_interface.update(pdu_interface)
132 break
133
134 return
135 raise LcmException("No PDU of type={} found for member_vng_index={} at vim_account={} matching interface "
136 "names".format(vdur["vdu-id-ref"], vnfr["member-vnf-index-ref"], pdu_type))
137
138 def vnfd2RO(self, vnfd, new_id=None):
139 """
140 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
141 :param vnfd: input vnfd
142 :param new_id: overrides vnf id if provided
143 :return: copy of vnfd
144 """
145 ci_file = None
146 try:
147 vnfd_RO = deepcopy(vnfd)
148 vnfd_RO.pop("_id", None)
149 vnfd_RO.pop("_admin", None)
150 if new_id:
151 vnfd_RO["id"] = new_id
152 for vdu in vnfd_RO.get("vdu", ()):
153 if "cloud-init-file" in vdu:
154 base_folder = vnfd["_admin"]["storage"]
155 clout_init_file = "{}/{}/cloud_init/{}".format(
156 base_folder["folder"],
157 base_folder["pkg-dir"],
158 vdu["cloud-init-file"]
159 )
160 ci_file = self.fs.file_open(clout_init_file, "r")
161 # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails
162 # convert to base 64 or similar
163 clout_init_content = ci_file.read()
164 ci_file.close()
165 ci_file = None
166 vdu.pop("cloud-init-file", None)
167 vdu["cloud-init"] = clout_init_content
168 # remnove unused by RO configuration, monitoring, scaling
169 vnfd_RO.pop("vnf-configuration", None)
170 vnfd_RO.pop("monitoring-param", None)
171 vnfd_RO.pop("scaling-group-descriptor", None)
172 return vnfd_RO
173 except FsException as e:
174 raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
175 finally:
176 if ci_file:
177 ci_file.close()
178
179 def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None):
180 """
181 Callback both for charm status change and task completion
182 :param model_name: Charm model name
183 :param application_name: Charm application name
184 :param status: Can be
185 - blocked: The unit needs manual intervention
186 - maintenance: The unit is actively deploying/configuring
187 - waiting: The unit is waiting for another charm to be ready
188 - active: The unit is deployed, configured, and ready
189 - error: The charm has failed and needs attention.
190 - terminated: The charm has been destroyed
191 - removing,
192 - removed
193 :param message: detailed message error
194 :param n2vc_info: dictionary with information shared with instantiate task. It contains:
195 nsr_id:
196 nslcmop_id:
197 lcmOperationType: currently "instantiate"
198 deployed: dictionary with {<application>: {operational-status: <status>, detailed-status: <text>}}
199 db_update: dictionary to be filled with the changes to be wrote to database with format key.key.key: value
200 n2vc_event: event used to notify instantiation task that some change has been produced
201 :param task: None for charm status change, or task for completion task callback
202 :return:
203 """
204 try:
205 nsr_id = n2vc_info["nsr_id"]
206 deployed = n2vc_info["deployed"]
207 db_nsr_update = n2vc_info["db_update"]
208 nslcmop_id = n2vc_info["nslcmop_id"]
209 ns_operation = n2vc_info["lcmOperationType"]
210 n2vc_event = n2vc_info["n2vc_event"]
211 logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id,
212 application_name)
213 vca_deployed = deployed.get(application_name)
214 if not vca_deployed:
215 self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA")
216 return
217
218 if task:
219 if task.cancelled():
220 self.logger.debug(logging_text + " task Cancelled")
221 vca_deployed['operational-status'] = "error"
222 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(application_name)] = "error"
223 vca_deployed['detailed-status'] = "Task Cancelled"
224 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(application_name)] = "Task Cancelled"
225
226 elif task.done():
227 exc = task.exception()
228 if exc:
229 self.logger.error(logging_text + " task Exception={}".format(exc))
230 vca_deployed['operational-status'] = "error"
231 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(application_name)] = "error"
232 vca_deployed['detailed-status'] = str(exc)
233 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(application_name)] = str(exc)
234 else:
235 self.logger.debug(logging_text + " task Done")
236 # task is Done, but callback is still ongoing. So ignore
237 return
238 elif status:
239 self.logger.debug(logging_text + " Enter status={} message={}".format(status, message))
240 if vca_deployed['operational-status'] == status:
241 return # same status, ignore
242 vca_deployed['operational-status'] = status
243 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(application_name)] = status
244 vca_deployed['detailed-status'] = str(message)
245 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(application_name)] = str(message)
246 else:
247 self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
248 return
249 # wake up instantiate task
250 n2vc_event.set()
251 except Exception as e:
252 self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True)
253
254 def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
255 """
256 Creates a RO ns descriptor from OSM ns_instantiate params
257 :param ns_params: OSM instantiate params
258 :return: The RO ns descriptor
259 """
260 vim_2_RO = {}
261 # TODO feature 1417: Check that no instantiation is set over PDU
262 # check if PDU forces a concrete vim-network-id and add it
263 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
264
265 def vim_account_2_RO(vim_account):
266 if vim_account in vim_2_RO:
267 return vim_2_RO[vim_account]
268
269 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
270 if db_vim["_admin"]["operationalState"] != "ENABLED":
271 raise LcmException("VIM={} is not available. operationalState={}".format(
272 vim_account, db_vim["_admin"]["operationalState"]))
273 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
274 vim_2_RO[vim_account] = RO_vim_id
275 return RO_vim_id
276
277 def ip_profile_2_RO(ip_profile):
278 RO_ip_profile = deepcopy((ip_profile))
279 if "dns-server" in RO_ip_profile:
280 if isinstance(RO_ip_profile["dns-server"], list):
281 RO_ip_profile["dns-address"] = []
282 for ds in RO_ip_profile.pop("dns-server"):
283 RO_ip_profile["dns-address"].append(ds['address'])
284 else:
285 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
286 if RO_ip_profile.get("ip-version") == "ipv4":
287 RO_ip_profile["ip-version"] = "IPv4"
288 if RO_ip_profile.get("ip-version") == "ipv6":
289 RO_ip_profile["ip-version"] = "IPv6"
290 if "dhcp-params" in RO_ip_profile:
291 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
292 return RO_ip_profile
293
294 if not ns_params:
295 return None
296 RO_ns_params = {
297 # "name": ns_params["nsName"],
298 # "description": ns_params.get("nsDescription"),
299 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
300 # "scenario": ns_params["nsdId"],
301 }
302 if n2vc_key_list:
303 for vnfd_ref, vnfd in vnfd_dict.items():
304 vdu_needed_access = []
305 mgmt_cp = None
306 if vnfd.get("vnf-configuration"):
307 if vnfd.get("mgmt-interface"):
308 if vnfd["mgmt-interface"].get("vdu-id"):
309 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
310 elif vnfd["mgmt-interface"].get("cp"):
311 mgmt_cp = vnfd["mgmt-interface"]["cp"]
312
313 for vdu in vnfd.get("vdu", ()):
314 if vdu.get("vdu-configuration"):
315 vdu_needed_access.append(vdu["id"])
316 elif mgmt_cp:
317 for vdu_interface in vdu.get("interface"):
318 if vdu_interface.get("external-connection-point-ref") and \
319 vdu_interface["external-connection-point-ref"] == mgmt_cp:
320 vdu_needed_access.append(vdu["id"])
321 mgmt_cp = None
322 break
323
324 if vdu_needed_access:
325 for vnf_member in nsd.get("constituent-vnfd"):
326 if vnf_member["vnfd-id-ref"] != vnfd_ref:
327 continue
328 for vdu in vdu_needed_access:
329 populate_dict(RO_ns_params,
330 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
331 n2vc_key_list)
332
333 if ns_params.get("vduImage"):
334 RO_ns_params["vduImage"] = ns_params["vduImage"]
335
336 if ns_params.get("ssh_keys"):
337 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
338 for vnf_params in get_iterable(ns_params, "vnf"):
339 for constituent_vnfd in nsd["constituent-vnfd"]:
340 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
341 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
342 break
343 else:
344 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
345 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
346 if vnf_params.get("vimAccountId"):
347 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
348 vim_account_2_RO(vnf_params["vimAccountId"]))
349
350 for vdu_params in get_iterable(vnf_params, "vdu"):
351 # TODO feature 1417: check that this VDU exist and it is not a PDU
352 if vdu_params.get("volume"):
353 for volume_params in vdu_params["volume"]:
354 if volume_params.get("vim-volume-id"):
355 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
356 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
357 volume_params["vim-volume-id"])
358 if vdu_params.get("interface"):
359 for interface_params in vdu_params["interface"]:
360 if interface_params.get("ip-address"):
361 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
362 vdu_params["id"], "interfaces", interface_params["name"],
363 "ip_address"),
364 interface_params["ip-address"])
365 if interface_params.get("mac-address"):
366 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
367 vdu_params["id"], "interfaces", interface_params["name"],
368 "mac_address"),
369 interface_params["mac-address"])
370 if interface_params.get("floating-ip-required"):
371 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
372 vdu_params["id"], "interfaces", interface_params["name"],
373 "floating-ip"),
374 interface_params["floating-ip-required"])
375
376 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
377 if internal_vld_params.get("vim-network-name"):
378 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
379 internal_vld_params["name"], "vim-network-name"),
380 internal_vld_params["vim-network-name"])
381 if internal_vld_params.get("ip-profile"):
382 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
383 internal_vld_params["name"], "ip-profile"),
384 ip_profile_2_RO(internal_vld_params["ip-profile"]))
385
386 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
387 # look for interface
388 iface_found = False
389 for vdu_descriptor in vnf_descriptor["vdu"]:
390 for vdu_interface in vdu_descriptor["interface"]:
391 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
392 if icp_params.get("ip-address"):
393 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
394 vdu_descriptor["id"], "interfaces",
395 vdu_interface["name"], "ip_address"),
396 icp_params["ip-address"])
397
398 if icp_params.get("mac-address"):
399 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
400 vdu_descriptor["id"], "interfaces",
401 vdu_interface["name"], "mac_address"),
402 icp_params["mac-address"])
403 iface_found = True
404 break
405 if iface_found:
406 break
407 else:
408 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
409 "internal-vld:id-ref={} is not present at vnfd:internal-"
410 "connection-point".format(vnf_params["member-vnf-index"],
411 icp_params["id-ref"]))
412
413 for vld_params in get_iterable(ns_params, "vld"):
414 if "ip-profile" in vld_params:
415 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
416 ip_profile_2_RO(vld_params["ip-profile"]))
417 if vld_params.get("vim-network-name"):
418 RO_vld_sites = []
419 if isinstance(vld_params["vim-network-name"], dict):
420 for vim_account, vim_net in vld_params["vim-network-name"].items():
421 RO_vld_sites.append({
422 "netmap-use": vim_net,
423 "datacenter": vim_account_2_RO(vim_account)
424 })
425 else: # isinstance str
426 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
427 if RO_vld_sites:
428 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
429 if "vnfd-connection-point-ref" in vld_params:
430 for cp_params in vld_params["vnfd-connection-point-ref"]:
431 # look for interface
432 for constituent_vnfd in nsd["constituent-vnfd"]:
433 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
434 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
435 break
436 else:
437 raise LcmException(
438 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
439 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
440 match_cp = False
441 for vdu_descriptor in vnf_descriptor["vdu"]:
442 for interface_descriptor in vdu_descriptor["interface"]:
443 if interface_descriptor.get("external-connection-point-ref") == \
444 cp_params["vnfd-connection-point-ref"]:
445 match_cp = True
446 break
447 if match_cp:
448 break
449 else:
450 raise LcmException(
451 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
452 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
453 cp_params["member-vnf-index-ref"],
454 cp_params["vnfd-connection-point-ref"],
455 vnf_descriptor["id"]))
456 if cp_params.get("ip-address"):
457 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
458 vdu_descriptor["id"], "interfaces",
459 interface_descriptor["name"], "ip_address"),
460 cp_params["ip-address"])
461 if cp_params.get("mac-address"):
462 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
463 vdu_descriptor["id"], "interfaces",
464 interface_descriptor["name"], "mac_address"),
465 cp_params["mac-address"])
466 return RO_ns_params
467
468 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
469 # make a copy to do not change
470 vdu_create = copy(vdu_create)
471 vdu_delete = copy(vdu_delete)
472
473 vdurs = db_vnfr.get("vdur")
474 if vdurs is None:
475 vdurs = []
476 vdu_index = len(vdurs)
477 while vdu_index:
478 vdu_index -= 1
479 vdur = vdurs[vdu_index]
480 if vdur.get("pdu-type"):
481 continue
482 vdu_id_ref = vdur["vdu-id-ref"]
483 if vdu_create and vdu_create.get(vdu_id_ref):
484 for index in range(0, vdu_create[vdu_id_ref]):
485 vdur = deepcopy(vdur)
486 vdur["_id"] = str(uuid4())
487 vdur["count-index"] += 1
488 vdurs.insert(vdu_index+1+index, vdur)
489 del vdu_create[vdu_id_ref]
490 if vdu_delete and vdu_delete.get(vdu_id_ref):
491 del vdurs[vdu_index]
492 vdu_delete[vdu_id_ref] -= 1
493 if not vdu_delete[vdu_id_ref]:
494 del vdu_delete[vdu_id_ref]
495 # check all operations are done
496 if vdu_create or vdu_delete:
497 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
498 vdu_create))
499 if vdu_delete:
500 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
501 vdu_delete))
502
503 vnfr_update = {"vdur": vdurs}
504 db_vnfr["vdur"] = vdurs
505 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
506
507 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
508 """
509 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
510 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
511 :param nsr_desc_RO: nsr descriptor from RO
512 :return: Nothing, LcmException is raised on errors
513 """
514 for vnf_index, db_vnfr in db_vnfrs.items():
515 for vnf_RO in nsr_desc_RO["vnfs"]:
516 if vnf_RO["member_vnf_index"] != vnf_index:
517 continue
518 vnfr_update = {}
519 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO.get("ip_address")
520
521 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
522 vdur_RO_count_index = 0
523 if vdur.get("pdu-type"):
524 continue
525 for vdur_RO in get_iterable(vnf_RO, "vms"):
526 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
527 continue
528 if vdur["count-index"] != vdur_RO_count_index:
529 vdur_RO_count_index += 1
530 continue
531 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
532 vdur["ip-address"] = vdur_RO.get("ip_address")
533 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
534 vdur["name"] = vdur_RO.get("vim_name")
535 vdur["status"] = vdur_RO.get("status")
536 vdur["status-detailed"] = vdur_RO.get("error_msg")
537 for ifacer in get_iterable(vdur, "interfaces"):
538 for interface_RO in get_iterable(vdur_RO, "interfaces"):
539 if ifacer["name"] == interface_RO.get("internal_name"):
540 ifacer["ip-address"] = interface_RO.get("ip_address")
541 ifacer["mac-address"] = interface_RO.get("mac_address")
542 break
543 else:
544 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
545 "at RO info".format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
546 vnfr_update["vdur.{}".format(vdu_index)] = vdur
547 break
548 else:
549 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} at "
550 "RO info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
551 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
552 break
553
554 else:
555 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(vnf_index))
556
557 async def instantiate(self, nsr_id, nslcmop_id):
558 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
559 self.logger.debug(logging_text + "Enter")
560 # get all needed from database
561 start_deploy = time()
562 db_nsr = None
563 db_nslcmop = None
564 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
565 db_nslcmop_update = {}
566 nslcmop_operation_state = None
567 db_vnfrs = {}
568 RO_descriptor_number = 0 # number of descriptors created at RO
569 descriptor_id_2_RO = {} # map between vnfd/nsd id to the id used at RO
570 n2vc_info = {}
571 exc = None
572 try:
573 step = "Getting nslcmop={} from db".format(nslcmop_id)
574 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
575 step = "Getting nsr={} from db".format(nsr_id)
576 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
577 ns_params = db_nsr.get("instantiate_params")
578 nsd = db_nsr["nsd"]
579 nsr_name = db_nsr["name"] # TODO short-name??
580
581 # look if previous tasks in process
582 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
583 if task_dependency:
584 step = db_nslcmop_update["detailed-status"] = \
585 "Waiting for related tasks to be completed: {}".format(task_name)
586 self.logger.debug(logging_text + step)
587 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
588 _, pending = await asyncio.wait(task_dependency, timeout=3600)
589 if pending:
590 raise LcmException("Timeout waiting related tasks to be completed")
591
592 step = "Getting vnfrs from db"
593 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
594 db_vnfds_ref = {}
595 db_vnfds = {}
596 for vnfr in db_vnfrs_list:
597 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
598 vnfd_id = vnfr["vnfd-id"]
599 vnfd_ref = vnfr["vnfd-ref"]
600 if vnfd_id not in db_vnfds:
601 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
602 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
603 db_vnfds_ref[vnfd_ref] = vnfd
604 db_vnfds[vnfd_id] = vnfd
605
606 # update VNFR vimAccount and pdu information
607
608 vim_account = vnfr.get("vim-account-id")
609 vnfr_update = {}
610 if not vim_account:
611 step = "Updating VNFR vimAccount"
612 vim_account = db_nsr["instantiate_params"]["vimAccountId"]
613 # check instantiate parameters
614 if db_nsr["instantiate_params"].get("vnf"):
615 for vnf_params in db_nsr["instantiate_params"]["vnf"]:
616 if vnf_params.get("member-vnf-index") == vnfr["member-vnf-index-ref"]:
617 if vnf_params.get("vimAccountId"):
618 vim_account = vnf_params.get("vimAccountId")
619 break
620 vnfr_update["vim-account-id"] = vim_account
621 vnfr["vim-account-id"] = vim_account
622
623 # check PDUs and get PDU
624 for vdur_index, vdur in enumerate(get_iterable(vnfr, "vdur")):
625 pdu_type = vdur.get("pdu-type")
626 if not pdu_type:
627 continue
628 # look for free pdu
629 self._look_for_pdu(vdur, vnfr, vim_account)
630 # fill vnfr with pdu info
631 vnfr_update["vdur.{}".format(vdur_index)] = vdur
632 if vnfr_update:
633 self.update_db_2("vnfrs", vnfr["_id"], vnfr_update)
634
635 nsr_lcm = db_nsr["_admin"].get("deployed")
636 if not nsr_lcm:
637 nsr_lcm = db_nsr["_admin"]["deployed"] = {
638 "id": nsr_id,
639 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
640 "nsr_ip": {},
641 "VCA": {},
642 }
643 db_nsr_update["detailed-status"] = "creating"
644 db_nsr_update["operational-status"] = "init"
645
646 RO = ROclient.ROClient(self.loop, **self.ro_config)
647
648 # get vnfds, instantiate at RO
649 for vnfd_id, vnfd in db_vnfds.items():
650 vnfd_ref = vnfd["id"]
651 step = db_nsr_update["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_ref)
652 # self.logger.debug(logging_text + step)
653 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, vnfd_ref[:23])
654 descriptor_id_2_RO[vnfd_ref] = vnfd_id_RO
655 RO_descriptor_number += 1
656
657 # look if present
658 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
659 if vnfd_list:
660 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = vnfd_list[0]["uuid"]
661 self.logger.debug(logging_text + "vnfd={} exists at RO. Using RO_id={}".format(
662 vnfd_ref, vnfd_list[0]["uuid"]))
663 else:
664 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
665 desc = await RO.create("vnfd", descriptor=vnfd_RO)
666 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = desc["uuid"]
667 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
668 self.logger.debug(logging_text + "vnfd={} created at RO. RO_id={}".format(
669 vnfd_ref, desc["uuid"]))
670 self.update_db_2("nsrs", nsr_id, db_nsr_update)
671
672 # create nsd at RO
673 nsd_ref = nsd["id"]
674 step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
675 # self.logger.debug(logging_text + step)
676
677 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
678 descriptor_id_2_RO[nsd_ref] = RO_osm_nsd_id
679 RO_descriptor_number += 1
680 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
681 if nsd_list:
682 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
683 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
684 nsd_ref, RO_nsd_uuid))
685 else:
686 nsd_RO = deepcopy(nsd)
687 nsd_RO["id"] = RO_osm_nsd_id
688 nsd_RO.pop("_id", None)
689 nsd_RO.pop("_admin", None)
690 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
691 vnfd_id = c_vnf["vnfd-id-ref"]
692 c_vnf["vnfd-id-ref"] = descriptor_id_2_RO[vnfd_id]
693 desc = await RO.create("nsd", descriptor=nsd_RO)
694 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
695 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
696 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
697 self.update_db_2("nsrs", nsr_id, db_nsr_update)
698
699 # Crate ns at RO
700 # if present use it unless in error status
701 RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
702 if RO_nsr_id:
703 try:
704 step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
705 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
706 desc = await RO.show("ns", RO_nsr_id)
707 except ROclient.ROClientException as e:
708 if e.http_code != HTTPStatus.NOT_FOUND:
709 raise
710 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
711 if RO_nsr_id:
712 ns_status, ns_status_info = RO.check_ns_status(desc)
713 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
714 if ns_status == "ERROR":
715 step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
716 self.logger.debug(logging_text + step)
717 await RO.delete("ns", RO_nsr_id)
718 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
719 if not RO_nsr_id:
720 step = db_nsr_update["detailed-status"] = "Checking dependencies"
721 # self.logger.debug(logging_text + step)
722
723 # check if VIM is creating and wait look if previous tasks in process
724 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
725 if task_dependency:
726 step = "Waiting for related tasks to be completed: {}".format(task_name)
727 self.logger.debug(logging_text + step)
728 await asyncio.wait(task_dependency, timeout=3600)
729 if ns_params.get("vnf"):
730 for vnf in ns_params["vnf"]:
731 if "vimAccountId" in vnf:
732 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
733 vnf["vimAccountId"])
734 if task_dependency:
735 step = "Waiting for related tasks to be completed: {}".format(task_name)
736 self.logger.debug(logging_text + step)
737 await asyncio.wait(task_dependency, timeout=3600)
738
739 step = db_nsr_update["detailed-status"] = "Checking instantiation parameters"
740
741 # feature 1429. Add n2vc public key to needed VMs
742 n2vc_key = await self.n2vc.GetPublicKey()
743 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, [n2vc_key])
744
745 step = db_nsr_update["detailed-status"] = "Creating ns at RO"
746 desc = await RO.create("ns", descriptor=RO_ns_params,
747 name=db_nsr["name"],
748 scenario=RO_nsd_uuid)
749 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
750 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
751 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
752 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
753 self.update_db_2("nsrs", nsr_id, db_nsr_update)
754
755 # wait until NS is ready
756 step = ns_status_detailed = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
757 detailed_status_old = None
758 self.logger.debug(logging_text + step)
759
760 while time() <= start_deploy + self.total_deploy_timeout:
761 desc = await RO.show("ns", RO_nsr_id)
762 ns_status, ns_status_info = RO.check_ns_status(desc)
763 db_nsr_update["admin.deployed.RO.nsr_status"] = ns_status
764 if ns_status == "ERROR":
765 raise ROclient.ROClientException(ns_status_info)
766 elif ns_status == "BUILD":
767 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
768 elif ns_status == "ACTIVE":
769 step = detailed_status = "Waiting for management IP address reported by the VIM"
770 try:
771 nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc)
772 break
773 except ROclient.ROClientException as e:
774 if e.http_code != 409: # IP address is not ready return code is 409 CONFLICT
775 raise e
776 else:
777 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
778 if detailed_status != detailed_status_old:
779 detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
780 self.update_db_2("nsrs", nsr_id, db_nsr_update)
781 await asyncio.sleep(5, loop=self.loop)
782 else: # total_deploy_timeout
783 raise ROclient.ROClientException("Timeout waiting ns to be ready")
784
785 step = "Updating VNFRs"
786 self.ns_update_vnfr(db_vnfrs, desc)
787
788 db_nsr["detailed-status"] = "Configuring vnfr"
789 self.update_db_2("nsrs", nsr_id, db_nsr_update)
790
791 # The parameters we'll need to deploy a charm
792 number_to_configure = 0
793
794 def deploy(vnf_index, vdu_id, mgmt_ip_address, n2vc_info, config_primitive=None):
795 """An inner function to deploy the charm from either vnf or vdu
796 vnf_index is mandatory. vdu_id can be None for a vnf configuration or the id for vdu configuration
797 """
798 if not mgmt_ip_address:
799 raise LcmException("vnfd/vdu has not management ip address to configure it")
800 # Login to the VCA.
801 # if number_to_configure == 0:
802 # self.logger.debug("Logging into N2VC...")
803 # task = asyncio.ensure_future(self.n2vc.login())
804 # yield from asyncio.wait_for(task, 30.0)
805 # self.logger.debug("Logged into N2VC!")
806
807 # # await self.n2vc.login()
808
809 # Note: The charm needs to exist on disk at the location
810 # specified by charm_path.
811 base_folder = vnfd["_admin"]["storage"]
812 storage_params = self.fs.get_params()
813 charm_path = "{}{}/{}/charms/{}".format(
814 storage_params["path"],
815 base_folder["folder"],
816 base_folder["pkg-dir"],
817 proxy_charm
818 )
819
820 # Setup the runtime parameters for this VNF
821 params = {'rw_mgmt_ip': mgmt_ip_address}
822 if config_primitive:
823 params["initial-config-primitive"] = config_primitive
824
825 # ns_name will be ignored in the current version of N2VC
826 # but will be implemented for the next point release.
827 model_name = 'default'
828 vdu_id_text = "vnfd"
829 if vdu_id:
830 vdu_id_text = vdu_id
831 application_name = self.n2vc.FormatApplicationName(
832 nsr_name,
833 vnf_index,
834 vdu_id_text
835 )
836 if not nsr_lcm.get("VCA"):
837 nsr_lcm["VCA"] = {}
838 nsr_lcm["VCA"][application_name] = db_nsr_update["_admin.deployed.VCA.{}".format(application_name)] = {
839 "member-vnf-index": vnf_index,
840 "vdu_id": vdu_id,
841 "model": model_name,
842 "application": application_name,
843 "operational-status": "init",
844 "detailed-status": "",
845 "vnfd_id": vnfd_id,
846 }
847 self.update_db_2("nsrs", nsr_id, db_nsr_update)
848
849 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
850 proxy_charm))
851 if not n2vc_info:
852 n2vc_info["nsr_id"] = nsr_id
853 n2vc_info["nslcmop_id"] = nslcmop_id
854 n2vc_info["n2vc_event"] = asyncio.Event(loop=self.loop)
855 n2vc_info["lcmOperationType"] = "instantiate"
856 n2vc_info["deployed"] = nsr_lcm["VCA"]
857 n2vc_info["db_update"] = db_nsr_update
858 task = asyncio.ensure_future(
859 self.n2vc.DeployCharms(
860 model_name, # The network service name
861 application_name, # The application name
862 vnfd, # The vnf descriptor
863 charm_path, # Path to charm
864 params, # Runtime params, like mgmt ip
865 {}, # for native charms only
866 self.n2vc_callback, # Callback for status changes
867 n2vc_info, # Callback parameter
868 None, # Callback parameter (task)
869 )
870 )
871 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
872 n2vc_info))
873 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + application_name, task)
874
875 step = "Looking for needed vnfd to configure"
876 self.logger.debug(logging_text + step)
877
878 for c_vnf in nsd.get("constituent-vnfd", ()):
879 vnfd_id = c_vnf["vnfd-id-ref"]
880 vnf_index = str(c_vnf["member-vnf-index"])
881 vnfd = db_vnfds_ref[vnfd_id]
882
883 # Check if this VNF has a charm configuration
884 vnf_config = vnfd.get("vnf-configuration")
885
886 if vnf_config and vnf_config.get("juju"):
887 proxy_charm = vnf_config["juju"]["charm"]
888 config_primitive = None
889
890 if proxy_charm:
891 if 'initial-config-primitive' in vnf_config:
892 config_primitive = vnf_config['initial-config-primitive']
893
894 # Login to the VCA. If there are multiple calls to login(),
895 # subsequent calls will be a nop and return immediately.
896 step = "connecting to N2VC to configure vnf {}".format(vnf_index)
897 await self.n2vc.login()
898 deploy(vnf_index, None, db_vnfrs[vnf_index]["ip-address"], n2vc_info, config_primitive)
899 number_to_configure += 1
900
901 # Deploy charms for each VDU that supports one.
902 vdu_index = 0
903 for vdu in vnfd.get('vdu', ()):
904 vdu_config = vdu.get('vdu-configuration')
905 proxy_charm = None
906 config_primitive = None
907
908 if vdu_config and vdu_config.get("juju"):
909 proxy_charm = vdu_config["juju"]["charm"]
910
911 if 'initial-config-primitive' in vdu_config:
912 config_primitive = vdu_config['initial-config-primitive']
913
914 if proxy_charm:
915 step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index)
916 await self.n2vc.login()
917 deploy(vnf_index, vdu["id"], db_vnfrs[vnf_index]["vdur"][vdu_index]["ip-address"],
918 n2vc_info, config_primitive)
919 number_to_configure += 1
920 vdu_index += 1
921
922 db_nsr_update["operational-status"] = "running"
923 configuration_failed = False
924 if number_to_configure:
925 old_status = "configuring: init: {}".format(number_to_configure)
926 db_nsr_update["config-status"] = old_status
927 db_nsr_update["detailed-status"] = old_status
928 db_nslcmop_update["detailed-status"] = old_status
929
930 # wait until all are configured.
931 while time() <= start_deploy + self.total_deploy_timeout:
932 if db_nsr_update:
933 self.update_db_2("nsrs", nsr_id, db_nsr_update)
934 if db_nslcmop_update:
935 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
936 # TODO add a fake tast that set n2vc_event after some time
937 await n2vc_info["n2vc_event"].wait()
938 n2vc_info["n2vc_event"].clear()
939 all_active = True
940 status_map = {}
941 n2vc_error_text = [] # contain text error list. If empty no one is in error status
942 now = time()
943 for _, vca_info in nsr_lcm["VCA"].items():
944 vca_status = vca_info["operational-status"]
945 if vca_status not in status_map:
946 # Initialize it
947 status_map[vca_status] = 0
948 status_map[vca_status] += 1
949
950 if vca_status == "active":
951 vca_info.pop("time_first_error", None)
952 vca_info.pop("status_first_error", None)
953 continue
954
955 all_active = False
956 if vca_status in ("error", "blocked"):
957 vca_info["detailed-status-error"] = vca_info["detailed-status"]
958 # if not first time in this status error
959 if not vca_info.get("time_first_error"):
960 vca_info["time_first_error"] = now
961 continue
962 if vca_info.get("time_first_error") and \
963 now <= vca_info["time_first_error"] + self.timeout_vca_on_error:
964 n2vc_error_text.append("member_vnf_index={} vdu_id={} {}: {}"
965 .format(vca_info["member-vnf-index"],
966 vca_info["vdu_id"], vca_status,
967 vca_info["detailed-status-error"]))
968
969 if all_active:
970 break
971 elif n2vc_error_text:
972 db_nsr_update["config-status"] = "failed"
973 error_text = "fail configuring " + ";".join(n2vc_error_text)
974 db_nsr_update["detailed-status"] = error_text
975 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
976 db_nslcmop_update["detailed-status"] = error_text
977 db_nslcmop_update["statusEnteredTime"] = time()
978 configuration_failed = True
979 break
980 else:
981 cs = "configuring: "
982 separator = ""
983 for status, num in status_map.items():
984 cs += separator + "{}: {}".format(status, num)
985 separator = ", "
986 if old_status != cs:
987 db_nsr_update["config-status"] = cs
988 db_nsr_update["detailed-status"] = cs
989 db_nslcmop_update["detailed-status"] = cs
990 old_status = cs
991 else: # total_deploy_timeout
992 raise LcmException("Timeout waiting ns to be configured")
993
994 if not configuration_failed:
995 # all is done
996 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
997 db_nslcmop_update["statusEnteredTime"] = time()
998 db_nslcmop_update["detailed-status"] = "done"
999 db_nsr_update["config-status"] = "configured"
1000 db_nsr_update["detailed-status"] = "done"
1001
1002 return
1003
1004 except (ROclient.ROClientException, DbException, LcmException) as e:
1005 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1006 exc = e
1007 except asyncio.CancelledError:
1008 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1009 exc = "Operation was cancelled"
1010 except Exception as e:
1011 exc = traceback.format_exc()
1012 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1013 exc_info=True)
1014 finally:
1015 if exc:
1016 if db_nsr:
1017 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1018 db_nsr_update["operational-status"] = "failed"
1019 if db_nslcmop:
1020 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1021 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1022 db_nslcmop_update["statusEnteredTime"] = time()
1023 if db_nsr:
1024 db_nsr_update["_admin.nslcmop"] = None
1025 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1026 if db_nslcmop_update:
1027 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1028 if nslcmop_operation_state:
1029 try:
1030 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1031 "operationState": nslcmop_operation_state})
1032 except Exception as e:
1033 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1034
1035 self.logger.debug(logging_text + "Exit")
1036 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1037
1038 async def terminate(self, nsr_id, nslcmop_id):
1039 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
1040 self.logger.debug(logging_text + "Enter")
1041 db_nsr = None
1042 db_nslcmop = None
1043 exc = None
1044 failed_detail = [] # annotates all failed error messages
1045 vca_task_list = []
1046 vca_task_dict = {}
1047 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1048 db_nslcmop_update = {}
1049 nslcmop_operation_state = None
1050 try:
1051 step = "Getting nslcmop={} from db".format(nslcmop_id)
1052 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1053 step = "Getting nsr={} from db".format(nsr_id)
1054 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1055 # nsd = db_nsr["nsd"]
1056 nsr_lcm = deepcopy(db_nsr["_admin"].get("deployed"))
1057 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
1058 return
1059 # TODO ALF remove
1060 # db_vim = self.db.get_one("vim_accounts", {"_id": db_nsr["datacenter"]})
1061 # #TODO check if VIM is creating and wait
1062 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
1063
1064 db_nsr_update["operational-status"] = "terminating"
1065 db_nsr_update["config-status"] = "terminating"
1066
1067 if nsr_lcm and nsr_lcm.get("VCA"):
1068 try:
1069 step = "Scheduling configuration charms removing"
1070 db_nsr_update["detailed-status"] = "Deleting charms"
1071 self.logger.debug(logging_text + step)
1072 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1073 for application_name, deploy_info in nsr_lcm["VCA"].items():
1074 if deploy_info: # TODO it would be desirable having a and deploy_info.get("deployed"):
1075 task = asyncio.ensure_future(
1076 self.n2vc.RemoveCharms(
1077 deploy_info['model'],
1078 application_name,
1079 # self.n2vc_callback,
1080 # db_nsr,
1081 # db_nslcmop,
1082 )
1083 )
1084 vca_task_list.append(task)
1085 vca_task_dict[application_name] = task
1086 # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
1087 # deploy_info['application'], None, db_nsr,
1088 # db_nslcmop, vnf_index))
1089 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "delete_charm:" + application_name, task)
1090 except Exception as e:
1091 self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
1092
1093 # remove from RO
1094 RO_fail = False
1095 RO = ROclient.ROClient(self.loop, **self.ro_config)
1096
1097 # Delete ns
1098 RO_nsr_id = RO_delete_action = None
1099 if nsr_lcm and nsr_lcm.get("RO"):
1100 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
1101 RO_delete_action = nsr_lcm["RO"].get("nsr_delete_action_id")
1102 try:
1103 if RO_nsr_id:
1104 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = "Deleting ns at RO"
1105 self.logger.debug(logging_text + step)
1106 desc = await RO.delete("ns", RO_nsr_id)
1107 RO_delete_action = desc["action_id"]
1108 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
1109 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1110 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1111 if RO_delete_action:
1112 # wait until NS is deleted from VIM
1113 step = detailed_status = "Waiting ns deleted from VIM. RO_id={}".format(RO_nsr_id)
1114 detailed_status_old = None
1115 self.logger.debug(logging_text + step)
1116
1117 delete_timeout = 20 * 60 # 20 minutes
1118 while delete_timeout > 0:
1119 desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
1120 extra_item_id=RO_delete_action)
1121 ns_status, ns_status_info = RO.check_action_status(desc)
1122 if ns_status == "ERROR":
1123 raise ROclient.ROClientException(ns_status_info)
1124 elif ns_status == "BUILD":
1125 detailed_status = step + "; {}".format(ns_status_info)
1126 elif ns_status == "ACTIVE":
1127 break
1128 else:
1129 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
1130 await asyncio.sleep(5, loop=self.loop)
1131 delete_timeout -= 5
1132 if detailed_status != detailed_status_old:
1133 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
1134 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1135 else: # delete_timeout <= 0:
1136 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
1137
1138 except ROclient.ROClientException as e:
1139 if e.http_code == 404: # not found
1140 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1141 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1142 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
1143 elif e.http_code == 409: # conflict
1144 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
1145 self.logger.debug(logging_text + failed_detail[-1])
1146 RO_fail = True
1147 else:
1148 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
1149 self.logger.error(logging_text + failed_detail[-1])
1150 RO_fail = True
1151
1152 # Delete nsd
1153 if not RO_fail and nsr_lcm and nsr_lcm.get("RO") and nsr_lcm["RO"].get("nsd_id"):
1154 RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
1155 try:
1156 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
1157 "Deleting nsd at RO"
1158 await RO.delete("nsd", RO_nsd_id)
1159 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
1160 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
1161 except ROclient.ROClientException as e:
1162 if e.http_code == 404: # not found
1163 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
1164 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
1165 elif e.http_code == 409: # conflict
1166 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
1167 self.logger.debug(logging_text + failed_detail[-1])
1168 RO_fail = True
1169 else:
1170 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
1171 self.logger.error(logging_text + failed_detail[-1])
1172 RO_fail = True
1173
1174 if not RO_fail and nsr_lcm and nsr_lcm.get("RO") and nsr_lcm["RO"].get("vnfd_id"):
1175 for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
1176 if not RO_vnfd_id:
1177 continue
1178 try:
1179 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
1180 "Deleting vnfd={} at RO".format(vnf_id)
1181 await RO.delete("vnfd", RO_vnfd_id)
1182 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
1183 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
1184 except ROclient.ROClientException as e:
1185 if e.http_code == 404: # not found
1186 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
1187 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
1188 elif e.http_code == 409: # conflict
1189 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
1190 self.logger.debug(logging_text + failed_detail[-1])
1191 else:
1192 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
1193 self.logger.error(logging_text + failed_detail[-1])
1194
1195 if vca_task_list:
1196 db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
1197 "Waiting for deletion of configuration charms"
1198 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1199 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1200 await asyncio.wait(vca_task_list, timeout=300)
1201 for application_name, task in vca_task_dict.items():
1202 if task.cancelled():
1203 failed_detail.append("VCA[{}] Deletion has been cancelled".format(application_name))
1204 elif task.done():
1205 exc = task.exception()
1206 if exc:
1207 failed_detail.append("VCA[{}] Deletion exception: {}".format(application_name, exc))
1208 else:
1209 db_nsr_update["_admin.deployed.VCA.{}".format(application_name)] = None
1210 else: # timeout
1211 # TODO Should it be cancelled?!!
1212 task.cancel()
1213 failed_detail.append("VCA[{}] Deletion timeout".format(application_name))
1214
1215 if failed_detail:
1216 self.logger.error(logging_text + " ;".join(failed_detail))
1217 db_nsr_update["operational-status"] = "failed"
1218 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
1219 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
1220 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1221 db_nslcmop_update["statusEnteredTime"] = time()
1222 elif db_nslcmop["operationParams"].get("autoremove"):
1223 self.db.del_one("nsrs", {"_id": nsr_id})
1224 db_nsr_update.clear()
1225 self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
1226 nslcmop_operation_state = "COMPLETED"
1227 db_nslcmop_update.clear()
1228 self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
1229 self.db.set_list("pdus", {"_admin.usage.nsr_id": nsr_id},
1230 {"_admin.usageSate": "NOT_IN_USE", "_admin.usage": None})
1231 self.logger.debug(logging_text + "Delete from database")
1232 else:
1233 db_nsr_update["operational-status"] = "terminated"
1234 db_nsr_update["detailed-status"] = "Done"
1235 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
1236 db_nslcmop_update["detailed-status"] = "Done"
1237 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1238 db_nslcmop_update["statusEnteredTime"] = time()
1239
1240 except (ROclient.ROClientException, DbException) as e:
1241 self.logger.error(logging_text + "Exit Exception {}".format(e))
1242 exc = e
1243 except asyncio.CancelledError:
1244 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1245 exc = "Operation was cancelled"
1246 except Exception as e:
1247 exc = traceback.format_exc()
1248 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1249 finally:
1250 if exc and db_nslcmop:
1251 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1252 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1253 db_nslcmop_update["statusEnteredTime"] = time()
1254 if db_nslcmop_update:
1255 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1256 if db_nsr:
1257 db_nsr_update["_admin.nslcmop"] = None
1258 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1259 if nslcmop_operation_state:
1260 try:
1261 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1262 "operationState": nslcmop_operation_state})
1263 except Exception as e:
1264 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1265 self.logger.debug(logging_text + "Exit")
1266 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
1267
1268 async def _ns_execute_primitive(self, db_deployed, nsr_name, member_vnf_index, vdu_id, primitive, primitive_params):
1269
1270 vdu_id_text = "vnfd"
1271 if vdu_id:
1272 vdu_id_text = vdu_id
1273 application_name = self.n2vc.FormatApplicationName(
1274 nsr_name,
1275 member_vnf_index,
1276 vdu_id_text
1277 )
1278 vca_deployed = db_deployed["VCA"].get(application_name)
1279 if not vca_deployed:
1280 raise LcmException("charm for member_vnf_index={} vdu_id={} is not deployed".format(member_vnf_index,
1281 vdu_id))
1282 model_name = vca_deployed.get("model")
1283 application_name = vca_deployed.get("application")
1284 if not model_name or not application_name:
1285 raise LcmException("charm for member_vnf_index={} is not properly deployed".format(member_vnf_index))
1286 if vca_deployed["operational-status"] != "active":
1287 raise LcmException("charm for member_vnf_index={} operational_status={} not 'active'".format(
1288 member_vnf_index, vca_deployed["operational-status"]))
1289 callback = None # self.n2vc_callback
1290 callback_args = () # [db_nsr, db_nslcmop, member_vnf_index, None]
1291 await self.n2vc.login()
1292 task = asyncio.ensure_future(
1293 self.n2vc.ExecutePrimitive(
1294 model_name,
1295 application_name,
1296 primitive, callback,
1297 *callback_args,
1298 **primitive_params
1299 )
1300 )
1301 # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
1302 # db_nsr, db_nslcmop, member_vnf_index))
1303 # self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "action:" + primitive, task)
1304 # wait until completed with timeout
1305 await asyncio.wait((task,), timeout=600)
1306
1307 result = "FAILED" # by default
1308 result_detail = ""
1309 if task.cancelled():
1310 result_detail = "Task has been cancelled"
1311 elif task.done():
1312 exc = task.exception()
1313 if exc:
1314 result_detail = str(exc)
1315 else:
1316 # TODO revise with Adam if action is finished and ok when task is done or callback is needed
1317 result = "COMPLETED"
1318 result_detail = "Done"
1319 else: # timeout
1320 # TODO Should it be cancelled?!!
1321 task.cancel()
1322 result_detail = "timeout"
1323 return result, result_detail
1324
1325 async def action(self, nsr_id, nslcmop_id):
1326 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
1327 self.logger.debug(logging_text + "Enter")
1328 # get all needed from database
1329 db_nsr = None
1330 db_nslcmop = None
1331 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1332 db_nslcmop_update = {}
1333 nslcmop_operation_state = None
1334 exc = None
1335 try:
1336 step = "Getting information from database"
1337 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1338 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1339 nsr_lcm = db_nsr["_admin"].get("deployed")
1340 nsr_name = db_nsr["name"]
1341 vnf_index = db_nslcmop["operationParams"]["member_vnf_index"]
1342 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
1343
1344 # look if previous tasks in process
1345 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
1346 if task_dependency:
1347 step = db_nslcmop_update["detailed-status"] = \
1348 "Waiting for related tasks to be completed: {}".format(task_name)
1349 self.logger.debug(logging_text + step)
1350 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1351 _, pending = await asyncio.wait(task_dependency, timeout=3600)
1352 if pending:
1353 raise LcmException("Timeout waiting related tasks to be completed")
1354
1355 # TODO check if ns is in a proper status
1356 primitive = db_nslcmop["operationParams"]["primitive"]
1357 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
1358 result, result_detail = await self._ns_execute_primitive(nsr_lcm, nsr_name, vnf_index, vdu_id, primitive,
1359 primitive_params)
1360 db_nslcmop_update["detailed-status"] = result_detail
1361 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
1362 db_nslcmop_update["statusEnteredTime"] = time()
1363 self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
1364 return # database update is called inside finally
1365
1366 except (DbException, LcmException) as e:
1367 self.logger.error(logging_text + "Exit Exception {}".format(e))
1368 exc = e
1369 except asyncio.CancelledError:
1370 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1371 exc = "Operation was cancelled"
1372 except Exception as e:
1373 exc = traceback.format_exc()
1374 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1375 finally:
1376 if exc and db_nslcmop:
1377 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1378 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1379 db_nslcmop_update["statusEnteredTime"] = time()
1380 if db_nslcmop_update:
1381 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1382 if db_nsr:
1383 db_nsr_update["_admin.nslcmop"] = None
1384 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1385 self.logger.debug(logging_text + "Exit")
1386 if nslcmop_operation_state:
1387 try:
1388 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1389 "operationState": nslcmop_operation_state})
1390 except Exception as e:
1391 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1392 self.logger.debug(logging_text + "Exit")
1393 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
1394
1395 async def scale(self, nsr_id, nslcmop_id):
1396 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
1397 self.logger.debug(logging_text + "Enter")
1398 # get all needed from database
1399 db_nsr = None
1400 db_nslcmop = None
1401 db_nslcmop_update = {}
1402 nslcmop_operation_state = None
1403 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1404 exc = None
1405 # in case of error, indicates what part of scale was failed to put nsr at error status
1406 scale_process = None
1407 old_operational_status = ""
1408 old_config_status = ""
1409 try:
1410 step = "Getting nslcmop from database"
1411 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1412 step = "Getting nsr from database"
1413 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1414 old_operational_status = db_nsr["operational-status"]
1415 old_config_status = db_nsr["config-status"]
1416
1417 # look if previous tasks in process
1418 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
1419 if task_dependency:
1420 step = db_nslcmop_update["detailed-status"] = \
1421 "Waiting for related tasks to be completed: {}".format(task_name)
1422 self.logger.debug(logging_text + step)
1423 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1424 _, pending = await asyncio.wait(task_dependency, timeout=3600)
1425 if pending:
1426 raise LcmException("Timeout waiting related tasks to be completed")
1427
1428 step = "Parsing scaling parameters"
1429 db_nsr_update["operational-status"] = "scaling"
1430 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1431 nsr_lcm = db_nsr["_admin"].get("deployed")
1432 RO_nsr_id = nsr_lcm["RO"]["nsr_id"]
1433 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
1434 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
1435 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
1436 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
1437
1438 step = "Getting vnfr from database"
1439 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
1440 step = "Getting vnfd from database"
1441 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
1442 step = "Getting scaling-group-descriptor"
1443 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
1444 if scaling_descriptor["name"] == scaling_group:
1445 break
1446 else:
1447 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
1448 "at vnfd:scaling-group-descriptor".format(scaling_group))
1449 # cooldown_time = 0
1450 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
1451 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
1452 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
1453 # break
1454
1455 # TODO check if ns is in a proper status
1456 step = "Sending scale order to RO"
1457 nb_scale_op = 0
1458 if not db_nsr["_admin"].get("scaling-group"):
1459 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
1460 admin_scale_index = 0
1461 else:
1462 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
1463 if admin_scale_info["name"] == scaling_group:
1464 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
1465 break
1466 else: # not found, set index one plus last element and add new entry with the name
1467 admin_scale_index += 1
1468 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
1469 RO_scaling_info = []
1470 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
1471 if scaling_type == "SCALE_OUT":
1472 # count if max-instance-count is reached
1473 if "max-instance-count" in scaling_descriptor and scaling_descriptor["max-instance-count"] is not None:
1474 max_instance_count = int(scaling_descriptor["max-instance-count"])
1475 if nb_scale_op >= max_instance_count:
1476 raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
1477 " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1478 nb_scale_op = nb_scale_op + 1
1479 vdu_scaling_info["scaling_direction"] = "OUT"
1480 vdu_scaling_info["vdu-create"] = {}
1481 for vdu_scale_info in scaling_descriptor["vdu"]:
1482 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1483 "type": "create", "count": vdu_scale_info.get("count", 1)})
1484 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1485 elif scaling_type == "SCALE_IN":
1486 # count if min-instance-count is reached
1487 min_instance_count = 0
1488 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
1489 min_instance_count = int(scaling_descriptor["min-instance-count"])
1490 if nb_scale_op <= min_instance_count:
1491 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
1492 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1493 nb_scale_op = nb_scale_op - 1
1494 vdu_scaling_info["scaling_direction"] = "IN"
1495 vdu_scaling_info["vdu-delete"] = {}
1496 for vdu_scale_info in scaling_descriptor["vdu"]:
1497 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1498 "type": "delete", "count": vdu_scale_info.get("count", 1)})
1499 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1500
1501 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
1502 vdu_create = vdu_scaling_info.get("vdu-create")
1503 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
1504 if vdu_scaling_info["scaling_direction"] == "IN":
1505 for vdur in reversed(db_vnfr["vdur"]):
1506 if vdu_delete.get(vdur["vdu-id-ref"]):
1507 vdu_delete[vdur["vdu-id-ref"]] -= 1
1508 vdu_scaling_info["vdu"].append({
1509 "name": vdur["name"],
1510 "vdu_id": vdur["vdu-id-ref"],
1511 "interface": []
1512 })
1513 for interface in vdur["interfaces"]:
1514 vdu_scaling_info["vdu"][-1]["interface"].append({
1515 "name": interface["name"],
1516 "ip_address": interface["ip-address"],
1517 "mac_address": interface.get("mac-address"),
1518 })
1519 vdu_delete = vdu_scaling_info.pop("vdu-delete")
1520
1521 # execute primitive service PRE-SCALING
1522 step = "Executing pre-scale vnf-config-primitive"
1523 if scaling_descriptor.get("scaling-config-action"):
1524 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1525 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "pre-scale-in" \
1526 and scaling_type == "SCALE_IN":
1527 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1528 step = db_nslcmop_update["detailed-status"] = \
1529 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
1530 # look for primitive
1531 primitive_params = {}
1532 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1533 if config_primitive["name"] == vnf_config_primitive:
1534 for parameter in config_primitive.get("parameter", ()):
1535 if 'default-value' in parameter and \
1536 parameter['default-value'] == "<VDU_SCALE_INFO>":
1537 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1538 default_flow_style=True,
1539 width=256)
1540 break
1541 else:
1542 raise LcmException(
1543 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
1544 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-cnfiguration:config-"
1545 "primitive".format(scaling_group, config_primitive))
1546 scale_process = "VCA"
1547 db_nsr_update["config-status"] = "configuring pre-scaling"
1548 result, result_detail = await self._ns_execute_primitive(nsr_lcm, vnf_index,
1549 vnf_config_primitive, primitive_params)
1550 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1551 vnf_config_primitive, result, result_detail))
1552 if result == "FAILED":
1553 raise LcmException(result_detail)
1554 db_nsr_update["config-status"] = old_config_status
1555 scale_process = None
1556
1557 if RO_scaling_info:
1558 scale_process = "RO"
1559 RO = ROclient.ROClient(self.loop, **self.ro_config)
1560 RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
1561 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
1562 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
1563 # wait until ready
1564 RO_nslcmop_id = RO_desc["instance_action_id"]
1565 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
1566
1567 RO_task_done = False
1568 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
1569 detailed_status_old = None
1570 self.logger.debug(logging_text + step)
1571
1572 deployment_timeout = 1 * 3600 # One hour
1573 while deployment_timeout > 0:
1574 if not RO_task_done:
1575 desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
1576 extra_item_id=RO_nslcmop_id)
1577 ns_status, ns_status_info = RO.check_action_status(desc)
1578 if ns_status == "ERROR":
1579 raise ROclient.ROClientException(ns_status_info)
1580 elif ns_status == "BUILD":
1581 detailed_status = step + "; {}".format(ns_status_info)
1582 elif ns_status == "ACTIVE":
1583 RO_task_done = True
1584 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
1585 self.logger.debug(logging_text + step)
1586 else:
1587 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
1588 else:
1589 desc = await RO.show("ns", RO_nsr_id)
1590 ns_status, ns_status_info = RO.check_ns_status(desc)
1591 if ns_status == "ERROR":
1592 raise ROclient.ROClientException(ns_status_info)
1593 elif ns_status == "BUILD":
1594 detailed_status = step + "; {}".format(ns_status_info)
1595 elif ns_status == "ACTIVE":
1596 step = detailed_status = "Waiting for management IP address reported by the VIM"
1597 try:
1598 desc = await RO.show("ns", RO_nsr_id)
1599 nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc)
1600 break
1601 except ROclient.ROClientException as e:
1602 if e.http_code != 409: # IP address is not ready return code is 409 CONFLICT
1603 raise e
1604 else:
1605 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1606 if detailed_status != detailed_status_old:
1607 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
1608 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1609
1610 await asyncio.sleep(5, loop=self.loop)
1611 deployment_timeout -= 5
1612 if deployment_timeout <= 0:
1613 raise ROclient.ROClientException("Timeout waiting ns to be ready")
1614
1615 step = "Updating VNFRs"
1616 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
1617 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
1618
1619 # update VDU_SCALING_INFO with the obtained ip_addresses
1620 if vdu_scaling_info["scaling_direction"] == "OUT":
1621 for vdur in reversed(db_vnfr["vdur"]):
1622 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
1623 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
1624 vdu_scaling_info["vdu"].append({
1625 "name": vdur["name"],
1626 "vdu_id": vdur["vdu-id-ref"],
1627 "interface": []
1628 })
1629 for interface in vdur["interfaces"]:
1630 vdu_scaling_info["vdu"][-1]["interface"].append({
1631 "name": interface["name"],
1632 "ip_address": interface["ip-address"],
1633 "mac_address": interface.get("mac-address"),
1634 })
1635 del vdu_scaling_info["vdu-create"]
1636
1637 scale_process = None
1638 if db_nsr_update:
1639 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1640
1641 # execute primitive service POST-SCALING
1642 step = "Executing post-scale vnf-config-primitive"
1643 if scaling_descriptor.get("scaling-config-action"):
1644 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1645 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "post-scale-out" \
1646 and scaling_type == "SCALE_OUT":
1647 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1648 step = db_nslcmop_update["detailed-status"] = \
1649 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
1650 # look for primitive
1651 primitive_params = {}
1652 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1653 if config_primitive["name"] == vnf_config_primitive:
1654 for parameter in config_primitive.get("parameter", ()):
1655 if 'default-value' in parameter and \
1656 parameter['default-value'] == "<VDU_SCALE_INFO>":
1657 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1658 default_flow_style=True,
1659 width=256)
1660 break
1661 else:
1662 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
1663 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
1664 "match any vnf-configuration:config-primitive".format(scaling_group,
1665 config_primitive))
1666 scale_process = "VCA"
1667 db_nsr_update["config-status"] = "configuring post-scaling"
1668
1669 result, result_detail = await self._ns_execute_primitive(nsr_lcm, vnf_index,
1670 vnf_config_primitive, primitive_params)
1671 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1672 vnf_config_primitive, result, result_detail))
1673 if result == "FAILED":
1674 raise LcmException(result_detail)
1675 db_nsr_update["config-status"] = old_config_status
1676 scale_process = None
1677
1678 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1679 db_nslcmop_update["statusEnteredTime"] = time()
1680 db_nslcmop_update["detailed-status"] = "done"
1681 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
1682 db_nsr_update["operational-status"] = old_operational_status
1683 db_nsr_update["config-status"] = old_config_status
1684 return
1685 except (ROclient.ROClientException, DbException, LcmException) as e:
1686 self.logger.error(logging_text + "Exit Exception {}".format(e))
1687 exc = e
1688 except asyncio.CancelledError:
1689 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1690 exc = "Operation was cancelled"
1691 except Exception as e:
1692 exc = traceback.format_exc()
1693 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1694 finally:
1695 if exc:
1696 if db_nslcmop:
1697 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1698 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1699 db_nslcmop_update["statusEnteredTime"] = time()
1700 if db_nsr:
1701 db_nsr_update["operational-status"] = old_operational_status
1702 db_nsr_update["config-status"] = old_config_status
1703 db_nsr_update["detailed-status"] = ""
1704 db_nsr_update["_admin.nslcmop"] = None
1705 if scale_process:
1706 if "VCA" in scale_process:
1707 db_nsr_update["config-status"] = "failed"
1708 if "RO" in scale_process:
1709 db_nsr_update["operational-status"] = "failed"
1710 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
1711 exc)
1712 if db_nslcmop_update:
1713 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1714 if db_nsr:
1715 db_nsr_update["_admin.nslcmop"] = None
1716 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1717 if nslcmop_operation_state:
1718 try:
1719 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1720 "operationState": nslcmop_operation_state})
1721 # if cooldown_time:
1722 # await asyncio.sleep(cooldown_time)
1723 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
1724 except Exception as e:
1725 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1726 self.logger.debug(logging_text + "Exit")
1727 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")