173ad887d1861263aa368a3283ed4195da62e55b
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import functools
24 import traceback
25
26 import ROclient
27 from lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase
28
29 from osm_common.dbbase import DbException
30 from osm_common.fsbase import FsException
31 from n2vc.vnf import N2VC
32
33 from copy import copy, deepcopy
34 from http import HTTPStatus
35 from time import time
36 from uuid import uuid4
37
38 __author__ = "Alfonso Tierno"
39
40
def get_iterable(in_dict, in_key):
    """
    Look up in_key at in_dict, replacing any missing or falsy value by an empty tuple.
    Useful to loop over optional descriptor lists without testing for None beforehand.
    :param in_dict: a dictionary
    :param in_key: the key to look for at in_dict
    :return: in_dict[in_key], or () if the key is not present or its value is None, False, empty, ...
    """
    found = in_dict.get(in_key)
    return found if found else ()
51
52
def populate_dict(target_dict, key_list, value):
    """
    Store value inside target_dict under the nested path given by key_list, creating the intermediate
    dictionaries that do not exist yet.
    Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
    :param target_dict: dictionary to be changed (modified in place)
    :param key_list: list of keys to insert at target_dict; the last one receives the value
    :param value: content assigned to the last key
    :return: None
    """
    cursor = target_dict
    # descend through every key but the last one, adding empty levels when needed
    for path_key in key_list[:-1]:
        cursor = cursor.setdefault(path_key, {})
    cursor[key_list[-1]] = value
67
68
69 class NsLcm(LcmBase):
70 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
71 total_deploy_timeout = 2 * 3600 # global timeout for deployment
72
def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
    """
    Prepare the NS lifecycle manager: logger, LcmBase connectors and the N2VC client
    :param db: database connector, passed to LcmBase
    :param msg: messaging connector, passed to LcmBase
    :param fs: filesystem storage connector, passed to LcmBase
    :param lcm_tasks: object used to look for related tasks in progress
    :param ro_config: dictionary with the RO client configuration (expanded as ROClient keyword arguments)
    :param vca_config: dictionary that must contain 'host', 'port', 'user' and 'secret' of the VCA
    :param loop: asyncio event loop
    :return: None
    """
    # logging
    self.logger = logging.getLogger('lcm.ns')
    self.loop = loop
    self.lcm_tasks = lcm_tasks

    super().__init__(db, msg, fs, self.logger)

    self.ro_config = ro_config

    n2vc_settings = {
        "log": self.logger,
        "server": vca_config['host'],
        "port": vca_config['port'],
        "user": vca_config['user'],
        "secret": vca_config['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=vca_config[''],
        "artifacts": None,
    }
    self.n2vc = N2VC(**n2vc_settings)
100
def vnfd2RO(self, vnfd, new_id=None):
    """
    Builds the vnfd descriptor to be sent to RO starting from an OSM IM vnfd. The input is not modified
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :return: copy of vnfd without "_id", "_admin" and the sections unused by RO, and with every vdu
        "cloud-init-file" replaced by a "cloud-init" entry holding the content of that file.
        LcmException is raised if the storage reports an error reading a cloud-init file
    """
    try:
        ro_vnfd = deepcopy(vnfd)
        for db_only_key in ("_id", "_admin"):
            ro_vnfd.pop(db_only_key, None)
        if new_id:
            ro_vnfd["id"] = new_id
        for vdu in ro_vnfd.get("vdu", ()):
            if "cloud-init-file" not in vdu:
                continue
            storage = vnfd["_admin"]["storage"]
            cloud_init_path = "{}/{}/cloud_init/{}".format(
                storage["folder"],
                storage["pkg-dir"],
                vdu["cloud-init-file"]
            )
            cloud_init_fd = self.fs.file_open(cloud_init_path, "r")
            try:
                # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails
                # convert to base 64 or similar
                cloud_init_text = cloud_init_fd.read()
            finally:
                cloud_init_fd.close()
            vdu.pop("cloud-init-file", None)
            vdu["cloud-init"] = cloud_init_text
        # remove sections unused by RO: configuration, monitoring, scaling
        for unused_key in ("vnf-configuration", "monitoring-param", "scaling-group-descriptor"):
            ro_vnfd.pop(unused_key, None)
        return ro_vnfd
    except FsException as e:
        raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
141
def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None):
    """
    Callback both for charm status change and task completion
    :param model_name: Charm model name
    :param application_name: Charm application name
    :param status: Can be
        - blocked: The unit needs manual intervention
        - maintenance: The unit is actively deploying/configuring
        - waiting: The unit is waiting for another charm to be ready
        - active: The unit is deployed, configured, and ready
        - error: The charm has failed and needs attention.
        - terminated: The charm has been destroyed
        - removing,
        - removed
    :param message: detailed message error
    :param n2vc_info: dictionary with information shared with instantiate task. It contains:
        nsr_id:
        nslcmop_id:
        lcmOperationType: currently "instantiate"
        deployed: list of VCA entries; each non-empty entry is a dictionary with at least
            {model: <model>, application: <application>, operational-status: <status>, detailed-status: <text>}
        db_update: dictionary to be filled with the changes to be written to database with format key.key.key: value
        n2vc_event: event used to notify instantiation task that some change has been produced
    :param task: None for charm status change, or task for completion task callback
    :return: None. Exceptions are never propagated; they are logged as critical
    """
    # Provisional text, so that the except clause below can always log, even if n2vc_info is malformed
    # and the complete text cannot be composed
    logging_text = "Task [n2vc_callback] application={}".format(application_name)
    try:
        nsr_id = n2vc_info["nsr_id"]
        deployed = n2vc_info["deployed"]
        db_nsr_update = n2vc_info["db_update"]
        nslcmop_id = n2vc_info["nslcmop_id"]
        ns_operation = n2vc_info["lcmOperationType"]
        n2vc_event = n2vc_info["n2vc_event"]
        logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id,
                                                                                application_name)
        # look for the VCA entry this callback refers to; vca_index/vca_deployed keep the matching one
        for vca_index, vca_deployed in enumerate(deployed):
            if not vca_deployed:
                continue
            if model_name == vca_deployed["model"] and application_name == vca_deployed["application"]:
                break
        else:
            self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA")
            return
        if task:
            # cancelled() must be checked first: task.exception() raises on a cancelled task
            if task.cancelled():
                self.logger.debug(logging_text + " task Cancelled")
                vca_deployed['operational-status'] = "error"
                db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
                vca_deployed['detailed-status'] = "Task Cancelled"
                db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = "Task Cancelled"

            elif task.done():
                exc = task.exception()
                if exc:
                    self.logger.error(logging_text + " task Exception={}".format(exc))
                    vca_deployed['operational-status'] = "error"
                    db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
                    vca_deployed['detailed-status'] = str(exc)
                    db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(exc)
                else:
                    self.logger.debug(logging_text + " task Done")
                    # task is Done, but callback is still ongoing. So ignore
                    return
        elif status:
            self.logger.debug(logging_text + " Enter status={} message={}".format(status, message))
            if vca_deployed['operational-status'] == status:
                return  # same status, ignore
            vca_deployed['operational-status'] = status
            db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = status
            vca_deployed['detailed-status'] = str(message)
            db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(message)
        else:
            self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
            return
        # wake up instantiate task
        n2vc_event.set()
    except Exception as e:
        self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True)
219
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params
    :param nsd: ns descriptor; its "constituent-vnfd" list maps member-vnf-index to vnfd-id-ref
    :param vnfd_dict: dictionary with the vnfd contents, indexed by the vnfd id referenced at nsd
    :param n2vc_key_list: list of public keys to inject (as "mgmt_keys") at the VDUs that need to be accessed for
        configuration. If empty or None, no key is injected
    :return: The RO ns descriptor, or None if ns_params is empty. LcmException is raised if a VIM is not
        ENABLED or an instantiation parameter references something not present at the descriptors
    """
    vim_2_RO = {}  # cache: OSM vim account id -> RO vim id
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # translate an OSM vim account id into the RO one, reading the database only the first time
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def ip_profile_2_RO(ip_profile):
        # returns a copy of ip_profile using the key names and ip-version spelling that are sent to RO
        RO_ip_profile = deepcopy(ip_profile)
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        # "scenario": ns_params["nsdId"],
    }
    if n2vc_key_list:
        # inject the keys at the VDUs that will be configured: the vnf management one and those with
        # its own vdu-configuration
        for vnfd_ref, vnfd in vnfd_dict.items():
            vdu_needed_access = []
            mgmt_cp = None
            if vnfd.get("vnf-configuration"):
                if vnfd.get("mgmt-interface"):
                    if vnfd["mgmt-interface"].get("vdu-id"):
                        vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                    elif vnfd["mgmt-interface"].get("cp"):
                        mgmt_cp = vnfd["mgmt-interface"]["cp"]

            for vdu in vnfd.get("vdu", ()):
                if vdu.get("vdu-configuration"):
                    vdu_needed_access.append(vdu["id"])
                elif mgmt_cp:
                    # get_iterable: a vdu without "interface" is skipped instead of failing iterating None
                    for vdu_interface in get_iterable(vdu, "interface"):
                        if vdu_interface.get("external-connection-point-ref") and \
                                vdu_interface["external-connection-point-ref"] == mgmt_cp:
                            vdu_needed_access.append(vdu["id"])
                            mgmt_cp = None
                            break

            if vdu_needed_access:
                # get_iterable: a nsd without "constituent-vnfd" is skipped instead of failing iterating None
                for vnf_member in get_iterable(nsd, "constituent-vnfd"):
                    if vnf_member["vnfd-id-ref"] != vnfd_ref:
                        continue
                    for vdu in vdu_needed_access:
                        populate_dict(RO_ns_params,
                                      ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                      n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    for vnf_params in get_iterable(ns_params, "vnf"):
        # look for the vnfd of this member-vnf-index
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    # no vdu interface uses this internal connection point
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))
        if vld_params.get("vim-network-name"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                # one network name per vim account
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                # on success vdu_descriptor and interface_descriptor keep the matching vdu and interface
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
433
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries at a vnfr to reflect a scaling operation, and stores the result at database
    :param db_vnfr: vnfr content. Its "vdur" list is modified
    :param vdu_create: dictionary with {vdu-id-ref: number of vdur to add}, or None. It is not modified
    :param vdu_delete: dictionary with {vdu-id-ref: number of vdur to remove}, or None. It is not modified
    :return: None. LcmException is raised if some vdu-id-ref of vdu_create or vdu_delete cannot be completely
        served because there are not (enough) vdur with this vdu-id-ref
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # iterate from the last entry to the first one, so that for every vdu-id-ref the entry stored
    # last at the list is the one found first
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            # every new vdur is a clone of the previous one with a new _id and the next count-index
            for index in range(0, vdu_create[vdu_id_ref]):
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Entries are removed from the dictionaries once served, so anything
    # remaining here could not be applied
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
472
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies into every vld of the nsr the information that RO reports for the matching net
    :param ns_update_nsr: dictionary to be filled with the updated info, with keys "vld.<index>"
    :param db_nsr: content of db_nsr. Its vld entries are also modified
    :param nsr_desc_RO: nsr descriptor from RO. Nets are matched by "ns_net_osm_id"
    :return: Nothing, LcmException is raised if RO does not report a net for some vld
    """
    for vld_position, vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net whose ns_net_osm_id is the id of this vld, if any
        ro_net = next(
            (net for net in get_iterable(nsr_desc_RO, "nets") if vld["id"] == net.get("ns_net_osm_id")),
            None)
        if ro_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
        vld["vim-id"] = ro_net.get("vim_net_id")
        vld["name"] = ro_net.get("vim_name")
        vld["status"] = ro_net.get("status")
        vld["status-detailed"] = ro_net.get("error_msg")
        ns_update_nsr["vld.{}".format(vld_position)] = vld
494
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO. Its "vnfs" are matched by "member_vnf_index", their "vms" by
        "vdu_osm_id" plus position, and "nets" by "vnf_net_osm_id"
    :return: Nothing, LcmException is raised on errors. LcmExceptionNoMgmtIP is raised when neither RO nor the
        stored vnfr provide an ip address for a vnf
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # look for the RO vnf that corresponds to this vnfr
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            vnfr_update = {}  # changes to be written at database for this vnfr
            if vnf_RO.get("ip_address"):
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"]
            elif not db_vnfr.get("ip-address"):
                raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # number of RO vms with the same vdu id already skipped for this vdur. In this way the vdur with
                # count-index N is matched with the (N+1)-th RO vm having the same vdu_osm_id
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    # vdur with pdu-type are not looked for at RO info
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    vdur["ip-address"] = vdur_RO.get("ip_address")
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    # every interface of the vdur must be reported by RO with the same (internal) name
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "at RO info".format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    # no RO vm with this vdu id at the position given by count-index
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} at "
                                       "RO info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            # internal vlds of the vnf are matched with the RO nets by vnf_net_osm_id
            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} at RO info".format(
                        vnf_index, vld["id"]))

            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            # RO does not report any vnf with this member_vnf_index
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(vnf_index))
562
563 async def instantiate(self, nsr_id, nslcmop_id):
564 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
565 self.logger.debug(logging_text + "Enter")
566 # get all needed from database
567 start_deploy = time()
568 db_nsr = None
569 db_nslcmop = None
570 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
571 db_nslcmop_update = {}
572 nslcmop_operation_state = None
573 db_vnfrs = {}
574 RO_descriptor_number = 0 # number of descriptors created at RO
575 descriptor_id_2_RO = {} # map between vnfd/nsd id to the id used at RO
576 n2vc_info = {}
577 exc = None
578 try:
579 step = "Getting nslcmop={} from db".format(nslcmop_id)
580 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
581 step = "Getting nsr={} from db".format(nsr_id)
582 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
583 ns_params = db_nslcmop.get("operationParams")
584 nsd = db_nsr["nsd"]
585 nsr_name = db_nsr["name"] # TODO short-name??
586
587 # look if previous tasks in process
588 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
589 if task_dependency:
590 step = db_nslcmop_update["detailed-status"] = \
591 "Waiting for related tasks to be completed: {}".format(task_name)
592 self.logger.debug(logging_text + step)
593 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
594 _, pending = await asyncio.wait(task_dependency, timeout=3600)
595 if pending:
596 raise LcmException("Timeout waiting related tasks to be completed")
597
598 step = "Getting vnfrs from db"
599 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
600 db_vnfds_ref = {}
601 db_vnfds = {}
602 for vnfr in db_vnfrs_list:
603 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
604 vnfd_id = vnfr["vnfd-id"]
605 vnfd_ref = vnfr["vnfd-ref"]
606 if vnfd_id not in db_vnfds:
607 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
608 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
609 db_vnfds_ref[vnfd_ref] = vnfd
610 db_vnfds[vnfd_id] = vnfd
611
612 # Get or generates the _admin.deployed,VCA list
613 vca_deployed_list = None
614 if db_nsr["_admin"].get("deployed"):
615 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
616 if vca_deployed_list is None:
617 vca_deployed_list = []
618 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
619 elif isinstance(vca_deployed_list, dict):
620 # maintain backward compatibility. Change a dict to list at database
621 vca_deployed_list = list(vca_deployed_list.values())
622 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
623
624 db_nsr_update["detailed-status"] = "creating"
625 db_nsr_update["operational-status"] = "init"
626
627 RO = ROclient.ROClient(self.loop, **self.ro_config)
628
629 # get vnfds, instantiate at RO
630 for vnfd_id, vnfd in db_vnfds.items():
631 vnfd_ref = vnfd["id"]
632 step = db_nsr_update["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_ref)
633 # self.logger.debug(logging_text + step)
634 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, vnfd_ref[:23])
635 descriptor_id_2_RO[vnfd_ref] = vnfd_id_RO
636 RO_descriptor_number += 1
637
638 # look if present
639 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
640 if vnfd_list:
641 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = vnfd_list[0]["uuid"]
642 self.logger.debug(logging_text + "vnfd={} exists at RO. Using RO_id={}".format(
643 vnfd_ref, vnfd_list[0]["uuid"]))
644 else:
645 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
646 desc = await RO.create("vnfd", descriptor=vnfd_RO)
647 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = desc["uuid"]
648 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
649 self.logger.debug(logging_text + "vnfd={} created at RO. RO_id={}".format(
650 vnfd_ref, desc["uuid"]))
651 self.update_db_2("nsrs", nsr_id, db_nsr_update)
652
653 # create nsd at RO
654 nsd_ref = nsd["id"]
655 step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
656 # self.logger.debug(logging_text + step)
657
658 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
659 descriptor_id_2_RO[nsd_ref] = RO_osm_nsd_id
660 RO_descriptor_number += 1
661 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
662 if nsd_list:
663 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
664 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
665 nsd_ref, RO_nsd_uuid))
666 else:
667 nsd_RO = deepcopy(nsd)
668 nsd_RO["id"] = RO_osm_nsd_id
669 nsd_RO.pop("_id", None)
670 nsd_RO.pop("_admin", None)
671 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
672 vnfd_id = c_vnf["vnfd-id-ref"]
673 c_vnf["vnfd-id-ref"] = descriptor_id_2_RO[vnfd_id]
674 desc = await RO.create("nsd", descriptor=nsd_RO)
675 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
676 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
677 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
678 self.update_db_2("nsrs", nsr_id, db_nsr_update)
679
680 # Crate ns at RO
681 # if present use it unless in error status
682 RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
683 if RO_nsr_id:
684 try:
685 step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
686 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
687 desc = await RO.show("ns", RO_nsr_id)
688 except ROclient.ROClientException as e:
689 if e.http_code != HTTPStatus.NOT_FOUND:
690 raise
691 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
692 if RO_nsr_id:
693 ns_status, ns_status_info = RO.check_ns_status(desc)
694 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
695 if ns_status == "ERROR":
696 step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
697 self.logger.debug(logging_text + step)
698 await RO.delete("ns", RO_nsr_id)
699 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
700 if not RO_nsr_id:
701 step = db_nsr_update["detailed-status"] = "Checking dependencies"
702 # self.logger.debug(logging_text + step)
703
704 # check if VIM is creating and wait look if previous tasks in process
705 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
706 if task_dependency:
707 step = "Waiting for related tasks to be completed: {}".format(task_name)
708 self.logger.debug(logging_text + step)
709 await asyncio.wait(task_dependency, timeout=3600)
710 if ns_params.get("vnf"):
711 for vnf in ns_params["vnf"]:
712 if "vimAccountId" in vnf:
713 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
714 vnf["vimAccountId"])
715 if task_dependency:
716 step = "Waiting for related tasks to be completed: {}".format(task_name)
717 self.logger.debug(logging_text + step)
718 await asyncio.wait(task_dependency, timeout=3600)
719
720 step = db_nsr_update["detailed-status"] = "Checking instantiation parameters"
721
722 # feature 1429. Add n2vc public key to needed VMs
723 n2vc_key = await self.n2vc.GetPublicKey()
724 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, [n2vc_key])
725
726 step = db_nsr_update["detailed-status"] = "Creating ns at RO"
727 desc = await RO.create("ns", descriptor=RO_ns_params,
728 name=db_nsr["name"],
729 scenario=RO_nsd_uuid)
730 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
731 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
732 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
733 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
734 self.update_db_2("nsrs", nsr_id, db_nsr_update)
735
736 # wait until NS is ready
737 step = ns_status_detailed = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
738 detailed_status_old = None
739 self.logger.debug(logging_text + step)
740
741 while time() <= start_deploy + self.total_deploy_timeout:
742 desc = await RO.show("ns", RO_nsr_id)
743 ns_status, ns_status_info = RO.check_ns_status(desc)
744 db_nsr_update["admin.deployed.RO.nsr_status"] = ns_status
745 if ns_status == "ERROR":
746 raise ROclient.ROClientException(ns_status_info)
747 elif ns_status == "BUILD":
748 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
749 elif ns_status == "ACTIVE":
750 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
751 try:
752 self.ns_update_vnfr(db_vnfrs, desc)
753 break
754 except LcmExceptionNoMgmtIP:
755 pass
756 else:
757 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
758 if detailed_status != detailed_status_old:
759 detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
760 self.update_db_2("nsrs", nsr_id, db_nsr_update)
761 await asyncio.sleep(5, loop=self.loop)
762 else: # total_deploy_timeout
763 raise ROclient.ROClientException("Timeout waiting ns to be ready")
764
765 step = "Updating NSR"
766 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
767
768 db_nsr["detailed-status"] = "Configuring vnfr"
769 self.update_db_2("nsrs", nsr_id, db_nsr_update)
770
771 # The parameters we'll need to deploy a charm
772 number_to_configure = 0
773
774 def deploy_charm(vnf_index, vdu_id, vdu_name, vdu_count_index, mgmt_ip_address, n2vc_info,
775 config_primitive=None):
776 """An inner function to deploy the charm from either vnf or vdu
777 vnf_index is mandatory. vdu_id can be None for a vnf configuration or the id for vdu configuration
778 """
779 if not mgmt_ip_address:
780 raise LcmException("vnfd/vdu has not management ip address to configure it")
781 # Login to the VCA.
782 # if number_to_configure == 0:
783 # self.logger.debug("Logging into N2VC...")
784 # task = asyncio.ensure_future(self.n2vc.login())
785 # yield from asyncio.wait_for(task, 30.0)
786 # self.logger.debug("Logged into N2VC!")
787
788 # # await self.n2vc.login()
789
790 # Note: The charm needs to exist on disk at the location
791 # specified by charm_path.
792 base_folder = vnfd["_admin"]["storage"]
793 storage_params = self.fs.get_params()
794 charm_path = "{}{}/{}/charms/{}".format(
795 storage_params["path"],
796 base_folder["folder"],
797 base_folder["pkg-dir"],
798 proxy_charm
799 )
800
801 # Setup the runtime parameters for this VNF
802 params = {'rw_mgmt_ip': mgmt_ip_address}
803 if config_primitive:
804 params["initial-config-primitive"] = config_primitive
805
806 # ns_name will be ignored in the current version of N2VC
807 # but will be implemented for the next point release.
808 model_name = "default" # TODO bug 585 nsr_id
809 if vdu_id:
810 vdu_id_text = vdu_id + "-"
811 else:
812 vdu_id_text = "-"
813 application_name = self.n2vc.FormatApplicationName(nsr_name, vnf_index, vdu_id_text)
814
815 vca_index = len(vca_deployed_list)
816 # trunk name and add two char index at the end to ensure that it is unique. It is assumed no more than
817 # 26*26 charm in the same NS
818 application_name = application_name[0:48]
819 application_name += chr(97 + vca_index // 26) + chr(97 + vca_index % 26)
820 vca_deployed_ = {
821 "member-vnf-index": vnf_index,
822 "vdu_id": vdu_id,
823 "model": model_name,
824 "application": application_name,
825 "operational-status": "init",
826 "detailed-status": "",
827 "vnfd_id": vnfd_id,
828 "vdu_name": vdu_name,
829 "vdu_count_index": vdu_count_index,
830 }
831 vca_deployed_list.append(vca_deployed_)
832 db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = vca_deployed_
833 self.update_db_2("nsrs", nsr_id, db_nsr_update)
834
835 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
836 proxy_charm))
837 if not n2vc_info:
838 n2vc_info["nsr_id"] = nsr_id
839 n2vc_info["nslcmop_id"] = nslcmop_id
840 n2vc_info["n2vc_event"] = asyncio.Event(loop=self.loop)
841 n2vc_info["lcmOperationType"] = "instantiate"
842 n2vc_info["deployed"] = vca_deployed_list
843 n2vc_info["db_update"] = db_nsr_update
844 task = asyncio.ensure_future(
845 self.n2vc.DeployCharms(
846 model_name, # The network service name
847 application_name, # The application name
848 vnfd, # The vnf descriptor
849 charm_path, # Path to charm
850 params, # Runtime params, like mgmt ip
851 {}, # for native charms only
852 self.n2vc_callback, # Callback for status changes
853 n2vc_info, # Callback parameter
854 None, # Callback parameter (task)
855 )
856 )
857 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
858 n2vc_info))
859 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + application_name, task)
860
861 step = "Looking for needed vnfd to configure"
862 self.logger.debug(logging_text + step)
863
864 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
865 vnfd_id = c_vnf["vnfd-id-ref"]
866 vnf_index = str(c_vnf["member-vnf-index"])
867 vnfd = db_vnfds_ref[vnfd_id]
868
869 # Check if this VNF has a charm configuration
870 vnf_config = vnfd.get("vnf-configuration")
871
872 if vnf_config and vnf_config.get("juju"):
873 proxy_charm = vnf_config["juju"]["charm"]
874 config_primitive = None
875
876 if proxy_charm:
877 if 'initial-config-primitive' in vnf_config:
878 config_primitive = vnf_config['initial-config-primitive']
879
880 # Login to the VCA. If there are multiple calls to login(),
881 # subsequent calls will be a nop and return immediately.
882 step = "connecting to N2VC to configure vnf {}".format(vnf_index)
883 await self.n2vc.login()
884 deploy_charm(vnf_index, None, None, None, db_vnfrs[vnf_index]["ip-address"], n2vc_info,
885 config_primitive)
886 number_to_configure += 1
887
888 # Deploy charms for each VDU that supports one.
889 for vdu_index, vdu in enumerate(get_iterable(vnfd, 'vdu')):
890 vdu_config = vdu.get('vdu-configuration')
891 proxy_charm = None
892 config_primitive = None
893
894 if vdu_config and vdu_config.get("juju"):
895 proxy_charm = vdu_config["juju"]["charm"]
896
897 if 'initial-config-primitive' in vdu_config:
898 config_primitive = vdu_config['initial-config-primitive']
899
900 if proxy_charm:
901 step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index)
902 await self.n2vc.login()
903 vdur = db_vnfrs[vnf_index]["vdur"][vdu_index]
904 # TODO for the moment only first vdu_id contains a charm deployed
905 if vdur["vdu-id-ref"] != vdu["id"]:
906 raise LcmException("Mismatch vdur {}, vdu {} at index {} for vnf {}"
907 .format(vdur["vdu-id-ref"], vdu["id"], vdu_index, vnf_index))
908 deploy_charm(vnf_index, vdu["id"], vdur.get("name"), vdur["count-index"],
909 vdur["ip-address"], n2vc_info, config_primitive)
910 number_to_configure += 1
911
912 db_nsr_update["operational-status"] = "running"
913 configuration_failed = False
914 if number_to_configure:
915 old_status = "configuring: init: {}".format(number_to_configure)
916 db_nsr_update["config-status"] = old_status
917 db_nsr_update["detailed-status"] = old_status
918 db_nslcmop_update["detailed-status"] = old_status
919
920 # wait until all are configured.
921 while time() <= start_deploy + self.total_deploy_timeout:
922 if db_nsr_update:
923 self.update_db_2("nsrs", nsr_id, db_nsr_update)
924 if db_nslcmop_update:
925 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
926 # TODO add a fake tast that set n2vc_event after some time
927 await n2vc_info["n2vc_event"].wait()
928 n2vc_info["n2vc_event"].clear()
929 all_active = True
930 status_map = {}
931 n2vc_error_text = [] # contain text error list. If empty no one is in error status
932 now = time()
933 for vca_deployed in vca_deployed_list:
934 vca_status = vca_deployed["operational-status"]
935 if vca_status not in status_map:
936 # Initialize it
937 status_map[vca_status] = 0
938 status_map[vca_status] += 1
939
940 if vca_status == "active":
941 vca_deployed.pop("time_first_error", None)
942 vca_deployed.pop("status_first_error", None)
943 continue
944
945 all_active = False
946 if vca_status in ("error", "blocked"):
947 vca_deployed["detailed-status-error"] = vca_deployed["detailed-status"]
948 # if not first time in this status error
949 if not vca_deployed.get("time_first_error"):
950 vca_deployed["time_first_error"] = now
951 continue
952 if vca_deployed.get("time_first_error") and \
953 now <= vca_deployed["time_first_error"] + self.timeout_vca_on_error:
954 n2vc_error_text.append("member_vnf_index={} vdu_id={} {}: {}"
955 .format(vca_deployed["member-vnf-index"],
956 vca_deployed["vdu_id"], vca_status,
957 vca_deployed["detailed-status-error"]))
958
959 if all_active:
960 break
961 elif n2vc_error_text:
962 db_nsr_update["config-status"] = "failed"
963 error_text = "fail configuring " + ";".join(n2vc_error_text)
964 db_nsr_update["detailed-status"] = error_text
965 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
966 db_nslcmop_update["detailed-status"] = error_text
967 db_nslcmop_update["statusEnteredTime"] = time()
968 configuration_failed = True
969 break
970 else:
971 cs = "configuring: "
972 separator = ""
973 for status, num in status_map.items():
974 cs += separator + "{}: {}".format(status, num)
975 separator = ", "
976 if old_status != cs:
977 db_nsr_update["config-status"] = cs
978 db_nsr_update["detailed-status"] = cs
979 db_nslcmop_update["detailed-status"] = cs
980 old_status = cs
981 else: # total_deploy_timeout
982 raise LcmException("Timeout waiting ns to be configured")
983
984 if not configuration_failed:
985 # all is done
986 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
987 db_nslcmop_update["statusEnteredTime"] = time()
988 db_nslcmop_update["detailed-status"] = "done"
989 db_nsr_update["config-status"] = "configured"
990 db_nsr_update["detailed-status"] = "done"
991
992 return
993
994 except (ROclient.ROClientException, DbException, LcmException) as e:
995 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
996 exc = e
997 except asyncio.CancelledError:
998 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
999 exc = "Operation was cancelled"
1000 except Exception as e:
1001 exc = traceback.format_exc()
1002 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1003 exc_info=True)
1004 finally:
1005 if exc:
1006 if db_nsr:
1007 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1008 db_nsr_update["operational-status"] = "failed"
1009 if db_nslcmop:
1010 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1011 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1012 db_nslcmop_update["statusEnteredTime"] = time()
1013 if db_nsr:
1014 db_nsr_update["_admin.nslcmop"] = None
1015 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1016 if db_nslcmop_update:
1017 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1018 if nslcmop_operation_state:
1019 try:
1020 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1021 "operationState": nslcmop_operation_state})
1022 except Exception as e:
1023 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1024
1025 self.logger.debug(logging_text + "Exit")
1026 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1027
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a network service instance.
    Schedules the removal of the configuration charms through N2VC, deletes the ns, the nsd and the vnfds at RO,
    waits for the charm removal tasks and finally stores the result at the "nsrs" and "nslcmops" database
    collections. If a final operation state has been set, an "ns"/"terminated" message is written to the
    message bus.
    Nothing is deleted when the nsr "_admin.nsState" is already "NOT_INSTANTIATED".
    :param nsr_id: _id of the ns record at the "nsrs" collection
    :param nslcmop_id: _id of the operation record at the "nslcmops" collection
    :return: None. Errors are not raised to the caller; they are logged and written to the database
    """
    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    exc = None  # set only by the 'except' clauses below; checked at 'finally'
    failed_detail = []   # annotates all failed error messages
    vca_task_list = []
    vca_task_dict = {}
    vca_application_name2index = {}
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    try:
        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # work over a copy, because the VCA content can be rewritten below for backward compatibility
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return
        # TODO check if VIM is creating and wait

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"

        if nsr_deployed and nsr_deployed.get("VCA"):
            # best effort: a failure scheduling the charm removal must not prevent the deletion at RO
            try:
                step = "Scheduling configuration charms removing"
                db_nsr_update["detailed-status"] = "Deleting charms"
                self.logger.debug(logging_text + step)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # for backward compatibility
                if isinstance(nsr_deployed["VCA"], dict):
                    nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
                    db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)

                for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
                    if vca_deployed:  # TODO it would be desirable having a and deploy_info.get("deployed"):
                        task = asyncio.ensure_future(
                            self.n2vc.RemoveCharms(
                                vca_deployed['model'],
                                vca_deployed["application"],
                            )
                        )
                        vca_application_name2index[vca_deployed["application"]] = vca_index
                        vca_task_list.append(task)
                        vca_task_dict[vca_deployed["application"]] = task
                        self.lcm_tasks.register("ns", nsr_id, nslcmop_id,
                                                "delete_charm:" + vca_deployed["application"], task)
            except Exception as e:
                self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO_fail = False
        RO = ROclient.ROClient(self.loop, **self.ro_config)

        # Delete ns
        RO_nsr_id = RO_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
            RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if RO_nsr_id:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = "Deleting ns at RO"
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self.logger.debug(logging_text + step)
                desc = await RO.delete("ns", RO_nsr_id)
                RO_delete_action = desc["action_id"]
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if RO_delete_action:
                # wait until NS is deleted from VIM
                step = detailed_status = "Waiting ns deleted from VIM. RO_id={}".format(RO_nsr_id)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                delete_timeout = 20 * 60   # 20 minutes
                while delete_timeout > 0:
                    desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                         extra_item_id=RO_delete_action)
                    ns_status, ns_status_info = RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        break
                    else:
                        assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    # write to database only when the text changes
                    if detailed_status != detailed_status_old:
                        detailed_status_old = db_nslcmop_update["detailed-status"] = \
                            db_nsr_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

        except ROclient.ROClientException as e:
            if e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
            elif e.http_code == 409:   # conflict
                failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
                RO_fail = True
            else:
                failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                self.logger.error(logging_text + failed_detail[-1])
                RO_fail = True

        # Delete nsd
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
            RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                    "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                    RO_fail = True
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])
                    RO_fail = True

        # Delete vnfds. A failure here is annotated, but it does not stop the deletion of the remaining vnfds
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd_id"):
            for vnf_id, RO_vnfd_id in nsr_deployed["RO"]["vnfd_id"].items():
                if not RO_vnfd_id:
                    continue
                try:
                    step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                        "Deleting vnfd={} at RO".format(vnf_id)
                    await RO.delete("vnfd", RO_vnfd_id)
                    self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
                        self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                    elif e.http_code == 409:   # conflict
                        failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        if vca_task_list:
            db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                "Waiting for deletion of configuration charms"
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            await asyncio.wait(vca_task_list, timeout=300)
            for application_name, task in vca_task_dict.items():
                if task.cancelled():
                    failed_detail.append("VCA[application_name={}] Deletion has been cancelled"
                                         .format(application_name))
                elif task.done():
                    # use a dedicated name: 'exc' is the function level error marker checked at 'finally',
                    # and the task failures are already reported through failed_detail
                    task_exc = task.exception()
                    if task_exc:
                        failed_detail.append("VCA[application_name={}] Deletion exception: {}"
                                             .format(application_name, task_exc))
                    else:
                        vca_index = vca_application_name2index[application_name]
                        db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None
                else:  # timeout
                    # TODO Should it be cancelled?!!
                    task.cancel()
                    failed_detail.append("VCA[application_name={}] Deletion timeout".format(application_name))

        if failed_detail:
            self.logger.error(logging_text + "; ".join(failed_detail))
            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
            db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        elif db_nslcmop["operationParams"].get("autoremove"):
            self.db.del_one("nsrs", {"_id": nsr_id})
            # the nsr record does not exist any more; avoid the final write over it at 'finally'
            db_nsr = None
            db_nsr_update.clear()
            self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update.clear()
            self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
            # NOTE(review): the key is spelled "usageSate"; presumably "usageState" was intended.
            # Confirm against the pdus schema before changing it
            self.db.set_list("pdus", {"_admin.usage.nsr_id": nsr_id},
                             {"_admin.usageSate": "NOT_IN_USE", "_admin.usage": None})
            self.logger.debug(logging_text + "Delete from database")
        else:
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
    finally:
        # runs also for the early 'return' above: store the pending changes and notify
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
        if db_nsr:
            db_nsr_update["_admin.nslcmop"] = None
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
1273
async def _ns_execute_primitive(self, db_deployed, nsr_name, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
                                primitive, primitive_params):
    """
    Execute a primitive over the deployed charm that matches the provided vnf/vdu identification and wait
    for it up to 600 seconds.
    :param db_deployed: content of nsr "_admin.deployed". Its "VCA" entry must be a list; empty items are skipped
    :param nsr_name: name of the ns. It is not used in this method
    :param member_vnf_index: must be equal to the "member-vnf-index" of the charm
    :param vdu_id: must be equal to the "vdu_id" of the charm. None for a charm deployed without vdu_id
    :param vdu_name: if provided (not empty), must be equal to the "vdu_name" of the charm
    :param vdu_count_index: if provided (not None), must be equal to the "vdu_count_index" of the charm
    :param primitive: name of the primitive, passed to n2vc.ExecutePrimitive
    :param primitive_params: dictionary passed to n2vc.ExecutePrimitive as keyword arguments
    :return: tuple (result, result_detail). result is "COMPLETED" if the execution task ends without exception;
        otherwise "FAILED", with the exception text, "Task has been cancelled" or "timeout" at result_detail
    :raises LcmException: if there is not a matching charm, it has not model or application name, or its
        operational-status is not "active"
    """
    for vca_deployed in db_deployed["VCA"]:
        if not vca_deployed:
            continue
        if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
            continue
        if vdu_name and vdu_name != vca_deployed["vdu_name"]:
            continue
        # compare against None: 0 is a valid count index, and must not be taken as "not provided"
        if vdu_count_index is not None and vdu_count_index != vca_deployed["vdu_count_index"]:
            continue
        break
    else:
        raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not deployed"
                           .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
    model_name = vca_deployed.get("model")
    application_name = vca_deployed.get("application")
    if not model_name or not application_name:
        raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not model "
                           "or application name" .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
    if vca_deployed["operational-status"] != "active":
        raise LcmException("charm for member_vnf_index={} vdu_id={} operational_status={} not 'active'".format(
            member_vnf_index, vdu_id, vca_deployed["operational-status"]))
    # no status callback is used; the result is obtained from the task itself
    callback = None
    callback_args = ()
    await self.n2vc.login()
    task = asyncio.ensure_future(
        self.n2vc.ExecutePrimitive(
            model_name,
            application_name,
            primitive, callback,
            *callback_args,
            **primitive_params
        )
    )
    # wait until completed with timeout
    await asyncio.wait((task,), timeout=600)

    result = "FAILED"  # by default
    result_detail = ""
    if task.cancelled():
        result_detail = "Task has been cancelled"
    elif task.done():
        exc = task.exception()
        if exc:
            result_detail = str(exc)
        else:
            # TODO revise with Adam if action is finished and ok when task is done or callback is needed
            result = "COMPLETED"
            result_detail = "Done"
    else:  # timeout
        # TODO Should it be cancelled?!!
        task.cancel()
        result_detail = "timeout"
    return result, result_detail
1333
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive requested by an "nslcmops" operation over a deployed charm of the ns.
    Waits (up to 3600 seconds) for the related tasks reported by lcm_tasks.lookfor_related, calls
    _ns_execute_primitive with the "operationParams" of the operation and stores the result at the "nslcmops"
    record. If a final operation state has been set, an "ns"/"actioned" message is written to the message bus.
    :param nsr_id: _id of the ns record at the "nsrs" collection
    :param nslcmop_id: _id of the operation record at the "nslcmops" collection
    :return: None. Errors are not raised to the caller; they are logged and written to the database
    """
    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    exc = None
    try:
        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_deployed = db_nsr["_admin"].get("deployed")
        nsr_name = db_nsr["name"]
        vnf_index = db_nslcmop["operationParams"]["member_vnf_index"]
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        vdu_name = db_nslcmop["operationParams"].get("vdu_name")

        # look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
        if task_dependency:
            step = db_nslcmop_update["detailed-status"] = \
                "Waiting for related tasks to be completed: {}".format(task_name)
            self.logger.debug(logging_text + step)
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            _, pending = await asyncio.wait(task_dependency, timeout=3600)
            if pending:
                raise LcmException("Timeout waiting related tasks to be completed")

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # TODO check if ns is in a proper status
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        # refresh the step, so that a failure from here is not reported under a previous, unrelated step
        step = "Executing primitive {}".format(primitive)
        result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index, vdu_id,
                                                                 vdu_name, vdu_count_index, primitive,
                                                                 primitive_params)
        db_nslcmop_update["detailed-status"] = result_detail
        db_nslcmop_update["operationState"] = nslcmop_operation_state = result
        db_nslcmop_update["statusEnteredTime"] = time()
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        # runs also for the 'return' of the success path: store the pending changes and notify
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
        if db_nsr:
            db_nsr_update["_admin.nslcmop"] = None
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        # logged once, after the notification
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
1412
1413 async def scale(self, nsr_id, nslcmop_id):
1414 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
1415 self.logger.debug(logging_text + "Enter")
1416 # get all needed from database
1417 db_nsr = None
1418 db_nslcmop = None
1419 db_nslcmop_update = {}
1420 nslcmop_operation_state = None
1421 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1422 exc = None
1423 # in case of error, indicates what part of scale was failed to put nsr at error status
1424 scale_process = None
1425 old_operational_status = ""
1426 old_config_status = ""
1427 vnfr_scaled = False
1428 try:
1429 step = "Getting nslcmop from database"
1430 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1431 step = "Getting nsr from database"
1432 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1433 nsr_name = db_nsr["name"]
1434 old_operational_status = db_nsr["operational-status"]
1435 old_config_status = db_nsr["config-status"]
1436
1437 # look if previous tasks in process
1438 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
1439 if task_dependency:
1440 step = db_nslcmop_update["detailed-status"] = \
1441 "Waiting for related tasks to be completed: {}".format(task_name)
1442 self.logger.debug(logging_text + step)
1443 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1444 _, pending = await asyncio.wait(task_dependency, timeout=3600)
1445 if pending:
1446 raise LcmException("Timeout waiting related tasks to be completed")
1447
1448 step = "Parsing scaling parameters"
1449 db_nsr_update["operational-status"] = "scaling"
1450 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1451 nsr_deployed = db_nsr["_admin"].get("deployed")
1452 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
1453 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
1454 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
1455 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
1456 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
1457
1458 # for backward compatibility
1459 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
1460 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
1461 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
1462 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1463
1464 step = "Getting vnfr from database"
1465 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
1466 step = "Getting vnfd from database"
1467 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
1468 step = "Getting scaling-group-descriptor"
1469 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
1470 if scaling_descriptor["name"] == scaling_group:
1471 break
1472 else:
1473 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
1474 "at vnfd:scaling-group-descriptor".format(scaling_group))
1475 # cooldown_time = 0
1476 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
1477 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
1478 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
1479 # break
1480
1481 # TODO check if ns is in a proper status
1482 step = "Sending scale order to RO"
1483 nb_scale_op = 0
1484 if not db_nsr["_admin"].get("scaling-group"):
1485 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
1486 admin_scale_index = 0
1487 else:
1488 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
1489 if admin_scale_info["name"] == scaling_group:
1490 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
1491 break
1492 else: # not found, set index one plus last element and add new entry with the name
1493 admin_scale_index += 1
1494 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
1495 RO_scaling_info = []
1496 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
1497 if scaling_type == "SCALE_OUT":
1498 # count if max-instance-count is reached
1499 if "max-instance-count" in scaling_descriptor and scaling_descriptor["max-instance-count"] is not None:
1500 max_instance_count = int(scaling_descriptor["max-instance-count"])
1501 if nb_scale_op >= max_instance_count:
1502 raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
1503 " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1504 nb_scale_op = nb_scale_op + 1
1505 vdu_scaling_info["scaling_direction"] = "OUT"
1506 vdu_scaling_info["vdu-create"] = {}
1507 for vdu_scale_info in scaling_descriptor["vdu"]:
1508 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1509 "type": "create", "count": vdu_scale_info.get("count", 1)})
1510 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1511 elif scaling_type == "SCALE_IN":
1512 # count if min-instance-count is reached
1513 min_instance_count = 0
1514 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
1515 min_instance_count = int(scaling_descriptor["min-instance-count"])
1516 if nb_scale_op <= min_instance_count:
1517 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
1518 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1519 nb_scale_op = nb_scale_op - 1
1520 vdu_scaling_info["scaling_direction"] = "IN"
1521 vdu_scaling_info["vdu-delete"] = {}
1522 for vdu_scale_info in scaling_descriptor["vdu"]:
1523 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1524 "type": "delete", "count": vdu_scale_info.get("count", 1)})
1525 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1526
1527 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
1528 vdu_create = vdu_scaling_info.get("vdu-create")
1529 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
1530 if vdu_scaling_info["scaling_direction"] == "IN":
1531 for vdur in reversed(db_vnfr["vdur"]):
1532 if vdu_delete.get(vdur["vdu-id-ref"]):
1533 vdu_delete[vdur["vdu-id-ref"]] -= 1
1534 vdu_scaling_info["vdu"].append({
1535 "name": vdur["name"],
1536 "vdu_id": vdur["vdu-id-ref"],
1537 "interface": []
1538 })
1539 for interface in vdur["interfaces"]:
1540 vdu_scaling_info["vdu"][-1]["interface"].append({
1541 "name": interface["name"],
1542 "ip_address": interface["ip-address"],
1543 "mac_address": interface.get("mac-address"),
1544 })
1545 vdu_delete = vdu_scaling_info.pop("vdu-delete")
1546
1547 # execute primitive service PRE-SCALING
1548 step = "Executing pre-scale vnf-config-primitive"
1549 if scaling_descriptor.get("scaling-config-action"):
1550 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1551 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "pre-scale-in" \
1552 and scaling_type == "SCALE_IN":
1553 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1554 step = db_nslcmop_update["detailed-status"] = \
1555 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
1556 # look for primitive
1557 primitive_params = {}
1558 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1559 if config_primitive["name"] == vnf_config_primitive:
1560 for parameter in config_primitive.get("parameter", ()):
1561 if 'default-value' in parameter and \
1562 parameter['default-value'] == "<VDU_SCALE_INFO>":
1563 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1564 default_flow_style=True,
1565 width=256)
1566 break
1567 else:
1568 raise LcmException(
1569 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
1570 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-cnfiguration:config-"
1571 "primitive".format(scaling_group, config_primitive))
1572 scale_process = "VCA"
1573 db_nsr_update["config-status"] = "configuring pre-scaling"
1574 result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index,
1575 None, None, None, vnf_config_primitive,
1576 primitive_params)
1577 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1578 vnf_config_primitive, result, result_detail))
1579 if result == "FAILED":
1580 raise LcmException(result_detail)
1581 db_nsr_update["config-status"] = old_config_status
1582 scale_process = None
1583
1584 if RO_scaling_info:
1585 scale_process = "RO"
1586 RO = ROclient.ROClient(self.loop, **self.ro_config)
1587 RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
1588 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
1589 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
1590 # wait until ready
1591 RO_nslcmop_id = RO_desc["instance_action_id"]
1592 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
1593
1594 RO_task_done = False
1595 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
1596 detailed_status_old = None
1597 self.logger.debug(logging_text + step)
1598
1599 deployment_timeout = 1 * 3600 # One hour
1600 while deployment_timeout > 0:
1601 if not RO_task_done:
1602 desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
1603 extra_item_id=RO_nslcmop_id)
1604 ns_status, ns_status_info = RO.check_action_status(desc)
1605 if ns_status == "ERROR":
1606 raise ROclient.ROClientException(ns_status_info)
1607 elif ns_status == "BUILD":
1608 detailed_status = step + "; {}".format(ns_status_info)
1609 elif ns_status == "ACTIVE":
1610 RO_task_done = True
1611 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
1612 self.logger.debug(logging_text + step)
1613 else:
1614 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
1615 else:
1616 desc = await RO.show("ns", RO_nsr_id)
1617 ns_status, ns_status_info = RO.check_ns_status(desc)
1618 if ns_status == "ERROR":
1619 raise ROclient.ROClientException(ns_status_info)
1620 elif ns_status == "BUILD":
1621 detailed_status = step + "; {}".format(ns_status_info)
1622 elif ns_status == "ACTIVE":
1623 step = detailed_status = \
1624 "Waiting for management IP address reported by the VIM. Updating VNFRs"
1625 if not vnfr_scaled:
1626 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
1627 vnfr_scaled = True
1628 try:
1629 desc = await RO.show("ns", RO_nsr_id)
1630 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
1631 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
1632 break
1633 except LcmExceptionNoMgmtIP:
1634 pass
1635 else:
1636 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1637 if detailed_status != detailed_status_old:
1638 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
1639 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1640
1641 await asyncio.sleep(5, loop=self.loop)
1642 deployment_timeout -= 5
1643 if deployment_timeout <= 0:
1644 raise ROclient.ROClientException("Timeout waiting ns to be ready")
1645
1646 # update VDU_SCALING_INFO with the obtained ip_addresses
1647 if vdu_scaling_info["scaling_direction"] == "OUT":
1648 for vdur in reversed(db_vnfr["vdur"]):
1649 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
1650 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
1651 vdu_scaling_info["vdu"].append({
1652 "name": vdur["name"],
1653 "vdu_id": vdur["vdu-id-ref"],
1654 "interface": []
1655 })
1656 for interface in vdur["interfaces"]:
1657 vdu_scaling_info["vdu"][-1]["interface"].append({
1658 "name": interface["name"],
1659 "ip_address": interface["ip-address"],
1660 "mac_address": interface.get("mac-address"),
1661 })
1662 del vdu_scaling_info["vdu-create"]
1663
1664 scale_process = None
1665 if db_nsr_update:
1666 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1667
1668 # execute primitive service POST-SCALING
1669 step = "Executing post-scale vnf-config-primitive"
1670 if scaling_descriptor.get("scaling-config-action"):
1671 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1672 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "post-scale-out" \
1673 and scaling_type == "SCALE_OUT":
1674 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1675 step = db_nslcmop_update["detailed-status"] = \
1676 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
1677 # look for primitive
1678 primitive_params = {}
1679 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1680 if config_primitive["name"] == vnf_config_primitive:
1681 for parameter in config_primitive.get("parameter", ()):
1682 if 'default-value' in parameter and \
1683 parameter['default-value'] == "<VDU_SCALE_INFO>":
1684 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1685 default_flow_style=True,
1686 width=256)
1687 break
1688 else:
1689 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
1690 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
1691 "match any vnf-configuration:config-primitive".format(scaling_group,
1692 config_primitive))
1693 scale_process = "VCA"
1694 db_nsr_update["config-status"] = "configuring post-scaling"
1695
1696 result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index,
1697 None, None, None, vnf_config_primitive,
1698 primitive_params)
1699 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1700 vnf_config_primitive, result, result_detail))
1701 if result == "FAILED":
1702 raise LcmException(result_detail)
1703 db_nsr_update["config-status"] = old_config_status
1704 scale_process = None
1705
1706 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1707 db_nslcmop_update["statusEnteredTime"] = time()
1708 db_nslcmop_update["detailed-status"] = "done"
1709 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
1710 db_nsr_update["operational-status"] = old_operational_status
1711 db_nsr_update["config-status"] = old_config_status
1712 return
1713 except (ROclient.ROClientException, DbException, LcmException) as e:
1714 self.logger.error(logging_text + "Exit Exception {}".format(e))
1715 exc = e
1716 except asyncio.CancelledError:
1717 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1718 exc = "Operation was cancelled"
1719 except Exception as e:
1720 exc = traceback.format_exc()
1721 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1722 finally:
1723 if exc:
1724 if db_nslcmop:
1725 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1726 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1727 db_nslcmop_update["statusEnteredTime"] = time()
1728 if db_nsr:
1729 db_nsr_update["operational-status"] = old_operational_status
1730 db_nsr_update["config-status"] = old_config_status
1731 db_nsr_update["detailed-status"] = ""
1732 db_nsr_update["_admin.nslcmop"] = None
1733 if scale_process:
1734 if "VCA" in scale_process:
1735 db_nsr_update["config-status"] = "failed"
1736 if "RO" in scale_process:
1737 db_nsr_update["operational-status"] = "failed"
1738 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
1739 exc)
1740 if db_nslcmop_update:
1741 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1742 if db_nsr:
1743 db_nsr_update["_admin.nslcmop"] = None
1744 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1745 if nslcmop_operation_state:
1746 try:
1747 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1748 "operationState": nslcmop_operation_state})
1749 # if cooldown_time:
1750 # await asyncio.sleep(cooldown_time)
1751 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
1752 except Exception as e:
1753 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1754 self.logger.debug(logging_text + "Exit")
1755 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")