bug 544 Adding license headers
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import functools
24 import traceback
25
26 import ROclient
27 from lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase
28
29 from osm_common.dbbase import DbException
30 from osm_common.fsbase import FsException
31 from n2vc.vnf import N2VC
32
33 from copy import copy, deepcopy
34 from http import HTTPStatus
35 from time import time
36 from uuid import uuid4
37
38 __author__ = "Alfonso Tierno"
39
40
def get_iterable(in_dict, in_key):
    """
    Look up in_key at in_dict, replacing a missing or falsy value (None, False, empty list, ...) by an
    empty tuple, so that the result can always be iterated
    :param in_dict: a dictionary
    :param in_key: the key to look for at in_dict
    :return: in_dict[in_key], or () if it is not present or its value is falsy
    """
    found = in_dict.get(in_key)
    return found if found else ()
51
52
def populate_dict(target_dict, key_list, value):
    """
    Update target_dict walking down key_list and creating the missing nested dictionaries on the way.
    The last item of key_list is the key that gets assigned the value.
    Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
    :param target_dict: dictionary to be changed
    :param key_list: list of keys to insert at target_dict
    :param value: content stored under the last key of key_list
    :return: None
    """
    node = target_dict
    # every key but the last one names an intermediate dictionary
    for step in key_list[:-1]:
        node = node.setdefault(step, {})
    node[key_list[-1]] = value
67
68
class NsLcm(LcmBase):
    # Class-level timeouts. total_deploy_timeout is added to a time() value at instantiate, so it is expressed
    # in seconds; timeout_vca_on_error is presumably seconds too (its usage is not visible here - TODO confirm)
    timeout_vca_on_error = 5 * 60   # Time for charm from first time at blocked,error status to mark as failed
    total_deploy_timeout = 2 * 3600   # global timeout for deployment
72
def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
    """
    Init: creates the 'lcm.ns' logger, initializes the base class and creates the N2VC client
    :param db: database object, passed to the base class initializer
    :param msg: messaging object, passed to the base class initializer
    :param fs: filesystem storage object, passed to the base class initializer
    :param lcm_tasks: stored at self.lcm_tasks
    :param ro_config: stored at self.ro_config
    :param vca_config: dictionary from where 'host', 'port', 'user' and 'secret' are read to create N2VC
    :param loop: stored at self.loop
    :return: None
    """
    # logging
    self.logger = logging.getLogger('lcm.ns')
    self.loop = loop
    self.lcm_tasks = lcm_tasks

    super().__init__(db, msg, fs, self.logger)

    self.ro_config = ro_config

    n2vc_settings = dict(
        log=self.logger,
        server=vca_config['host'],
        port=vca_config['port'],
        user=vca_config['user'],
        secret=vca_config['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=vca_config[''],
        artifacts=None,
    )
    self.n2vc = N2VC(**n2vc_settings)
100
def vnfd2RO(self, vnfd, new_id=None):
    """
    Creates the vnfd descriptor to be used at RO, based on the input OSM IM vnfd. The input is not modified
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :return: deep copy of vnfd without "_id", "_admin", "vnf-configuration", "monitoring-param" and
        "scaling-group-descriptor"; every vdu "cloud-init-file" is replaced by a "cloud-init" entry that
        contains the text read from that file
    :raises LcmException: if the storage raises FsException while reading a cloud-init file
    """
    try:
        ro_vnfd = deepcopy(vnfd)
        for internal_key in ("_id", "_admin"):
            ro_vnfd.pop(internal_key, None)
        if new_id:
            ro_vnfd["id"] = new_id
        for vdu in ro_vnfd.get("vdu", ()):
            if "cloud-init-file" not in vdu:
                continue
            # "_admin" has been removed from the copy, so the storage info is taken from the input vnfd
            storage = vnfd["_admin"]["storage"]
            cloud_init_path = "{}/{}/cloud_init/{}".format(
                storage["folder"],
                storage["pkg-dir"],
                vdu["cloud-init-file"]
            )
            cloud_init_fd = self.fs.file_open(cloud_init_path, "r")
            # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails
            #  convert to base 64 or similar
            try:
                cloud_init_text = cloud_init_fd.read()
            finally:
                cloud_init_fd.close()
            vdu.pop("cloud-init-file", None)
            vdu["cloud-init"] = cloud_init_text
        # remove what is unused by RO: configuration, monitoring, scaling
        for unused_key in ("vnf-configuration", "monitoring-param", "scaling-group-descriptor"):
            ro_vnfd.pop(unused_key, None)
        return ro_vnfd
    except FsException as e:
        raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
141
def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None):
    """
    Callback both for charm status change and task completion
    :param model_name: Charm model name
    :param application_name: Charm application name
    :param status: Can be
        - blocked: The unit needs manual intervention
        - maintenance: The unit is actively deploying/configuring
        - waiting: The unit is waiting for another charm to be ready
        - active: The unit is deployed, configured, and ready
        - error: The charm has failed and needs attention.
        - terminated: The charm has been destroyed
        - removing,
        - removed
    :param message: detailed message error
    :param n2vc_info: dictionary with information shared with instantiate task. It contains:
        nsr_id:
        nslcmop_id:
        lcmOperationType: currently "instantiate"
        deployed: list where every non-empty item is a dictionary with at least "model", "application",
            "operational-status" and "detailed-status". The item matching model_name and application_name
            is the one updated
        db_update: dictionary to be filled with the changes to be written to database with format key.key.key: value
        n2vc_event: event used to notify instantiation task that some change has been produced
    :param task: None for charm status change, or task for completion task callback
    :return: None. Any exception is logged here and not propagated
    """
    # Fallback assigned before the try block: the except handler at the end uses logging_text, so it must exist
    # even when n2vc_info lacks one of the expected keys
    logging_text = "Task [n2vc_callback] application={}".format(application_name)
    try:
        nsr_id = n2vc_info["nsr_id"]
        deployed = n2vc_info["deployed"]
        db_nsr_update = n2vc_info["db_update"]
        nslcmop_id = n2vc_info["nslcmop_id"]
        ns_operation = n2vc_info["lcmOperationType"]
        n2vc_event = n2vc_info["n2vc_event"]
        logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id,
                                                                                application_name)
        # locate the deployed entry this callback refers to
        for vca_index, vca_deployed in enumerate(deployed):
            if not vca_deployed:
                continue
            if model_name == vca_deployed["model"] and application_name == vca_deployed["application"]:
                break
        else:
            self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA")
            return

        def record_status(operational_status, detailed_status):
            # keep the in-memory entry and the pending database update in sync
            vca_deployed['operational-status'] = operational_status
            db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = operational_status
            vca_deployed['detailed-status'] = detailed_status
            db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = detailed_status

        if task:
            if task.cancelled():
                self.logger.debug(logging_text + " task Cancelled")
                record_status("error", "Task Cancelled")

            elif task.done():
                exc = task.exception()
                if exc:
                    self.logger.error(logging_text + " task Exception={}".format(exc))
                    record_status("error", str(exc))
                else:
                    self.logger.debug(logging_text + " task Done")
                    # task is Done, but callback is still ongoing. So ignore
                    return
        elif status:
            self.logger.debug(logging_text + " Enter status={} message={}".format(status, message))
            if vca_deployed['operational-status'] == status:
                return  # same status, ignore
            record_status(status, str(message))
        else:
            self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
            return
        # wake up instantiate task
        n2vc_event.set()
    except Exception as e:
        self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True)
219
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params. Keys read here: "vimAccountId", "vduImage", "ssh_keys", "vnf", "vld"
    :param nsd: ns descriptor. Its "constituent-vnfd" list relates each member-vnf-index with a vnfd-id-ref
    :param vnfd_dict: dictionary with vnfd-id-ref: vnfd-content
    :param n2vc_key_list: content to be set as "mgmt_keys" at every vdu that needs to be accessed for
        configuration. Nothing is injected if it is empty or None
    :return: The RO ns descriptor, or None if ns_params is empty
    :raises LcmException: if a VIM account is not ENABLED, or if an instantiate parameter references a
        member-vnf-index, an internal connection point or a connection point not present at the descriptors
    """
    vim_2_RO = {}   # cache vim_account -> RO vim id, so that each vim account is read only once from database
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def ip_profile_2_RO(ip_profile):
        # works over a copy, renaming the keys and values that are named differently at RO
        RO_ip_profile = deepcopy(ip_profile)
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        # "scenario": ns_params["nsdId"],
    }
    if n2vc_key_list:
        # inject the keys into the vdus that are going to be configured: the management vdu of a vnf with
        # vnf-configuration, and every vdu with its own vdu-configuration
        for vnfd_ref, vnfd in vnfd_dict.items():
            vdu_needed_access = []
            mgmt_cp = None
            if vnfd.get("vnf-configuration"):
                if vnfd.get("mgmt-interface"):
                    if vnfd["mgmt-interface"].get("vdu-id"):
                        vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                    elif vnfd["mgmt-interface"].get("cp"):
                        mgmt_cp = vnfd["mgmt-interface"]["cp"]

            for vdu in vnfd.get("vdu", ()):
                if vdu.get("vdu-configuration"):
                    vdu_needed_access.append(vdu["id"])
                elif mgmt_cp:
                    # get_iterable: a vdu without "interface" must be skipped instead of iterating over None
                    for vdu_interface in get_iterable(vdu, "interface"):
                        if vdu_interface.get("external-connection-point-ref") and \
                                vdu_interface["external-connection-point-ref"] == mgmt_cp:
                            vdu_needed_access.append(vdu["id"])
                            mgmt_cp = None
                            break

            if vdu_needed_access:
                # get_iterable: tolerate a nsd without "constituent-vnfd"
                for vnf_member in get_iterable(nsd, "constituent-vnfd"):
                    if vnf_member["vnfd-id-ref"] != vnfd_ref:
                        continue
                    for vdu in vdu_needed_access:
                        populate_dict(RO_ns_params,
                                      ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                      n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    for vnf_params in get_iterable(ns_params, "vnf"):
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    # no vdu interface references this internal connection point
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))
        if vld_params.get("vim-network-name"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # vdu_descriptor and interface_descriptor are the ones matched by the loops above
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
433
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr and writes the resulting "vdur" list at database
    :param db_vnfr: vnfr content. Its "vdur" list is modified
    :param vdu_create: dictionary with vdu-id-ref: number of vdur to add. Not modified
    :param vdu_delete: dictionary with vdu-id-ref: number of vdur to remove. Not modified
    :return: None
    :raises LcmException: if some vdu-id-ref of vdu_create or vdu_delete cannot be completed because there
        are not (enough) vdur with that vdu-id-ref
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # walk the list backwards, so that inserting and deleting items does not disturb the pending indexes
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            # clone the last vdur found with this vdu-id-ref; each clone increases the count-index
            for index in range(0, vdu_create[vdu_id_ref]):
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Completed entries have been removed from both dictionaries
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
472
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Completes every vld of the nsr with the info reported by RO for its network, and registers the
    modified vld at ns_update_nsr under the key "vld.<index>"
    :param ns_update_nsr: dictionary to be filled with the updated info
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised if a vld is not found among the RO "nets"
    """
    # vld key -> key of the RO net from where the value is taken
    vld_2_RO_keys = (
        ("vim-id", "vim_net_id"),
        ("name", "vim_name"),
        ("status", "status"),
        ("status-detailed", "error_msg"),
    )
    for position, vld in enumerate(get_iterable(db_nsr, "vld")):
        for ro_net in get_iterable(nsr_desc_RO, "nets"):
            if vld["id"] == ro_net.get("ns_net_osm_id"):
                for vld_key, ro_key in vld_2_RO_keys:
                    vld[vld_key] = ro_net.get(ro_key)
                ns_update_nsr["vld.{}".format(position)] = vld
                break
        else:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
494
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    For every vnfr, the RO vnf with the same member_vnf_index is looked for, its info is copied to the vnfr
    "ip-address", to each "vdur" (and its "interfaces") and to each "vld", and the changes are written to the
    "vnfrs" database collection with one update per vnfr
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors. LcmExceptionNoMgmtIP is raised if RO does not report
        an ip_address for a vnf and the vnfr does not have one already
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            # changes to be written to database for this vnfr
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"]
            elif not db_vnfr.get("ip-address"):
                raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # number of RO vms with the same vdu_osm_id skipped so far. The vdur with count-index N is
                # matched against the (N+1)-th RO vm that has its vdu-id-ref
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    vdur["ip-address"] = vdur_RO.get("ip_address")
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    # vdur interfaces are matched with the RO ones by name == internal_name
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "at RO info".format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    # no RO vm for this vdur and count-index
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} at "
                                       "RO info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            # internal vlds of the vnf are matched with the RO nets by id == vnf_net_osm_id
            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} at RO info".format(
                        vnf_index, vld["id"]))

            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            # no RO vnf has this member_vnf_index
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(vnf_index))
562
563 async def instantiate(self, nsr_id, nslcmop_id):
564 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
565 self.logger.debug(logging_text + "Enter")
566 # get all needed from database
567 start_deploy = time()
568 db_nsr = None
569 db_nslcmop = None
570 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
571 db_nslcmop_update = {}
572 nslcmop_operation_state = None
573 db_vnfrs = {}
574 RO_descriptor_number = 0 # number of descriptors created at RO
575 descriptor_id_2_RO = {} # map between vnfd/nsd id to the id used at RO
576 n2vc_info = {}
577 exc = None
578 try:
579 step = "Getting nslcmop={} from db".format(nslcmop_id)
580 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
581 step = "Getting nsr={} from db".format(nsr_id)
582 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
583 ns_params = db_nslcmop.get("operationParams")
584 nsd = db_nsr["nsd"]
585 nsr_name = db_nsr["name"] # TODO short-name??
586
587 # look if previous tasks in process
588 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
589 if task_dependency:
590 step = db_nslcmop_update["detailed-status"] = \
591 "Waiting for related tasks to be completed: {}".format(task_name)
592 self.logger.debug(logging_text + step)
593 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
594 _, pending = await asyncio.wait(task_dependency, timeout=3600)
595 if pending:
596 raise LcmException("Timeout waiting related tasks to be completed")
597
598 step = "Getting vnfrs from db"
599 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
600 db_vnfds_ref = {}
601 db_vnfds = {}
602 for vnfr in db_vnfrs_list:
603 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
604 vnfd_id = vnfr["vnfd-id"]
605 vnfd_ref = vnfr["vnfd-ref"]
606 if vnfd_id not in db_vnfds:
607 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
608 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
609 db_vnfds_ref[vnfd_ref] = vnfd
610 db_vnfds[vnfd_id] = vnfd
611
612 # Get or generates the _admin.deployed,VCA list
613 vca_deployed_list = None
614 if db_nsr["_admin"].get("deployed"):
615 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
616 if vca_deployed_list is None:
617 vca_deployed_list = []
618 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
619 elif isinstance(vca_deployed_list, dict):
620 # maintain backward compatibility. Change a dict to list at database
621 vca_deployed_list = list(vca_deployed_list.values())
622 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
623
624 db_nsr_update["detailed-status"] = "creating"
625 db_nsr_update["operational-status"] = "init"
626
627 RO = ROclient.ROClient(self.loop, **self.ro_config)
628
629 # get vnfds, instantiate at RO
630 for vnfd_id, vnfd in db_vnfds.items():
631 vnfd_ref = vnfd["id"]
632 step = db_nsr_update["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_ref)
633 # self.logger.debug(logging_text + step)
634 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, vnfd_ref[:23])
635 descriptor_id_2_RO[vnfd_ref] = vnfd_id_RO
636 RO_descriptor_number += 1
637
638 # look if present
639 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
640 if vnfd_list:
641 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = vnfd_list[0]["uuid"]
642 self.logger.debug(logging_text + "vnfd={} exists at RO. Using RO_id={}".format(
643 vnfd_ref, vnfd_list[0]["uuid"]))
644 else:
645 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
646 desc = await RO.create("vnfd", descriptor=vnfd_RO)
647 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = desc["uuid"]
648 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
649 self.logger.debug(logging_text + "vnfd={} created at RO. RO_id={}".format(
650 vnfd_ref, desc["uuid"]))
651 self.update_db_2("nsrs", nsr_id, db_nsr_update)
652
653 # create nsd at RO
654 nsd_ref = nsd["id"]
655 step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
656 # self.logger.debug(logging_text + step)
657
658 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
659 descriptor_id_2_RO[nsd_ref] = RO_osm_nsd_id
660 RO_descriptor_number += 1
661 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
662 if nsd_list:
663 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
664 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
665 nsd_ref, RO_nsd_uuid))
666 else:
667 nsd_RO = deepcopy(nsd)
668 nsd_RO["id"] = RO_osm_nsd_id
669 nsd_RO.pop("_id", None)
670 nsd_RO.pop("_admin", None)
671 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
672 vnfd_id = c_vnf["vnfd-id-ref"]
673 c_vnf["vnfd-id-ref"] = descriptor_id_2_RO[vnfd_id]
674 desc = await RO.create("nsd", descriptor=nsd_RO)
675 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
676 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
677 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
678 self.update_db_2("nsrs", nsr_id, db_nsr_update)
679
680 # Crate ns at RO
681 # if present use it unless in error status
682 RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
683 if RO_nsr_id:
684 try:
685 step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
686 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
687 desc = await RO.show("ns", RO_nsr_id)
688 except ROclient.ROClientException as e:
689 if e.http_code != HTTPStatus.NOT_FOUND:
690 raise
691 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
692 if RO_nsr_id:
693 ns_status, ns_status_info = RO.check_ns_status(desc)
694 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
695 if ns_status == "ERROR":
696 step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
697 self.logger.debug(logging_text + step)
698 await RO.delete("ns", RO_nsr_id)
699 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
700 if not RO_nsr_id:
701 step = db_nsr_update["detailed-status"] = "Checking dependencies"
702 # self.logger.debug(logging_text + step)
703
704 # check if VIM is creating and wait look if previous tasks in process
705 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
706 if task_dependency:
707 step = "Waiting for related tasks to be completed: {}".format(task_name)
708 self.logger.debug(logging_text + step)
709 await asyncio.wait(task_dependency, timeout=3600)
710 if ns_params.get("vnf"):
711 for vnf in ns_params["vnf"]:
712 if "vimAccountId" in vnf:
713 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
714 vnf["vimAccountId"])
715 if task_dependency:
716 step = "Waiting for related tasks to be completed: {}".format(task_name)
717 self.logger.debug(logging_text + step)
718 await asyncio.wait(task_dependency, timeout=3600)
719
720 step = db_nsr_update["detailed-status"] = "Checking instantiation parameters"
721
722 # feature 1429. Add n2vc public key to needed VMs
723 n2vc_key = await self.n2vc.GetPublicKey()
724 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, [n2vc_key])
725
726 step = db_nsr_update["detailed-status"] = "Creating ns at RO"
727 desc = await RO.create("ns", descriptor=RO_ns_params,
728 name=db_nsr["name"],
729 scenario=RO_nsd_uuid)
730 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
731 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
732 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
733 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
734 self.update_db_2("nsrs", nsr_id, db_nsr_update)
735
736 # wait until NS is ready
737 step = ns_status_detailed = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
738 detailed_status_old = None
739 self.logger.debug(logging_text + step)
740
741 while time() <= start_deploy + self.total_deploy_timeout:
742 desc = await RO.show("ns", RO_nsr_id)
743 ns_status, ns_status_info = RO.check_ns_status(desc)
744 db_nsr_update["admin.deployed.RO.nsr_status"] = ns_status
745 if ns_status == "ERROR":
746 raise ROclient.ROClientException(ns_status_info)
747 elif ns_status == "BUILD":
748 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
749 elif ns_status == "ACTIVE":
750 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
751 try:
752 self.ns_update_vnfr(db_vnfrs, desc)
753 break
754 except LcmExceptionNoMgmtIP:
755 pass
756 else:
757 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
758 if detailed_status != detailed_status_old:
759 detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
760 self.update_db_2("nsrs", nsr_id, db_nsr_update)
761 await asyncio.sleep(5, loop=self.loop)
762 else: # total_deploy_timeout
763 raise ROclient.ROClientException("Timeout waiting ns to be ready")
764
765 step = "Updating NSR"
766 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
767
768 db_nsr["detailed-status"] = "Configuring vnfr"
769 self.update_db_2("nsrs", nsr_id, db_nsr_update)
770
771 # The parameters we'll need to deploy a charm
772 number_to_configure = 0
773
774 def deploy_charm(vnf_index, vdu_id, vdu_name, vdu_count_index, mgmt_ip_address, n2vc_info,
775 config_primitive=None):
776 """An inner function to deploy the charm from either vnf or vdu
777 vnf_index is mandatory. vdu_id can be None for a vnf configuration or the id for vdu configuration
778 """
779 if not mgmt_ip_address:
780 raise LcmException("vnfd/vdu has not management ip address to configure it")
781 # Login to the VCA.
782 # if number_to_configure == 0:
783 # self.logger.debug("Logging into N2VC...")
784 # task = asyncio.ensure_future(self.n2vc.login())
785 # yield from asyncio.wait_for(task, 30.0)
786 # self.logger.debug("Logged into N2VC!")
787
788 # # await self.n2vc.login()
789
790 # Note: The charm needs to exist on disk at the location
791 # specified by charm_path.
792 base_folder = vnfd["_admin"]["storage"]
793 storage_params = self.fs.get_params()
794 charm_path = "{}{}/{}/charms/{}".format(
795 storage_params["path"],
796 base_folder["folder"],
797 base_folder["pkg-dir"],
798 proxy_charm
799 )
800
801 # Setup the runtime parameters for this VNF
802 params = {'rw_mgmt_ip': mgmt_ip_address}
803 if config_primitive:
804 params["initial-config-primitive"] = config_primitive
805
806 # ns_name will be ignored in the current version of N2VC
807 # but will be implemented for the next point release.
808 model_name = 'default' # TODO bug 581 : change to nsr_id
809 if vdu_id:
810 vdu_id_text = vdu_id
811 else:
812 vdu_id_text = "vnfd" # TODO bug 581 remove and add just an empty string ""
813 application_name = self.n2vc.FormatApplicationName(nsr_name, vnf_index, vdu_id_text)
814 # TODO bug 581 Add "-" as a final argument
815
816 vca_index = len(vca_deployed_list)
817 # trunk name and add two char index at the end to ensure that it is unique. It is assumed no more than
818 # 26*26 charm in the same NS
819 # TODO bug 581 uncoment
820 # application_name = application_name[0:48]
821 # application_name += chr(97 + vca_index / 26) + chr(97 + vca_index % 26)
822 vca_deployed_ = {
823 "member-vnf-index": vnf_index,
824 "vdu_id": vdu_id,
825 "model": model_name,
826 "application": application_name,
827 "operational-status": "init",
828 "detailed-status": "",
829 "vnfd_id": vnfd_id,
830 "vdu_name": vdu_name,
831 "vdu_count_index": vdu_count_index,
832 }
833 vca_deployed_list.append(vca_deployed_)
834 db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = vca_deployed_
835 self.update_db_2("nsrs", nsr_id, db_nsr_update)
836
837 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
838 proxy_charm))
839 if not n2vc_info:
840 n2vc_info["nsr_id"] = nsr_id
841 n2vc_info["nslcmop_id"] = nslcmop_id
842 n2vc_info["n2vc_event"] = asyncio.Event(loop=self.loop)
843 n2vc_info["lcmOperationType"] = "instantiate"
844 n2vc_info["deployed"] = vca_deployed_list
845 n2vc_info["db_update"] = db_nsr_update
846 task = asyncio.ensure_future(
847 self.n2vc.DeployCharms(
848 model_name, # The network service name
849 application_name, # The application name
850 vnfd, # The vnf descriptor
851 charm_path, # Path to charm
852 params, # Runtime params, like mgmt ip
853 {}, # for native charms only
854 self.n2vc_callback, # Callback for status changes
855 n2vc_info, # Callback parameter
856 None, # Callback parameter (task)
857 )
858 )
859 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
860 n2vc_info))
861 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + application_name, task)
862
863 step = "Looking for needed vnfd to configure"
864 self.logger.debug(logging_text + step)
865
866 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
867 vnfd_id = c_vnf["vnfd-id-ref"]
868 vnf_index = str(c_vnf["member-vnf-index"])
869 vnfd = db_vnfds_ref[vnfd_id]
870
871 # Check if this VNF has a charm configuration
872 vnf_config = vnfd.get("vnf-configuration")
873
874 if vnf_config and vnf_config.get("juju"):
875 proxy_charm = vnf_config["juju"]["charm"]
876 config_primitive = None
877
878 if proxy_charm:
879 if 'initial-config-primitive' in vnf_config:
880 config_primitive = vnf_config['initial-config-primitive']
881
882 # Login to the VCA. If there are multiple calls to login(),
883 # subsequent calls will be a nop and return immediately.
884 step = "connecting to N2VC to configure vnf {}".format(vnf_index)
885 await self.n2vc.login()
886 deploy_charm(vnf_index, None, None, None, db_vnfrs[vnf_index]["ip-address"], n2vc_info,
887 config_primitive)
888 number_to_configure += 1
889
890 # Deploy charms for each VDU that supports one.
891 for vdu_index, vdu in enumerate(get_iterable(vnfd, 'vdu')):
892 vdu_config = vdu.get('vdu-configuration')
893 proxy_charm = None
894 config_primitive = None
895
896 if vdu_config and vdu_config.get("juju"):
897 proxy_charm = vdu_config["juju"]["charm"]
898
899 if 'initial-config-primitive' in vdu_config:
900 config_primitive = vdu_config['initial-config-primitive']
901
902 if proxy_charm:
903 step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index)
904 await self.n2vc.login()
905 vdur = db_vnfrs[vnf_index]["vdur"][vdu_index]
906 # TODO for the moment only first vdu_id contains a charm deployed
907 if vdur["vdu-id-ref"] != vdu["id"]:
908 raise LcmException("Mismatch vdur {}, vdu {} at index {} for vnf {}"
909 .format(vdur["vdu-id-ref"], vdu["id"], vdu_index, vnf_index))
910 deploy_charm(vnf_index, vdu["id"], vdur.get("name"), vdur["count-index"],
911 vdur["ip-address"], n2vc_info, config_primitive)
912 number_to_configure += 1
913
914 db_nsr_update["operational-status"] = "running"
915 configuration_failed = False
916 if number_to_configure:
917 old_status = "configuring: init: {}".format(number_to_configure)
918 db_nsr_update["config-status"] = old_status
919 db_nsr_update["detailed-status"] = old_status
920 db_nslcmop_update["detailed-status"] = old_status
921
922 # wait until all are configured.
923 while time() <= start_deploy + self.total_deploy_timeout:
924 if db_nsr_update:
925 self.update_db_2("nsrs", nsr_id, db_nsr_update)
926 if db_nslcmop_update:
927 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
928 # TODO add a fake tast that set n2vc_event after some time
929 await n2vc_info["n2vc_event"].wait()
930 n2vc_info["n2vc_event"].clear()
931 all_active = True
932 status_map = {}
933 n2vc_error_text = [] # contain text error list. If empty no one is in error status
934 now = time()
935 for vca_deployed in vca_deployed_list:
936 vca_status = vca_deployed["operational-status"]
937 if vca_status not in status_map:
938 # Initialize it
939 status_map[vca_status] = 0
940 status_map[vca_status] += 1
941
942 if vca_status == "active":
943 vca_deployed.pop("time_first_error", None)
944 vca_deployed.pop("status_first_error", None)
945 continue
946
947 all_active = False
948 if vca_status in ("error", "blocked"):
949 vca_deployed["detailed-status-error"] = vca_deployed["detailed-status"]
950 # if not first time in this status error
951 if not vca_deployed.get("time_first_error"):
952 vca_deployed["time_first_error"] = now
953 continue
954 if vca_deployed.get("time_first_error") and \
955 now <= vca_deployed["time_first_error"] + self.timeout_vca_on_error:
956 n2vc_error_text.append("member_vnf_index={} vdu_id={} {}: {}"
957 .format(vca_deployed["member-vnf-index"],
958 vca_deployed["vdu_id"], vca_status,
959 vca_deployed["detailed-status-error"]))
960
961 if all_active:
962 break
963 elif n2vc_error_text:
964 db_nsr_update["config-status"] = "failed"
965 error_text = "fail configuring " + ";".join(n2vc_error_text)
966 db_nsr_update["detailed-status"] = error_text
967 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
968 db_nslcmop_update["detailed-status"] = error_text
969 db_nslcmop_update["statusEnteredTime"] = time()
970 configuration_failed = True
971 break
972 else:
973 cs = "configuring: "
974 separator = ""
975 for status, num in status_map.items():
976 cs += separator + "{}: {}".format(status, num)
977 separator = ", "
978 if old_status != cs:
979 db_nsr_update["config-status"] = cs
980 db_nsr_update["detailed-status"] = cs
981 db_nslcmop_update["detailed-status"] = cs
982 old_status = cs
983 else: # total_deploy_timeout
984 raise LcmException("Timeout waiting ns to be configured")
985
986 if not configuration_failed:
987 # all is done
988 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
989 db_nslcmop_update["statusEnteredTime"] = time()
990 db_nslcmop_update["detailed-status"] = "done"
991 db_nsr_update["config-status"] = "configured"
992 db_nsr_update["detailed-status"] = "done"
993
994 return
995
996 except (ROclient.ROClientException, DbException, LcmException) as e:
997 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
998 exc = e
999 except asyncio.CancelledError:
1000 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1001 exc = "Operation was cancelled"
1002 except Exception as e:
1003 exc = traceback.format_exc()
1004 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1005 exc_info=True)
1006 finally:
1007 if exc:
1008 if db_nsr:
1009 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1010 db_nsr_update["operational-status"] = "failed"
1011 if db_nslcmop:
1012 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1013 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1014 db_nslcmop_update["statusEnteredTime"] = time()
1015 if db_nsr:
1016 db_nsr_update["_admin.nslcmop"] = None
1017 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1018 if db_nslcmop_update:
1019 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1020 if nslcmop_operation_state:
1021 try:
1022 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1023 "operationState": nslcmop_operation_state})
1024 except Exception as e:
1025 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1026
1027 self.logger.debug(logging_text + "Exit")
1028 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1029
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a network service: schedule the removal of its configuration charms, delete the ns, nsd and vnfds
    at RO, wait for the charm removal tasks and write the outcome at the 'nsrs' and 'nslcmops' database records.
    When an operation state has been set, it is notified with the "ns"/"terminated" message.
    Exceptions raised inside the operation are caught, logged and stored at the 'nslcmops' record.
    :param nsr_id: _id of the 'nsrs' record of the network service to terminate
    :param nslcmop_id: _id of the 'nslcmops' record of this terminate operation
    :return: None
    """
    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    exc = None               # error of the whole operation; evaluated at the finally clause
    failed_detail = []       # annotates all failed error messages
    vca_task_list = []
    vca_task_dict = {}       # application name -> charm removal task
    vca_application_name2index = {}
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    try:
        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # work on a copy, because the "VCA" entry can be rewritten below for backward compatibility
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing to terminate. The finally clause still releases "_admin.nslcmop" and the task
            return
        # TODO check if VIM is creating and wait

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"

        if nsr_deployed and nsr_deployed.get("VCA"):
            try:
                step = "Scheduling configuration charms removing"
                db_nsr_update["detailed-status"] = "Deleting charms"
                self.logger.debug(logging_text + step)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # for backward compatibility
                if isinstance(nsr_deployed["VCA"], dict):
                    nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
                    db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)

                for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
                    if not vca_deployed:  # TODO it would be desirable having a and deploy_info.get("deployed"):
                        continue
                    application_name = vca_deployed["application"]
                    task = asyncio.ensure_future(
                        self.n2vc.RemoveCharms(vca_deployed['model'], application_name)
                    )
                    vca_application_name2index[application_name] = vca_index
                    vca_task_list.append(task)
                    vca_task_dict[application_name] = task
                    self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "delete_charm:" + application_name, task)
            except Exception as e:
                # best effort: a failure scheduling the charm removal does not stop the deletion at RO
                self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO_fail = False
        RO = ROclient.ROClient(self.loop, **self.ro_config)

        # Delete ns
        RO_nsr_id = RO_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
            RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if RO_nsr_id:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = "Deleting ns at RO"
                self.logger.debug(logging_text + step)
                desc = await RO.delete("ns", RO_nsr_id)
                RO_delete_action = desc["action_id"]
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if RO_delete_action:
                # wait until NS is deleted from VIM
                step = detailed_status = "Waiting ns deleted from VIM. RO_id={}".format(RO_nsr_id)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                delete_timeout = 20 * 60   # 20 minutes
                while delete_timeout > 0:
                    desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                         extra_item_id=RO_delete_action)
                    ns_status, ns_status_info = RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        break
                    else:
                        # raised explicitly (not with 'assert') so that the check is kept when running with -O
                        raise AssertionError("ROclient.check_action_status returns unknown {}".format(ns_status))
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                    if detailed_status != detailed_status_old:
                        detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

        except ROclient.ROClientException as e:
            if e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
            elif e.http_code == 409:   # conflict
                failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
                RO_fail = True
            else:
                failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                self.logger.error(logging_text + failed_detail[-1])
                RO_fail = True

        # Delete nsd
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
            RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                    "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                    RO_fail = True
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])
                    RO_fail = True

        # Delete vnfds. A failure here is annotated, but it does not prevent trying the remaining vnfds
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd_id"):
            for vnf_id, RO_vnfd_id in nsr_deployed["RO"]["vnfd_id"].items():
                if not RO_vnfd_id:
                    continue
                try:
                    step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                        "Deleting vnfd={} at RO".format(vnf_id)
                    await RO.delete("vnfd", RO_vnfd_id)
                    self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
                        self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                    elif e.http_code == 409:   # conflict
                        failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        if vca_task_list:
            db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                "Waiting for deletion of configuration charms"
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            await asyncio.wait(vca_task_list, timeout=300)
        for application_name, task in vca_task_dict.items():
            if task.cancelled():
                failed_detail.append("VCA[application_name={}] Deletion has been cancelled"
                                     .format(application_name))
            elif task.done():
                # kept in its own variable: reusing 'exc' would make the finally clause report a charm error
                # as the failure of the whole operation, overwriting the detailed-status composed below
                task_exc = task.exception()
                if task_exc:
                    failed_detail.append("VCA[application_name={}] Deletion exception: {}"
                                         .format(application_name, task_exc))
                else:
                    vca_index = vca_application_name2index[application_name]
                    db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None
            else:  # timeout
                # TODO Should it be cancelled?!!
                task.cancel()
                failed_detail.append("VCA[application_name={}] Deletion timeout".format(application_name))

        if failed_detail:
            self.logger.error(logging_text + " ;".join(failed_detail))
            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
            db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        elif db_nslcmop["operationParams"].get("autoremove"):
            self.db.del_one("nsrs", {"_id": nsr_id})
            # the nsr record has just been deleted, so the finally clause must not try to update it
            db_nsr = None
            db_nsr_update.clear()
            self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
            # same for the nslcmop records of this ns, which include the one of this operation
            db_nslcmop = None
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update.clear()
            self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
            # NOTE(review): "usageSate" looks like a typo of "usageState"; confirm against the 'pdus' schema
            # before changing the key
            self.db.set_list("pdus", {"_admin.usage.nsr_id": nsr_id},
                             {"_admin.usageSate": "NOT_IN_USE", "_admin.usage": None})
            self.logger.debug(logging_text + "Delete from database")
        else:
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
        if db_nsr:
            db_nsr_update["_admin.nslcmop"] = None
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
1271
async def _ns_execute_primitive(self, db_deployed, nsr_name, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
                                primitive, primitive_params):
    """
    Look for the deployed charm that matches the provided vnf/vdu and execute a primitive on it, waiting up to
    600 seconds for its completion.
    :param db_deployed: content of the nsr '_admin.deployed'. Its "VCA" list is searched for the charm
    :param nsr_name: name of the network service. Not used by this implementation
    :param member_vnf_index: member-vnf-index of the charm to look for
    :param vdu_id: vdu id of the charm to look for, or None for a charm deployed for the vnf
    :param vdu_name: if provided, only a charm with this vdu name matches
    :param vdu_count_index: if it is not None, only a charm with this count index matches
    :param primitive: name of the primitive to execute
    :param primitive_params: dictionary with the primitive parameters. None is treated as no parameters
    :return: tuple (result, result_detail); result is "COMPLETED" or "FAILED"
    :raises LcmException: if the charm is not found, has no model/application name or is not 'active'
    """
    # a missing 'deployed' or "VCA" content is reported as a not deployed charm instead of a TypeError/KeyError
    vca_deployed_list = (db_deployed or {}).get("VCA") or ()
    for vca_deployed in vca_deployed_list:
        if not vca_deployed:
            continue
        if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
            continue
        if vdu_name and vdu_name != vca_deployed["vdu_name"]:
            continue
        # compared against None (not truthiness), so that asking for count index 0 really selects index 0.
        # A charm stored without count index is treated as index 0, so it still matches a request for 0
        if vdu_count_index is not None and vdu_count_index != (vca_deployed.get("vdu_count_index") or 0):
            continue
        break
    else:
        raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not deployed"
                           .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
    model_name = vca_deployed.get("model")
    application_name = vca_deployed.get("application")
    if not model_name or not application_name:
        raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not model "
                           "or application name" .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
    if vca_deployed["operational-status"] != "active":
        raise LcmException("charm for member_vnf_index={} vdu_id={} operational_status={} not 'active'".format(
            member_vnf_index, vdu_id, vca_deployed["operational-status"]))
    callback = None  # self.n2vc_callback
    callback_args = ()  # [db_nsr, db_nslcmop, member_vnf_index, None]
    await self.n2vc.login()
    task = asyncio.ensure_future(
        self.n2vc.ExecutePrimitive(
            model_name,
            application_name,
            primitive, callback,
            *callback_args,
            **(primitive_params or {})
        )
    )
    # wait until completed with timeout
    await asyncio.wait((task,), timeout=600)

    result = "FAILED"  # by default
    result_detail = ""
    if task.cancelled():
        result_detail = "Task has been cancelled"
    elif task.done():
        exc = task.exception()
        if exc:
            result_detail = str(exc)
        else:
            # TODO revise with Adam if action is finished and ok when task is done or callback is needed
            result = "COMPLETED"
            result_detail = "Done"
    else:  # timeout
        # TODO Should it be cancelled?!!
        task.cancel()
        result_detail = "timeout"
    return result, result_detail
1331
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive requested by an 'action' operation over a deployed charm of the network service.
    It waits for the related tasks in process (up to 3600 seconds), executes the primitive, writes the result at
    the 'nslcmops' record and, when an operation state has been set, notifies it with the "ns"/"actioned" message.
    Exceptions raised inside the operation are caught, logged and stored at the 'nslcmops' record.
    :param nsr_id: _id of the 'nsrs' record of the network service
    :param nslcmop_id: _id of the 'nslcmops' record of this action operation
    :return: None
    """
    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    exc = None
    try:
        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_deployed = db_nsr["_admin"].get("deployed")
        nsr_name = db_nsr["name"]
        vnf_index = db_nslcmop["operationParams"]["member_vnf_index"]
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        vdu_name = db_nslcmop["operationParams"].get("vdu_name")

        # look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
        if task_dependency:
            step = db_nslcmop_update["detailed-status"] = \
                "Waiting for related tasks to be completed: {}".format(task_name)
            self.logger.debug(logging_text + step)
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            _, pending = await asyncio.wait(task_dependency, timeout=3600)
            if pending:
                raise LcmException("Timeout waiting related tasks to be completed")

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # TODO check if ns is in a proper status
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        # updated here, so that a failure of the primitive is not reported under the previous step text
        step = "Executing primitive '{}'".format(primitive)
        result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index, vdu_id,
                                                                 vdu_name, vdu_count_index, primitive,
                                                                 primitive_params)
        db_nslcmop_update["detailed-status"] = result_detail
        db_nslcmop_update["operationState"] = nslcmop_operation_state = result
        db_nslcmop_update["statusEnteredTime"] = time()
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
        if db_nsr:
            db_nsr_update["_admin.nslcmop"] = None
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        # "Exit" is logged only once, after the notification has been attempted
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
1410
1411 async def scale(self, nsr_id, nslcmop_id):
1412 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
1413 self.logger.debug(logging_text + "Enter")
1414 # get all needed from database
1415 db_nsr = None
1416 db_nslcmop = None
1417 db_nslcmop_update = {}
1418 nslcmop_operation_state = None
1419 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1420 exc = None
1421 # in case of error, indicates what part of scale was failed to put nsr at error status
1422 scale_process = None
1423 old_operational_status = ""
1424 old_config_status = ""
1425 vnfr_scaled = False
1426 try:
1427 step = "Getting nslcmop from database"
1428 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1429 step = "Getting nsr from database"
1430 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1431 nsr_name = db_nsr["name"]
1432 old_operational_status = db_nsr["operational-status"]
1433 old_config_status = db_nsr["config-status"]
1434
1435 # look if previous tasks in process
1436 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
1437 if task_dependency:
1438 step = db_nslcmop_update["detailed-status"] = \
1439 "Waiting for related tasks to be completed: {}".format(task_name)
1440 self.logger.debug(logging_text + step)
1441 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1442 _, pending = await asyncio.wait(task_dependency, timeout=3600)
1443 if pending:
1444 raise LcmException("Timeout waiting related tasks to be completed")
1445
1446 step = "Parsing scaling parameters"
1447 db_nsr_update["operational-status"] = "scaling"
1448 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1449 nsr_deployed = db_nsr["_admin"].get("deployed")
1450 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
1451 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
1452 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
1453 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
1454 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
1455
1456 # for backward compatibility
1457 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
1458 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
1459 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
1460 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1461
1462 step = "Getting vnfr from database"
1463 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
1464 step = "Getting vnfd from database"
1465 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
1466 step = "Getting scaling-group-descriptor"
1467 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
1468 if scaling_descriptor["name"] == scaling_group:
1469 break
1470 else:
1471 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
1472 "at vnfd:scaling-group-descriptor".format(scaling_group))
1473 # cooldown_time = 0
1474 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
1475 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
1476 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
1477 # break
1478
1479 # TODO check if ns is in a proper status
1480 step = "Sending scale order to RO"
1481 nb_scale_op = 0
1482 if not db_nsr["_admin"].get("scaling-group"):
1483 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
1484 admin_scale_index = 0
1485 else:
1486 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
1487 if admin_scale_info["name"] == scaling_group:
1488 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
1489 break
1490 else: # not found, set index one plus last element and add new entry with the name
1491 admin_scale_index += 1
1492 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
1493 RO_scaling_info = []
1494 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
1495 if scaling_type == "SCALE_OUT":
1496 # count if max-instance-count is reached
1497 if "max-instance-count" in scaling_descriptor and scaling_descriptor["max-instance-count"] is not None:
1498 max_instance_count = int(scaling_descriptor["max-instance-count"])
1499 if nb_scale_op >= max_instance_count:
1500 raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
1501 " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1502 nb_scale_op = nb_scale_op + 1
1503 vdu_scaling_info["scaling_direction"] = "OUT"
1504 vdu_scaling_info["vdu-create"] = {}
1505 for vdu_scale_info in scaling_descriptor["vdu"]:
1506 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1507 "type": "create", "count": vdu_scale_info.get("count", 1)})
1508 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1509 elif scaling_type == "SCALE_IN":
1510 # count if min-instance-count is reached
1511 min_instance_count = 0
1512 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
1513 min_instance_count = int(scaling_descriptor["min-instance-count"])
1514 if nb_scale_op <= min_instance_count:
1515 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
1516 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1517 nb_scale_op = nb_scale_op - 1
1518 vdu_scaling_info["scaling_direction"] = "IN"
1519 vdu_scaling_info["vdu-delete"] = {}
1520 for vdu_scale_info in scaling_descriptor["vdu"]:
1521 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1522 "type": "delete", "count": vdu_scale_info.get("count", 1)})
1523 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1524
1525 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
1526 vdu_create = vdu_scaling_info.get("vdu-create")
1527 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
1528 if vdu_scaling_info["scaling_direction"] == "IN":
1529 for vdur in reversed(db_vnfr["vdur"]):
1530 if vdu_delete.get(vdur["vdu-id-ref"]):
1531 vdu_delete[vdur["vdu-id-ref"]] -= 1
1532 vdu_scaling_info["vdu"].append({
1533 "name": vdur["name"],
1534 "vdu_id": vdur["vdu-id-ref"],
1535 "interface": []
1536 })
1537 for interface in vdur["interfaces"]:
1538 vdu_scaling_info["vdu"][-1]["interface"].append({
1539 "name": interface["name"],
1540 "ip_address": interface["ip-address"],
1541 "mac_address": interface.get("mac-address"),
1542 })
1543 vdu_delete = vdu_scaling_info.pop("vdu-delete")
1544
1545 # execute primitive service PRE-SCALING
1546 step = "Executing pre-scale vnf-config-primitive"
1547 if scaling_descriptor.get("scaling-config-action"):
1548 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1549 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "pre-scale-in" \
1550 and scaling_type == "SCALE_IN":
1551 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1552 step = db_nslcmop_update["detailed-status"] = \
1553 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
1554 # look for primitive
1555 primitive_params = {}
1556 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1557 if config_primitive["name"] == vnf_config_primitive:
1558 for parameter in config_primitive.get("parameter", ()):
1559 if 'default-value' in parameter and \
1560 parameter['default-value'] == "<VDU_SCALE_INFO>":
1561 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1562 default_flow_style=True,
1563 width=256)
1564 break
1565 else:
1566 raise LcmException(
1567 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
1568 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-cnfiguration:config-"
1569 "primitive".format(scaling_group, config_primitive))
1570 scale_process = "VCA"
1571 db_nsr_update["config-status"] = "configuring pre-scaling"
1572 result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index,
1573 None, None, None, vnf_config_primitive,
1574 primitive_params)
1575 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1576 vnf_config_primitive, result, result_detail))
1577 if result == "FAILED":
1578 raise LcmException(result_detail)
1579 db_nsr_update["config-status"] = old_config_status
1580 scale_process = None
1581
1582 if RO_scaling_info:
1583 scale_process = "RO"
1584 RO = ROclient.ROClient(self.loop, **self.ro_config)
1585 RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
1586 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
1587 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
1588 # wait until ready
1589 RO_nslcmop_id = RO_desc["instance_action_id"]
1590 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
1591
1592 RO_task_done = False
1593 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
1594 detailed_status_old = None
1595 self.logger.debug(logging_text + step)
1596
1597 deployment_timeout = 1 * 3600 # One hour
1598 while deployment_timeout > 0:
1599 if not RO_task_done:
1600 desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
1601 extra_item_id=RO_nslcmop_id)
1602 ns_status, ns_status_info = RO.check_action_status(desc)
1603 if ns_status == "ERROR":
1604 raise ROclient.ROClientException(ns_status_info)
1605 elif ns_status == "BUILD":
1606 detailed_status = step + "; {}".format(ns_status_info)
1607 elif ns_status == "ACTIVE":
1608 RO_task_done = True
1609 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
1610 self.logger.debug(logging_text + step)
1611 else:
1612 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
1613 else:
1614 desc = await RO.show("ns", RO_nsr_id)
1615 ns_status, ns_status_info = RO.check_ns_status(desc)
1616 if ns_status == "ERROR":
1617 raise ROclient.ROClientException(ns_status_info)
1618 elif ns_status == "BUILD":
1619 detailed_status = step + "; {}".format(ns_status_info)
1620 elif ns_status == "ACTIVE":
1621 step = detailed_status = \
1622 "Waiting for management IP address reported by the VIM. Updating VNFRs"
1623 if not vnfr_scaled:
1624 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
1625 vnfr_scaled = True
1626 try:
1627 desc = await RO.show("ns", RO_nsr_id)
1628 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
1629 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
1630 break
1631 except LcmExceptionNoMgmtIP:
1632 pass
1633 else:
1634 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1635 if detailed_status != detailed_status_old:
1636 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
1637 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1638
1639 await asyncio.sleep(5, loop=self.loop)
1640 deployment_timeout -= 5
1641 if deployment_timeout <= 0:
1642 raise ROclient.ROClientException("Timeout waiting ns to be ready")
1643
1644 # update VDU_SCALING_INFO with the obtained ip_addresses
1645 if vdu_scaling_info["scaling_direction"] == "OUT":
1646 for vdur in reversed(db_vnfr["vdur"]):
1647 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
1648 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
1649 vdu_scaling_info["vdu"].append({
1650 "name": vdur["name"],
1651 "vdu_id": vdur["vdu-id-ref"],
1652 "interface": []
1653 })
1654 for interface in vdur["interfaces"]:
1655 vdu_scaling_info["vdu"][-1]["interface"].append({
1656 "name": interface["name"],
1657 "ip_address": interface["ip-address"],
1658 "mac_address": interface.get("mac-address"),
1659 })
1660 del vdu_scaling_info["vdu-create"]
1661
1662 scale_process = None
1663 if db_nsr_update:
1664 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1665
1666 # execute primitive service POST-SCALING
1667 step = "Executing post-scale vnf-config-primitive"
1668 if scaling_descriptor.get("scaling-config-action"):
1669 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1670 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "post-scale-out" \
1671 and scaling_type == "SCALE_OUT":
1672 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1673 step = db_nslcmop_update["detailed-status"] = \
1674 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
1675 # look for primitive
1676 primitive_params = {}
1677 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1678 if config_primitive["name"] == vnf_config_primitive:
1679 for parameter in config_primitive.get("parameter", ()):
1680 if 'default-value' in parameter and \
1681 parameter['default-value'] == "<VDU_SCALE_INFO>":
1682 primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
1683 default_flow_style=True,
1684 width=256)
1685 break
1686 else:
1687 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
1688 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
1689 "match any vnf-configuration:config-primitive".format(scaling_group,
1690 config_primitive))
1691 scale_process = "VCA"
1692 db_nsr_update["config-status"] = "configuring post-scaling"
1693
1694 result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index,
1695 None, None, None, vnf_config_primitive,
1696 primitive_params)
1697 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1698 vnf_config_primitive, result, result_detail))
1699 if result == "FAILED":
1700 raise LcmException(result_detail)
1701 db_nsr_update["config-status"] = old_config_status
1702 scale_process = None
1703
1704 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1705 db_nslcmop_update["statusEnteredTime"] = time()
1706 db_nslcmop_update["detailed-status"] = "done"
1707 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
1708 db_nsr_update["operational-status"] = old_operational_status
1709 db_nsr_update["config-status"] = old_config_status
1710 return
1711 except (ROclient.ROClientException, DbException, LcmException) as e:
1712 self.logger.error(logging_text + "Exit Exception {}".format(e))
1713 exc = e
1714 except asyncio.CancelledError:
1715 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1716 exc = "Operation was cancelled"
1717 except Exception as e:
1718 exc = traceback.format_exc()
1719 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1720 finally:
1721 if exc:
1722 if db_nslcmop:
1723 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1724 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1725 db_nslcmop_update["statusEnteredTime"] = time()
1726 if db_nsr:
1727 db_nsr_update["operational-status"] = old_operational_status
1728 db_nsr_update["config-status"] = old_config_status
1729 db_nsr_update["detailed-status"] = ""
1730 db_nsr_update["_admin.nslcmop"] = None
1731 if scale_process:
1732 if "VCA" in scale_process:
1733 db_nsr_update["config-status"] = "failed"
1734 if "RO" in scale_process:
1735 db_nsr_update["operational-status"] = "failed"
1736 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
1737 exc)
1738 if db_nslcmop_update:
1739 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1740 if db_nsr:
1741 db_nsr_update["_admin.nslcmop"] = None
1742 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1743 if nslcmop_operation_state:
1744 try:
1745 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1746 "operationState": nslcmop_operation_state})
1747 # if cooldown_time:
1748 # await asyncio.sleep(cooldown_time)
1749 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
1750 except Exception as e:
1751 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1752 self.logger.debug(logging_text + "Exit")
1753 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")