Include variables in cloud-init
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import functools
24 import traceback
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 import ROclient
28 from lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase
29
30 from osm_common.dbbase import DbException
31 from osm_common.fsbase import FsException
32 from n2vc.vnf import N2VC
33
34 from copy import copy, deepcopy
35 from http import HTTPStatus
36 from time import time
37 from uuid import uuid4
38
39 __author__ = "Alfonso Tierno"
40
41
def get_iterable(in_dict, in_key):
    """
    Variant of <dict>.get() that is always safe to iterate: a missing key or a falsy value
    (None, False, empty list, ...) is replaced by an empty tuple
    :param in_dict: a dictionary
    :param in_key: the key to look for at in_dict
    :return: in_dict[in_key], or () if it is not present or its value is falsy
    """
    return in_dict.get(in_key) or ()
52
53
def populate_dict(target_dict, key_list, value):
    """
    Stores value inside target_dict under the nested path described by key_list. Intermediate
    dictionaries are created when they do not exist; existing ones are reused.
    Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
    :param target_dict: dictionary to be changed
    :param key_list: list of keys describing the path. The last item is the key that receives value
    :param value: content to store at the end of the path
    :return: None
    """
    node = target_dict
    for parent_key in key_list[:-1]:
        # descend one level, creating the level on the fly if needed
        node = node.setdefault(parent_key, {})
    node[key_list[-1]] = value
68
69
70 class NsLcm(LcmBase):
71 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
72 total_deploy_timeout = 2 * 3600 # global timeout for deployment
73 timeout_charm_delete = 10 * 60
74
def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
    """
    Stores the handlers used by the ns tasks and creates the N2VC connector
    :param db: database handler, passed to LcmBase
    :param msg: messaging handler, passed to LcmBase
    :param fs: filesystem storage handler, passed to LcmBase
    :param lcm_tasks: object used to look for related tasks in process
    :param ro_config: dictionary with the RO client configuration
    :param vca_config: dictionary with the VCA configuration. Keys 'host', 'port', 'user' and 'secret' are read
    :param loop: asyncio event loop
    :return: None
    """
    # logging
    self.logger = logging.getLogger('lcm.ns')
    self.lcm_tasks = lcm_tasks
    self.loop = loop

    super().__init__(db, msg, fs, self.logger)

    self.ro_config = ro_config

    n2vc_settings = {
        "log": self.logger,
        "server": vca_config['host'],
        "port": vca_config['port'],
        "user": vca_config['user'],
        "secret": vca_config['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=vca_config[''],
        "artifacts": None,
    }
    self.n2vc = N2VC(**n2vc_settings)
102
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd. For every vdu that defines a
    cloud-init (inline, or as a file of the package) the content is rendered as a jinja2 template using
    additionalParams and stored inline at vdu 'cloud-init'
    :param vnfd: input vnfd. It is not modified
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Used as jinja2 variables
    :param nsrId: Id of the NSR. Currently not used
    :return: copy of vnfd without the items that are not used by RO. LcmException is raised if the cloud-init
        file cannot be read, if the template is wrong or if a template variable is not at additionalParams
    """
    ci_file = None
    try:
        vnfd_RO = deepcopy(vnfd)
        vnfd_RO.pop("_id", None)
        vnfd_RO.pop("_admin", None)
        if new_id:
            vnfd_RO["id"] = new_id
        for vdu in vnfd_RO.get("vdu", ()):
            if "cloud-init-file" in vdu:
                base_folder = vnfd["_admin"]["storage"]
                cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
                                                               vdu["cloud-init-file"])
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    cloud_init_content = ci_file.read()
                vdu.pop("cloud-init-file", None)
            elif "cloud-init" in vdu:
                cloud_init_content = vdu["cloud-init"]
            else:
                # vdu without cloud-init: nothing to render. Without this, the content of a previous vdu
                # would be reused (or the variable would be unbound for the first vdu)
                continue
            try:
                env = Environment()
                ast = env.parse(cloud_init_content)
                # every variable used at the template must be provided by the user
                for var in meta.find_undeclared_variables(ast):
                    if not additionalParams or var not in additionalParams:
                        raise LcmException("Variable '{}' defined in vnfd[id={}] vdu[id={}] cloud-init or "
                                           "cloud-init-file, must be provided in the instantiation parameters "
                                           "inside the 'additionalParamsForVnf' block".format(var, vnfd["id"],
                                                                                               vdu["id"]))
                template = Template(cloud_init_content)
                # render() does not accept None, so use an empty context when there are no params
                cloud_init_content = template.render(additionalParams or {})
            except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
                raise LcmException("Error in jinja2 template: {}".format(e))
            vdu["cloud-init"] = cloud_init_content
        # remove unused by RO configuration, monitoring, scaling
        vnfd_RO.pop("vnf-configuration", None)
        vnfd_RO.pop("monitoring-param", None)
        vnfd_RO.pop("scaling-group-descriptor", None)
        return vnfd_RO
    except FsException as e:
        raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
    finally:
        if ci_file:
            ci_file.close()
155
def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None):
    """
    Callback both for charm status change and task completion. It records the new status at the matching
    entry of n2vc_info["deployed"] and at n2vc_info["db_update"], and wakes up the task that waits on
    n2vc_info["n2vc_event"]. It never raises: any exception is logged as critical
    :param model_name: Charm model name
    :param application_name: Charm application name
    :param status: Can be
        - blocked: The unit needs manual intervention
        - maintenance: The unit is actively deploying/configuring
        - waiting: The unit is waiting for another charm to be ready
        - active: The unit is deployed, configured, and ready
        - error: The charm has failed and needs attention.
        - terminated: The charm has been destroyed
        - removing,
        - removed
    :param message: detailed message error
    :param n2vc_info: dictionary with information shared with instantiate task. It contains:
        nsr_id:
        nslcmop_id:
        lcmOperationType: currently "instantiate"
        deployed: list where each item is None or a dictionary with, at least, model, application,
            operational-status and detailed-status
        db_update: dictionary to be filled with the changes to be wrote to database with format key.key.key: value
        n2vc_event: event used to notify instantiation task that some change has been produced
    :param task: None for charm status change, or task for completion task callback
    :return: None
    """
    # Defined before the 'try' so the exception handler below can always use it, even when the failure
    # happens while reading n2vc_info
    logging_text = "Task [n2vc_callback] application={}".format(application_name)
    try:
        nsr_id = n2vc_info["nsr_id"]
        deployed = n2vc_info["deployed"]
        db_nsr_update = n2vc_info["db_update"]
        nslcmop_id = n2vc_info["nslcmop_id"]
        ns_operation = n2vc_info["lcmOperationType"]
        n2vc_event = n2vc_info["n2vc_event"]
        logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id,
                                                                                 application_name)
        # look for the deployed entry this callback refers to
        for vca_index, vca_deployed in enumerate(deployed):
            if not vca_deployed:
                continue
            if model_name == vca_deployed["model"] and application_name == vca_deployed["application"]:
                break
        else:
            self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA")
            return
        if task:
            if task.cancelled():
                self.logger.debug(logging_text + " task Cancelled")
                vca_deployed['operational-status'] = "error"
                db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
                vca_deployed['detailed-status'] = "Task Cancelled"
                db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = "Task Cancelled"

            elif task.done():
                exc = task.exception()
                if exc:
                    self.logger.error(logging_text + " task Exception={}".format(exc))
                    vca_deployed['operational-status'] = "error"
                    db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
                    vca_deployed['detailed-status'] = str(exc)
                    db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(exc)
                else:
                    self.logger.debug(logging_text + " task Done")
                    # task is Done, but callback is still ongoing. So ignore
                    return
        elif status:
            self.logger.debug(logging_text + " Enter status={} message={}".format(status, message))
            if vca_deployed['operational-status'] == status:
                return  # same status, ignore
            vca_deployed['operational-status'] = status
            db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = status
            vca_deployed['detailed-status'] = str(message)
            db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(message)
        else:
            self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
            return
        # wake up instantiate task
        n2vc_event.set()
    except Exception as e:
        self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True)
233
def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params
    :param nsd: ns descriptor. Its 'constituent-vnfd' list is used to map member-vnf-index to vnfd-id-ref
    :param vnfd_dict: dictionary with vnfd-id-ref: vnfd-content
    :param n2vc_key_list: list of public keys to be injected (as 'mgmt_keys') into the vdus that need to be
        accessed for configuration. Nothing is injected if empty
    :return: The RO ns descriptor, or None if ns_params is empty. LcmException is raised if a VIM account is
        not ENABLED or if a parameter references something that is not present at the descriptors
    """
    vim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # translates an OSM vim account id into the RO one. Results are cached at vim_2_RO
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def ip_profile_2_RO(ip_profile):
        # returns a copy of ip_profile with the keys and values renamed to the ones used by RO
        RO_ip_profile = deepcopy((ip_profile))
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        # "scenario": ns_params["nsdId"],
    }
    if n2vc_key_list:
        for vnfd_ref, vnfd in vnfd_dict.items():
            vdu_needed_access = []
            mgmt_cp = None
            if vnfd.get("vnf-configuration"):
                if vnfd.get("mgmt-interface"):
                    if vnfd["mgmt-interface"].get("vdu-id"):
                        vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                    elif vnfd["mgmt-interface"].get("cp"):
                        mgmt_cp = vnfd["mgmt-interface"]["cp"]

            for vdu in vnfd.get("vdu", ()):
                if vdu.get("vdu-configuration"):
                    vdu_needed_access.append(vdu["id"])
                elif mgmt_cp:
                    # look for the vdu that owns the management connection point.
                    # get_iterable avoids a TypeError when the vdu has no 'interface'
                    for vdu_interface in get_iterable(vdu, "interface"):
                        if vdu_interface.get("external-connection-point-ref") and \
                                vdu_interface["external-connection-point-ref"] == mgmt_cp:
                            vdu_needed_access.append(vdu["id"])
                            mgmt_cp = None
                            break

            if vdu_needed_access:
                # get_iterable avoids a TypeError when the nsd has no 'constituent-vnfd'
                for vnf_member in get_iterable(nsd, "constituent-vnfd"):
                    if vnf_member["vnfd-id-ref"] != vnfd_ref:
                        continue
                    for vdu in vdu_needed_access:
                        populate_dict(RO_ns_params,
                                      ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                      n2vc_key_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    for vnf_params in get_iterable(ns_params, "vnf"):
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))
        if vnf_params.get("vimAccountId"):
            populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
                          vim_account_2_RO(vnf_params["vimAccountId"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    # no vdu of the vnfd has an interface attached to this internal connection point
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))
        if vld_params.get("vim-network-name"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if vld_params.get("vim-network-id"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-id"], dict):
                for vim_account, vim_net in vld_params["vim-network-id"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                # vdu_descriptor and interface_descriptor are the ones that matched at the loops above
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
463
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr, and writes the resulting 'vdur' list to database
    :param db_vnfr: vnfr content. Its 'vdur' list is modified
    :param vdu_create: dictionary with vdu-id-ref: number of vdur to add. It is not modified
    :param vdu_delete: dictionary with vdu-id-ref: number of vdur to remove. It is not modified
    :return: None. LcmException is raised if a requested creation or deletion cannot be completely done
        because there are not (enough) vdur with that vdu-id-ref
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # walk the list backwards, so that insertions and deletions do not affect the pending indexes
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            # each new vdur is a copy of the previous one with a new _id and the next count-index
            for index in range(0, vdu_create[vdu_id_ref]):
                vdur = deepcopy(vdur)
                vdur["_id"] = str(uuid4())
                vdur["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur)
            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Whatever remains at the dictionaries could not be applied
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
502
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies to every vld of the nsr the info (vim id, name, status) of its network at RO
    :param ns_update_nsr: dictionary to be filled with the updated info, with format "vld.<index>": vld
    :param db_nsr: content of db_nsr. Its vld items are also modified
    :param nsr_desc_RO: nsr descriptor from RO. Its 'nets' are matched by 'ns_net_osm_id'
    :return: Nothing, LcmException is raised if a vld is not found at RO info
    """
    for position, nsr_vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net that belongs to this vld, or None
        ro_net = next(
            (net for net in get_iterable(nsr_desc_RO, "nets") if nsr_vld["id"] == net.get("ns_net_osm_id")),
            None)
        if ro_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(nsr_vld["id"]))
        nsr_vld["vim-id"] = ro_net.get("vim_net_id")
        nsr_vld["name"] = ro_net.get("vim_name")
        nsr_vld["status"] = ro_net.get("status")
        nsr_vld["status-detailed"] = ro_net.get("error_msg")
        ns_update_nsr["vld.{}".format(position)] = nsr_vld
524
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    For every vnfr it updates the management ip-address, every vdur (not pdu) with its interfaces, and
    every internal vld; and then it writes the changes to the "vnfrs" database collection
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO. Its 'vnfs' (with their 'vms') and 'nets' are read
    :return: Nothing, LcmException is raised on errors. LcmExceptionNoMgmtIP is raised if neither RO nor
        the vnfr provide a management IP address for a vnf
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # look for the RO vnf with the same member vnf index
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            # changes to write to database, with format "key.index": value
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"]
            elif not db_vnfr.get("ip-address"):
                raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # number of RO vms of this vdu already skipped. The vm at position N (among the ones with
                # the same vdu_osm_id) is the one assigned to the vdur with count-index N
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    vdur["ip-address"] = vdur_RO.get("ip_address")
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    # interfaces are matched by name against the RO 'internal_name'
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "at RO info".format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    # no RO vm for this vdu and count-index
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} at "
                                       "RO info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            # internal vlds are matched against the RO nets by 'vnf_net_osm_id'
            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} at RO info".format(
                        vnf_index, vld["id"]))

            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            # no RO vnf has this member vnf index
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(vnf_index))
592
593 async def instantiate(self, nsr_id, nslcmop_id):
594 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
595 self.logger.debug(logging_text + "Enter")
596 # get all needed from database
597 start_deploy = time()
598 db_nsr = None
599 db_nslcmop = None
600 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
601 db_nslcmop_update = {}
602 nslcmop_operation_state = None
603 db_vnfrs = {}
604 RO_descriptor_number = 0 # number of descriptors created at RO
605 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
606 n2vc_info = {}
607 exc = None
608 try:
609 step = "Getting nslcmop={} from db".format(nslcmop_id)
610 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
611 step = "Getting nsr={} from db".format(nsr_id)
612 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
613 ns_params = db_nslcmop.get("operationParams")
614 nsd = db_nsr["nsd"]
615 nsr_name = db_nsr["name"] # TODO short-name??
616
617 # look if previous tasks in process
618 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
619 if task_dependency:
620 step = db_nslcmop_update["detailed-status"] = \
621 "Waiting for related tasks to be completed: {}".format(task_name)
622 self.logger.debug(logging_text + step)
623 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
624 _, pending = await asyncio.wait(task_dependency, timeout=3600)
625 if pending:
626 raise LcmException("Timeout waiting related tasks to be completed")
627
628 step = "Getting vnfrs from db"
629 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
630 db_vnfds_ref = {}
631 db_vnfds = {}
632 for vnfr in db_vnfrs_list:
633 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
634 vnfd_id = vnfr["vnfd-id"]
635 vnfd_ref = vnfr["vnfd-ref"]
636 if vnfd_id not in db_vnfds:
637 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
638 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
639 db_vnfds_ref[vnfd_ref] = vnfd
640 db_vnfds[vnfd_id] = vnfd
641
642 # Get or generates the _admin.deployed,VCA list
643 vca_deployed_list = None
644 if db_nsr["_admin"].get("deployed"):
645 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
646 if vca_deployed_list is None:
647 vca_deployed_list = []
648 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
649 elif isinstance(vca_deployed_list, dict):
650 # maintain backward compatibility. Change a dict to list at database
651 vca_deployed_list = list(vca_deployed_list.values())
652 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
653
654 db_nsr_update["detailed-status"] = "creating"
655 db_nsr_update["operational-status"] = "init"
656
657 RO = ROclient.ROClient(self.loop, **self.ro_config)
658
659 # set state to INSTANTIATED. When instantiated NBI will not delete directly
660 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
661 self.update_db_2("nsrs", nsr_id, db_nsr_update)
662
663 # get vnfds, instantiate at RO
664 for c_vnf in nsd.get("constituent-vnfd", ()):
665 member_vnf_index = c_vnf["member-vnf-index"]
666 vnfd_ref = vnfd["id"]
667 step = db_nsr_update["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_ref)
668 # self.logger.debug(logging_text + step)
669 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
670 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
671 RO_descriptor_number += 1
672
673 # look if present
674 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
675 if vnfd_list:
676 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = vnfd_list[0]["uuid"]
677 self.logger.debug(logging_text + "vnfd={} exists at RO. Using RO_id={}".format(
678 vnfd_ref, vnfd_list[0]["uuid"]))
679 else:
680 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
681 get("additionalParamsForVnf"), nsr_id)
682 desc = await RO.create("vnfd", descriptor=vnfd_RO)
683 db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnfd_id)] = desc["uuid"]
684 self.logger.debug(logging_text + "vnfd={} created at RO. RO_id={}".format(
685 vnfd_ref, desc["uuid"]))
686 self.update_db_2("nsrs", nsr_id, db_nsr_update)
687
688 # create nsd at RO
689 nsd_ref = nsd["id"]
690 step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
691 # self.logger.debug(logging_text + step)
692
693 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
694 RO_descriptor_number += 1
695 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
696 if nsd_list:
697 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
698 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
699 nsd_ref, RO_nsd_uuid))
700 else:
701 nsd_RO = deepcopy(nsd)
702 nsd_RO["id"] = RO_osm_nsd_id
703 nsd_RO.pop("_id", None)
704 nsd_RO.pop("_admin", None)
705 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
706 member_vnf_index = c_vnf["member-vnf-index"]
707 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
708 for c_vld in nsd_RO.get("vld", ()):
709 for cp in c_vld.get("vnfd-connection-point-ref", ()):
710 member_vnf_index = cp["member-vnf-index-ref"]
711 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
712
713 desc = await RO.create("nsd", descriptor=nsd_RO)
714 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
715 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
716 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
717 self.update_db_2("nsrs", nsr_id, db_nsr_update)
718
719 # Crate ns at RO
720 # if present use it unless in error status
721 RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
722 if RO_nsr_id:
723 try:
724 step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
725 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
726 desc = await RO.show("ns", RO_nsr_id)
727 except ROclient.ROClientException as e:
728 if e.http_code != HTTPStatus.NOT_FOUND:
729 raise
730 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
731 if RO_nsr_id:
732 ns_status, ns_status_info = RO.check_ns_status(desc)
733 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
734 if ns_status == "ERROR":
735 step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
736 self.logger.debug(logging_text + step)
737 await RO.delete("ns", RO_nsr_id)
738 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
739 if not RO_nsr_id:
740 step = db_nsr_update["detailed-status"] = "Checking dependencies"
741 # self.logger.debug(logging_text + step)
742
743 # check if VIM is creating and wait look if previous tasks in process
744 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
745 if task_dependency:
746 step = "Waiting for related tasks to be completed: {}".format(task_name)
747 self.logger.debug(logging_text + step)
748 await asyncio.wait(task_dependency, timeout=3600)
749 if ns_params.get("vnf"):
750 for vnf in ns_params["vnf"]:
751 if "vimAccountId" in vnf:
752 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
753 vnf["vimAccountId"])
754 if task_dependency:
755 step = "Waiting for related tasks to be completed: {}".format(task_name)
756 self.logger.debug(logging_text + step)
757 await asyncio.wait(task_dependency, timeout=3600)
758
759 step = db_nsr_update["detailed-status"] = "Checking instantiation parameters"
760
761 # feature 1429. Add n2vc public key to needed VMs
762 n2vc_key = await self.n2vc.GetPublicKey()
763 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, [n2vc_key])
764
765 step = db_nsr_update["detailed-status"] = "Creating ns at RO"
766 desc = await RO.create("ns", descriptor=RO_ns_params,
767 name=db_nsr["name"],
768 scenario=RO_nsd_uuid)
769 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
770 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
771 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
772 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
773 self.update_db_2("nsrs", nsr_id, db_nsr_update)
774
775 # wait until NS is ready
776 step = ns_status_detailed = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
777 detailed_status_old = None
778 self.logger.debug(logging_text + step)
779
780 while time() <= start_deploy + self.total_deploy_timeout:
781 desc = await RO.show("ns", RO_nsr_id)
782 ns_status, ns_status_info = RO.check_ns_status(desc)
783 db_nsr_update["admin.deployed.RO.nsr_status"] = ns_status
784 if ns_status == "ERROR":
785 raise ROclient.ROClientException(ns_status_info)
786 elif ns_status == "BUILD":
787 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
788 elif ns_status == "ACTIVE":
789 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
790 try:
791 self.ns_update_vnfr(db_vnfrs, desc)
792 break
793 except LcmExceptionNoMgmtIP:
794 pass
795 else:
796 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
797 if detailed_status != detailed_status_old:
798 detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
799 self.update_db_2("nsrs", nsr_id, db_nsr_update)
800 await asyncio.sleep(5, loop=self.loop)
801 else: # total_deploy_timeout
802 raise ROclient.ROClientException("Timeout waiting ns to be ready")
803
804 step = "Updating NSR"
805 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
806
807 db_nsr["detailed-status"] = "Configuring vnfr"
808 self.update_db_2("nsrs", nsr_id, db_nsr_update)
809
810 # The parameters we'll need to deploy a charm
811 number_to_configure = 0
812
813 def deploy_charm(vnf_index, vdu_id, vdu_name, vdu_count_index, mgmt_ip_address, n2vc_info,
814 config_primitive=None):
815 """An inner function to deploy the charm from either vnf or vdu
816 vnf_index is mandatory. vdu_id can be None for a vnf configuration or the id for vdu configuration
817 """
818 if not mgmt_ip_address:
819 raise LcmException("vnfd/vdu has not management ip address to configure it")
820 # Login to the VCA.
821 # if number_to_configure == 0:
822 # self.logger.debug("Logging into N2VC...")
823 # task = asyncio.ensure_future(self.n2vc.login())
824 # yield from asyncio.wait_for(task, 30.0)
825 # self.logger.debug("Logged into N2VC!")
826
827 # # await self.n2vc.login()
828
829 # Note: The charm needs to exist on disk at the location
830 # specified by charm_path.
831 base_folder = vnfd["_admin"]["storage"]
832 storage_params = self.fs.get_params()
833 charm_path = "{}{}/{}/charms/{}".format(
834 storage_params["path"],
835 base_folder["folder"],
836 base_folder["pkg-dir"],
837 proxy_charm
838 )
839
840 # Setup the runtime parameters for this VNF
841 params = {'rw_mgmt_ip': mgmt_ip_address}
842 if config_primitive:
843 params["initial-config-primitive"] = config_primitive
844
845 # ns_name will be ignored in the current version of N2VC
846 # but will be implemented for the next point release.
847 model_name = "default" # TODO bug 585 nsr_id
848 if vdu_id:
849 vdu_id_text = vdu_id + "-"
850 else:
851 vdu_id_text = "-"
852 application_name = self.n2vc.FormatApplicationName(nsr_name, vnf_index, vdu_id_text)
853
854 vca_index = len(vca_deployed_list)
855 # trunk name and add two char index at the end to ensure that it is unique. It is assumed no more than
856 # 26*26 charm in the same NS
857 application_name = application_name[0:48]
858 application_name += chr(97 + vca_index // 26) + chr(97 + vca_index % 26)
859 vca_deployed_ = {
860 "member-vnf-index": vnf_index,
861 "vdu_id": vdu_id,
862 "model": model_name,
863 "application": application_name,
864 "operational-status": "init",
865 "detailed-status": "",
866 "vnfd_id": vnfd_id,
867 "vdu_name": vdu_name,
868 "vdu_count_index": vdu_count_index,
869 }
870 vca_deployed_list.append(vca_deployed_)
871 db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = vca_deployed_
872 self.update_db_2("nsrs", nsr_id, db_nsr_update)
873
874 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
875 proxy_charm))
876 if not n2vc_info:
877 n2vc_info["nsr_id"] = nsr_id
878 n2vc_info["nslcmop_id"] = nslcmop_id
879 n2vc_info["n2vc_event"] = asyncio.Event(loop=self.loop)
880 n2vc_info["lcmOperationType"] = "instantiate"
881 n2vc_info["deployed"] = vca_deployed_list
882 n2vc_info["db_update"] = db_nsr_update
883 task = asyncio.ensure_future(
884 self.n2vc.DeployCharms(
885 model_name, # The network service name
886 application_name, # The application name
887 vnfd, # The vnf descriptor
888 charm_path, # Path to charm
889 params, # Runtime params, like mgmt ip
890 {}, # for native charms only
891 self.n2vc_callback, # Callback for status changes
892 n2vc_info, # Callback parameter
893 None, # Callback parameter (task)
894 )
895 )
896 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
897 n2vc_info))
898 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + application_name, task)
899
900 step = "Looking for needed vnfd to configure"
901 self.logger.debug(logging_text + step)
902
903 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
904 vnfd_id = c_vnf["vnfd-id-ref"]
905 vnf_index = str(c_vnf["member-vnf-index"])
906 vnfd = db_vnfds_ref[vnfd_id]
907
908 # Check if this VNF has a charm configuration
909 vnf_config = vnfd.get("vnf-configuration")
910
911 if vnf_config and vnf_config.get("juju"):
912 proxy_charm = vnf_config["juju"]["charm"]
913 config_primitive = None
914
915 if proxy_charm:
916 if 'initial-config-primitive' in vnf_config:
917 config_primitive = vnf_config['initial-config-primitive']
918
919 # Login to the VCA. If there are multiple calls to login(),
920 # subsequent calls will be a nop and return immediately.
921 step = "connecting to N2VC to configure vnf {}".format(vnf_index)
922 await self.n2vc.login()
923 deploy_charm(vnf_index, None, None, None, db_vnfrs[vnf_index]["ip-address"], n2vc_info,
924 config_primitive)
925 number_to_configure += 1
926
927 # Deploy charms for each VDU that supports one.
928 for vdu_index, vdu in enumerate(get_iterable(vnfd, 'vdu')):
929 vdu_config = vdu.get('vdu-configuration')
930 proxy_charm = None
931 config_primitive = None
932
933 if vdu_config and vdu_config.get("juju"):
934 proxy_charm = vdu_config["juju"]["charm"]
935
936 if 'initial-config-primitive' in vdu_config:
937 config_primitive = vdu_config['initial-config-primitive']
938
939 if proxy_charm:
940 step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index)
941 await self.n2vc.login()
942 vdur = db_vnfrs[vnf_index]["vdur"][vdu_index]
943 # TODO for the moment only first vdu_id contains a charm deployed
944 if vdur["vdu-id-ref"] != vdu["id"]:
945 raise LcmException("Mismatch vdur {}, vdu {} at index {} for vnf {}"
946 .format(vdur["vdu-id-ref"], vdu["id"], vdu_index, vnf_index))
947 deploy_charm(vnf_index, vdu["id"], vdur.get("name"), vdur["count-index"],
948 vdur["ip-address"], n2vc_info, config_primitive)
949 number_to_configure += 1
950
951 db_nsr_update["operational-status"] = "running"
952 configuration_failed = False
953 if number_to_configure:
954 old_status = "configuring: init: {}".format(number_to_configure)
955 db_nsr_update["config-status"] = old_status
956 db_nsr_update["detailed-status"] = old_status
957 db_nslcmop_update["detailed-status"] = old_status
958
959 # wait until all are configured.
960 while time() <= start_deploy + self.total_deploy_timeout:
961 if db_nsr_update:
962 self.update_db_2("nsrs", nsr_id, db_nsr_update)
963 if db_nslcmop_update:
964 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
965 # TODO add a fake tast that set n2vc_event after some time
966 await n2vc_info["n2vc_event"].wait()
967 n2vc_info["n2vc_event"].clear()
968 all_active = True
969 status_map = {}
970 n2vc_error_text = [] # contain text error list. If empty no one is in error status
971 now = time()
972 for vca_deployed in vca_deployed_list:
973 vca_status = vca_deployed["operational-status"]
974 if vca_status not in status_map:
975 # Initialize it
976 status_map[vca_status] = 0
977 status_map[vca_status] += 1
978
979 if vca_status == "active":
980 vca_deployed.pop("time_first_error", None)
981 vca_deployed.pop("status_first_error", None)
982 continue
983
984 all_active = False
985 if vca_status in ("error", "blocked"):
986 vca_deployed["detailed-status-error"] = vca_deployed["detailed-status"]
987 # if not first time in this status error
988 if not vca_deployed.get("time_first_error"):
989 vca_deployed["time_first_error"] = now
990 continue
991 if vca_deployed.get("time_first_error") and \
992 now <= vca_deployed["time_first_error"] + self.timeout_vca_on_error:
993 n2vc_error_text.append("member_vnf_index={} vdu_id={} {}: {}"
994 .format(vca_deployed["member-vnf-index"],
995 vca_deployed["vdu_id"], vca_status,
996 vca_deployed["detailed-status-error"]))
997
998 if all_active:
999 break
1000 elif n2vc_error_text:
1001 db_nsr_update["config-status"] = "failed"
1002 error_text = "fail configuring " + ";".join(n2vc_error_text)
1003 db_nsr_update["detailed-status"] = error_text
1004 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1005 db_nslcmop_update["detailed-status"] = error_text
1006 db_nslcmop_update["statusEnteredTime"] = time()
1007 configuration_failed = True
1008 break
1009 else:
1010 cs = "configuring: "
1011 separator = ""
1012 for status, num in status_map.items():
1013 cs += separator + "{}: {}".format(status, num)
1014 separator = ", "
1015 if old_status != cs:
1016 db_nsr_update["config-status"] = cs
1017 db_nsr_update["detailed-status"] = cs
1018 db_nslcmop_update["detailed-status"] = cs
1019 old_status = cs
1020 else: # total_deploy_timeout
1021 raise LcmException("Timeout waiting ns to be configured")
1022
1023 if not configuration_failed:
1024 # all is done
1025 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1026 db_nslcmop_update["statusEnteredTime"] = time()
1027 db_nslcmop_update["detailed-status"] = "done"
1028 db_nsr_update["config-status"] = "configured"
1029 db_nsr_update["detailed-status"] = "done"
1030
1031 return
1032
1033 except (ROclient.ROClientException, DbException, LcmException) as e:
1034 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1035 exc = e
1036 except asyncio.CancelledError:
1037 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1038 exc = "Operation was cancelled"
1039 except Exception as e:
1040 exc = traceback.format_exc()
1041 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1042 exc_info=True)
1043 finally:
1044 if exc:
1045 if db_nsr:
1046 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1047 db_nsr_update["operational-status"] = "failed"
1048 if db_nslcmop:
1049 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1050 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1051 db_nslcmop_update["statusEnteredTime"] = time()
1052 try:
1053 if db_nsr:
1054 db_nsr_update["_admin.nslcmop"] = None
1055 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1056 if db_nslcmop_update:
1057 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1058 except DbException as e:
1059 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1060 if nslcmop_operation_state:
1061 try:
1062 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1063 "operationState": nslcmop_operation_state})
1064 except Exception as e:
1065 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1066
1067 self.logger.debug(logging_text + "Exit")
1068 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1069
async def _destroy_charm(self, model, application):
    """
    Ask N2VC to remove a charm application, unless N2VC reports it is not there
    :param model: model name, forwarded to N2VC
    :param application: application name, forwarded to N2VC
    :return: True if the application did not exist (nothing is ordered). False if it existed and its removal
        has been ordered
    """
    already_removed = not await self.n2vc.HasApplication(model, application)
    if not already_removed:
        # only the removal order is issued here; this method does not check afterwards that it is gone
        await self.n2vc.RemoveCharms(model, application)
    return already_removed
1081
async def _wait_charm_destroyed(self, model, application, timeout):
    """
    Poll N2VC until the charm application does not exist any more
    :param model: model name, forwarded to N2VC
    :param application: application name, forwarded to N2VC
    :param timeout: maximum number of seconds to sleep between checks. With zero or a negative value the
        application is checked just once
    :return: True if the application does not exist, False if it still exists once the timeout is consumed
    """
    poll_interval = 10  # seconds between two consecutive checks
    remaining = timeout
    while True:
        if not await self.n2vc.HasApplication(model, application):
            return True
        if remaining <= 0:
            return False
        # never sleep longer than the time left, so that the wait does not overrun the requested timeout;
        # a last check is still done after the final sleep
        delay = min(poll_interval, remaining)
        await asyncio.sleep(delay)
        remaining -= delay
1097
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a ns instance. Steps, in order:
      - order the removal of the configuration charms listed at nsr "_admin.deployed.VCA"
      - delete the ns at RO and wait (up to 20 minutes) until the RO delete action finishes
      - delete the nsd and the vnfds at RO
      - wait until the charms ordered to be removed do not exist
      - write the result at nsr and nslcmop, or remove the records from database when the operation
        parameters contain "autoremove"
    Errors found while deleting are accumulated and reported together at the end. Nothing is raised to the
    caller: the result is stored at database and, when there is an operation state, notified through self.msg
    with the command "terminated".
    :param nsr_id: _id of the nsr at database
    :param nslcmop_id: _id of the nslcmop (terminate operation) at database
    :return: None
    """
    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    db_nslcmop = None
    exc = None
    failed_detail = []   # annotates all failed error messages
    vca_time_destroy = None   # time of where destroy charm order
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    try:
        step = "Getting nslcmop={} from db".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr={} from db".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        # nsd = db_nsr["nsd"]
        # work over a copy, because entries are cleared below as charms are found removed
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            return
        # #TODO check if VIM is creating and wait
        # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"

        if nsr_deployed and nsr_deployed.get("VCA"):
            # best effort: a failure ordering the charms removal is logged and does not stop the RO deletion
            try:
                step = "Scheduling configuration charms removing"
                db_nsr_update["detailed-status"] = "Deleting charms"
                self.logger.debug(logging_text + step)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # for backward compatibility
                if isinstance(nsr_deployed["VCA"], dict):
                    nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
                    db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)

                for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
                    if vca_deployed:
                        if await self._destroy_charm(vca_deployed['model'], vca_deployed["application"]):
                            vca_deployed.clear()
                            # annotate at the pending database update (not at the db_nsr read copy) that this
                            # charm does not exist any more.
                            # NOTE(review): presumably update_db_2 has already flushed the
                            # "_admin.deployed.VCA" key set above; confirm, as both keys must not be sent
                            # in the same update
                            db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None
                        else:
                            vca_time_destroy = time()
            except Exception as e:
                self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO_fail = False
        RO = ROclient.ROClient(self.loop, **self.ro_config)

        # Delete ns
        RO_nsr_id = RO_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
            RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if RO_nsr_id:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = \
                    "Deleting ns at RO"
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self.logger.debug(logging_text + step)
                desc = await RO.delete("ns", RO_nsr_id)
                RO_delete_action = desc["action_id"]
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if RO_delete_action:
                # wait until NS is deleted from VIM
                step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
                    format(RO_nsr_id, RO_delete_action)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                delete_timeout = 20 * 60   # 20 minutes
                while delete_timeout > 0:
                    desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                         extra_item_id=RO_delete_action)
                    ns_status, ns_status_info = RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    # write at database only when the reported status text changes
                    if detailed_status != detailed_status_old:
                        detailed_status_old = db_nslcmop_update["detailed-status"] = \
                            db_nsr_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

        except ROclient.ROClientException as e:
            if e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
            elif e.http_code == 409:   # conflict
                failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
                RO_fail = True
            else:
                failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
                self.logger.error(logging_text + failed_detail[-1])
                RO_fail = True

        # Delete nsd
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
            RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                    "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == 409:   # conflict
                    failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                    RO_fail = True
                else:
                    failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])
                    RO_fail = True

        # Delete vnfds. Skipped if ns or nsd deletion has failed
        if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd_id"):
            for vnf_id, RO_vnfd_id in nsr_deployed["RO"]["vnfd_id"].items():
                if not RO_vnfd_id:
                    continue
                try:
                    step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
                        "Deleting vnfd={} at RO".format(vnf_id)
                    await RO.delete("vnfd", RO_vnfd_id)
                    self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        db_nsr_update["_admin.deployed.RO.vnfd_id.{}".format(vnf_id)] = None
                        self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                    elif e.http_code == 409:   # conflict
                        failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        # wait until charm deleted
        if vca_time_destroy:
            db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = step = \
                "Waiting for deletion of configuration charms"
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
                if not vca_deployed:
                    continue
                step = "Waiting for deletion of charm application_name={}".format(vca_deployed["application"])
                # discount the time already elapsed since the last removal order was issued
                timeout = self.timeout_charm_delete - int(time() - vca_time_destroy)
                if not await self._wait_charm_destroyed(vca_deployed['model'], vca_deployed["application"],
                                                        timeout):
                    failed_detail.append("VCA[application_name={}] Deletion timeout".format(
                        vca_deployed["application"]))
                else:
                    # annotate at the pending database update (not at the db_nsr read copy)
                    db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None

        if failed_detail:
            self.logger.error(logging_text + " ;".join(failed_detail))
            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
            db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        elif db_nslcmop["operationParams"].get("autoremove"):
            # records are deleted, so there is nothing left to update at the finally clause
            self.db.del_one("nsrs", {"_id": nsr_id})
            db_nsr = None
            db_nsr_update.clear()
            self.db.del_list("nslcmops", {"nsInstanceId": nsr_id})
            db_nslcmop = None
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update.clear()
            self.db.del_list("vnfrs", {"nsr-id-ref": nsr_id})
            self.db.set_list("pdus", {"_admin.usage.nsr_id": nsr_id},
                             {"_admin.usageState": "NOT_IN_USE", "_admin.usage": None})
            self.logger.debug(logging_text + "Delete from database")
        else:
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["statusEnteredTime"] = time()

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
1331
async def _ns_execute_primitive(self, db_deployed, nsr_name, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
                                primitive, primitive_params):
    """
    Execute a primitive over the deployed charm that matches the provided vnf/vdu, and wait (up to 600
    seconds) for its completion
    :param db_deployed: content of nsr "_admin.deployed". The charm is looked for at its "VCA" list
    :param nsr_name: ns name. Not used by this method
    :param member_vnf_index: the charm must match this member-vnf-index
    :param vdu_id: the charm must match this vdu_id (None is also compared)
    :param vdu_name: if not empty, the charm must match this vdu_name
    :param vdu_count_index: if not None, the charm must match this vdu_count_index. Zero is a valid index
    :param primitive: name of the primitive to execute
    :param primitive_params: dictionary passed as keyword arguments to N2VC ExecutePrimitive. Can be None
    :return: tuple (result, result_detail). result is "COMPLETED" or "FAILED"; result_detail is "Done", the
        exception text, "Task has been cancelled" or "timeout"
    :raises LcmException: if there is not a matching charm deployed, it lacks of model or application name, or
        its operational-status is not 'active'
    """
    # tolerate a missing "deployed"/"VCA", so that it is reported as a not deployed charm
    for vca_deployed in (db_deployed or {}).get("VCA") or ():
        if not vca_deployed:
            continue
        if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
            continue
        if vdu_name and vdu_name != vca_deployed["vdu_name"]:
            continue
        # compare against None, because count index 0 is falsy but it is a valid filter
        if vdu_count_index is not None and vdu_count_index != vca_deployed["vdu_count_index"]:
            continue
        break
    else:
        raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not deployed"
                           .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
    model_name = vca_deployed.get("model")
    application_name = vca_deployed.get("application")
    if not model_name or not application_name:
        raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not model "
                           "or application name" .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
    if vca_deployed["operational-status"] != "active":
        raise LcmException("charm for member_vnf_index={} vdu_id={} operational_status={} not 'active'".format(
            member_vnf_index, vdu_id, vca_deployed["operational-status"]))
    callback = None  # self.n2vc_callback
    callback_args = ()  # [db_nsr, db_nslcmop, member_vnf_index, None]
    await self.n2vc.login()
    task = asyncio.ensure_future(
        self.n2vc.ExecutePrimitive(
            model_name,
            application_name,
            primitive, callback,
            *callback_args,
            **(primitive_params or {})
        )
    )
    # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
    #                                          db_nsr, db_nslcmop, member_vnf_index))
    # self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "action:" + primitive, task)
    # wait until completed with timeout
    await asyncio.wait((task,), timeout=600)

    result = "FAILED"  # by default
    result_detail = ""
    if task.cancelled():
        result_detail = "Task has been cancelled"
    elif task.done():
        exc = task.exception()
        if exc:
            result_detail = str(exc)
        else:
            # TODO revise with Adam if action is finished and ok when task is done or callback is needed
            result = "COMPLETED"
            result_detail = "Done"
    else:  # timeout
        # TODO Should it be cancelled?!!
        task.cancel()
        result_detail = "timeout"
    return result, result_detail
1391
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive requested by a nslcmop over the corresponding deployed charm of the ns.
    The primitive, its parameters and the target (member_vnf_index, and optionally vdu_id, vdu_name,
    vdu_count_index) are read from the nslcmop "operationParams". Before executing, it waits (up to 3600
    seconds) for the related tasks reported by self.lcm_tasks.
    Nothing is raised to the caller: the result is stored at the nslcmop and, when there is an operation state,
    notified through self.msg with the command "actioned".
    :param nsr_id: _id of the nsr at database
    :param nslcmop_id: _id of the nslcmop (action operation) at database
    :return: None
    """
    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    exc = None
    try:
        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_deployed = db_nsr["_admin"].get("deployed")
        nsr_name = db_nsr["name"]
        vnf_index = db_nslcmop["operationParams"]["member_vnf_index"]
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        vdu_name = db_nslcmop["operationParams"].get("vdu_name")

        # look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
        if task_dependency:
            step = db_nslcmop_update["detailed-status"] = \
                "Waiting for related tasks to be completed: {}".format(task_name)
            self.logger.debug(logging_text + step)
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            _, pending = await asyncio.wait(task_dependency, timeout=3600)
            if pending:
                raise LcmException("Timeout waiting related tasks to be completed")

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # TODO check if ns is in a proper status
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index, vdu_id,
                                                                 vdu_name, vdu_count_index, primitive,
                                                                 primitive_params)
        db_nslcmop_update["detailed-status"] = result_detail
        db_nslcmop_update["operationState"] = nslcmop_operation_state = result
        db_nslcmop_update["statusEnteredTime"] = time()
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc and db_nslcmop:
            db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
            db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
            db_nslcmop_update["statusEnteredTime"] = time()
        try:
            if db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state})
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        # "Exit" is logged once, after the notification has been attempted
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
1473
async def scale(self, nsr_id, nslcmop_id):
    """
    Execute the scale operation 'nslcmop_id' over the ns instance 'nsr_id'.

    Steps, in order:
      1. read the operation ("nslcmops") and the instance ("nsrs") from database and wait for related tasks
      2. locate the vnfd scaling-group-descriptor named at operationParams:scaleVnfData:scaleByStepData
      3. check max/min-instance-count and compute the list of vdus to create (SCALE_OUT) or delete (SCALE_IN)
      4. for SCALE_IN, execute the scaling-config-action primitives with trigger 'pre-scale-in'
      5. send the 'vdu-scaling' action to RO and poll every 5 seconds (up to one hour) until ns is ready
      6. for SCALE_OUT, execute the scaling-config-action primitives with trigger 'post-scale-out'
    Exceptions raised by these steps (including task cancellation) are caught here: they are logged and
    stored as a FAILED state at the "nslcmops" entry, restoring or marking as failed the "nsrs" status.
    On exit the database is updated, a "scaled" message is written at topic "ns" when the operation reached
    a final state, and the task is removed from self.lcm_tasks.
    :param nsr_id: _id of the "nsrs" database entry to be scaled
    :param nslcmop_id: _id of the "nslcmops" database entry that contains the scaling parameters
    :return: None
    """
    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {"_admin.nslcmop": nslcmop_id}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    vnfr_scaled = False
    try:
        step = "Getting nslcmop from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_name = db_nsr["name"]
        # kept to restore them once the operation finishes (or fails outside of RO/VCA)
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]

        # look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
        if task_dependency:
            step = db_nslcmop_update["detailed-status"] = \
                "Waiting for related tasks to be completed: {}".format(task_name)
            self.logger.debug(logging_text + step)
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            _, pending = await asyncio.wait(task_dependency, timeout=3600)
            if pending:
                raise LcmException("Timeout waiting related tasks to be completed")

        step = "Parsing scaling parameters"
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")
        RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
        scale_vnf_data = db_nslcmop["operationParams"]["scaleVnfData"]
        vnf_index = scale_vnf_data["scaleByStepData"]["member-vnf-index"]
        scaling_group = scale_vnf_data["scaleByStepData"]["scaling-group-descriptor"]
        scaling_type = scale_vnf_data["scaleVnfType"]
        # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")

        # for backward compatibility: VCA was stored as a dictionary, now it is a list
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        step = "Getting scaling-group-descriptor"
        # on exit of this loop 'scaling_descriptor' is the entry that matches the requested name
        for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
            if scaling_descriptor["name"] == scaling_group:
                break
        else:
            raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                               "at vnfd:scaling-group-descriptor".format(scaling_group))
        # cooldown_time = 0
        # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
        #     cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
        #     if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
        #         break

        # TODO check if ns is in a proper status
        step = "Sending scale order to RO"
        # nb_scale_op: number of scale-out operations currently applied for this scaling group.
        # admin_scale_index: position of this scaling group at nsr:_admin:scaling-group
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group

        RO_scaling_info = []    # content of the 'vdu-scaling' action sent to RO
        # information provided to the primitives whose parameter default-value is <VDU_SCALE_INFO>
        vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
        if scaling_type == "SCALE_OUT":
            # count if max-instance-count is reached
            if scaling_descriptor.get("max-instance-count") is not None:
                max_instance_count = int(scaling_descriptor["max-instance-count"])
                if nb_scale_op >= max_instance_count:
                    raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
                                       " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op += 1
            vdu_scaling_info["scaling_direction"] = "OUT"
            vdu_scaling_info["vdu-create"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                vdu_count = vdu_scale_info.get("count", 1)
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "create", "count": vdu_count})
                vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_count
        elif scaling_type == "SCALE_IN":
            # count if min-instance-count is reached
            min_instance_count = 0
            if scaling_descriptor.get("min-instance-count") is not None:
                min_instance_count = int(scaling_descriptor["min-instance-count"])
            if nb_scale_op <= min_instance_count:
                raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op -= 1
            vdu_scaling_info["scaling_direction"] = "IN"
            vdu_scaling_info["vdu-delete"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                vdu_count = vdu_scale_info.get("count", 1)
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "delete", "count": vdu_count})
                vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_count
        else:
            # fail here with a clear message instead of a later KeyError on 'scaling_direction'
            raise LcmException("input parameter 'scaleVnfData':'scaleVnfType':'{}' is not supported. Expected "
                               "'SCALE_OUT' or 'SCALE_IN'".format(scaling_type))

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_create = vdu_scaling_info.get("vdu-create")
        # work over a copy, because the counters are decremented below and the original values are needed later
        vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
        if vdu_scaling_info["scaling_direction"] == "IN":
            # walk vdur backwards taking, for each vdu-id-ref, as many entries as requested to be deleted
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    vdu_scaling_info["vdu"].append({
                        "name": vdur["name"],
                        "vdu_id": vdur["vdu-id-ref"],
                        "interface": [{
                            "name": interface["name"],
                            "ip_address": interface["ip-address"],
                            "mac_address": interface.get("mac-address"),
                        } for interface in vdur["interfaces"]],
                    })
            # recover the original counters (used at scale_vnfr) and remove them from the primitive information
            vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # execute primitive service PRE-SCALING
        step = "Executing pre-scale vnf-config-primitive"
        for scaling_config_action in scaling_descriptor.get("scaling-config-action") or ():
            if scaling_config_action.get("trigger") != "pre-scale-in" or scaling_type != "SCALE_IN":
                continue
            vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
            step = db_nslcmop_update["detailed-status"] = \
                "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
            # look for primitive
            primitive_params = {}
            for config_primitive in (db_vnfd.get("vnf-configuration") or {}).get("config-primitive") or ():
                if config_primitive["name"] == vnf_config_primitive:
                    for parameter in config_primitive.get("parameter", ()):
                        if parameter.get("default-value") == "<VDU_SCALE_INFO>":
                            primitive_params[parameter["name"]] = yaml.safe_dump(
                                vdu_scaling_info, default_flow_style=True, width=256)
                    break
            else:
                # report the name that was looked for, not the loop variable (unbound if the list is empty)
                raise LcmException(
                    "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                    "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                    "primitive".format(scaling_group, vnf_config_primitive))
            scale_process = "VCA"
            db_nsr_update["config-status"] = "configuring pre-scaling"
            result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index,
                                                                     None, None, None, vnf_config_primitive,
                                                                     primitive_params)
            self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                vnf_config_primitive, result, result_detail))
            if result == "FAILED":
                raise LcmException(result_detail)
            db_nsr_update["config-status"] = old_config_status
            scale_process = None

        if RO_scaling_info:
            scale_process = "RO"
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
            db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
            db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
            # wait until ready
            RO_nslcmop_id = RO_desc["instance_action_id"]
            db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id

            RO_task_done = False
            step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
            detailed_status_old = None
            self.logger.debug(logging_text + step)

            deployment_timeout = 1 * 3600   # One hour
            # two phases: first wait for the RO action to finish, then for the ns to be ACTIVE with the
            # vnfr updated. The loop is left with 'break' on success; on timeout the counter reaches 0
            while deployment_timeout > 0:
                if not RO_task_done:
                    desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                         extra_item_id=RO_nslcmop_id)
                    ns_status, ns_status_info = RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        RO_task_done = True
                        step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
                        self.logger.debug(logging_text + step)
                    else:
                        # explicit exception instead of 'assert', that is removed when running with -O
                        raise LcmException("ROclient.check_action_status returns unknown {}".format(ns_status))
                else:
                    desc = await RO.show("ns", RO_nsr_id)
                    ns_status, ns_status_info = RO.check_ns_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        detailed_status = step + "; {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        step = detailed_status = \
                            "Waiting for management IP address reported by the VIM. Updating VNFRs"
                        # add/remove the vdur entries only once, even if this branch is retried
                        if not vnfr_scaled:
                            self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
                            vnfr_scaled = True
                        try:
                            desc = await RO.show("ns", RO_nsr_id)
                            # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
                            self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
                            break
                        except LcmExceptionNoMgmtIP:
                            pass    # management IP not available yet, retry at next iteration
                    else:
                        # explicit exception instead of 'assert', that is removed when running with -O
                        raise LcmException("ROclient.check_ns_status returns unknown {}".format(ns_status))
                if detailed_status != detailed_status_old:
                    detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                    self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)

                # NOTE(review): the 'loop' argument of asyncio.sleep is deprecated since python 3.8 and
                # removed at python 3.10; it must be dropped when moving to a newer interpreter
                await asyncio.sleep(5, loop=self.loop)
                deployment_timeout -= 5
            if deployment_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns to be ready")

            # update VDU_SCALING_INFO with the obtained ip_addresses
            if vdu_scaling_info["scaling_direction"] == "OUT":
                # walk vdur backwards taking, for each vdu-id-ref, as many entries as requested to be created
                for vdur in reversed(db_vnfr["vdur"]):
                    if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
                        vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
                        vdu_scaling_info["vdu"].append({
                            "name": vdur["name"],
                            "vdu_id": vdur["vdu-id-ref"],
                            "interface": [{
                                "name": interface["name"],
                                "ip_address": interface["ip-address"],
                                "mac_address": interface.get("mac-address"),
                            } for interface in vdur["interfaces"]],
                        })
                del vdu_scaling_info["vdu-create"]

        scale_process = None
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        for scaling_config_action in scaling_descriptor.get("scaling-config-action") or ():
            if scaling_config_action.get("trigger") != "post-scale-out" or scaling_type != "SCALE_OUT":
                continue
            vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
            step = db_nslcmop_update["detailed-status"] = \
                "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
            # look for primitive
            primitive_params = {}
            for config_primitive in (db_vnfd.get("vnf-configuration") or {}).get("config-primitive") or ():
                if config_primitive["name"] == vnf_config_primitive:
                    for parameter in config_primitive.get("parameter", ()):
                        if parameter.get("default-value") == "<VDU_SCALE_INFO>":
                            primitive_params[parameter["name"]] = yaml.safe_dump(
                                vdu_scaling_info, default_flow_style=True, width=256)
                    break
            else:
                # report the name that was looked for, not the loop variable (unbound if the list is empty)
                raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
                                   "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
                                   "match any vnf-configuration:config-primitive".format(scaling_group,
                                                                                         vnf_config_primitive))
            scale_process = "VCA"
            db_nsr_update["config-status"] = "configuring post-scaling"

            result, result_detail = await self._ns_execute_primitive(nsr_deployed, nsr_name, vnf_index,
                                                                     None, None, None, vnf_config_primitive,
                                                                     primitive_params)
            self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                vnf_config_primitive, result, result_detail))
            if result == "FAILED":
                raise LcmException(result_detail)
            db_nsr_update["config-status"] = old_config_status
            scale_process = None

        db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
        db_nslcmop_update["statusEnteredTime"] = time()
        db_nslcmop_update["detailed-status"] = "done"
        db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = old_operational_status
        db_nsr_update["config-status"] = old_config_status
        return
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        # executed also after the 'return' of the success path: here the pending changes are stored
        if exc:
            if db_nslcmop:
                db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
                db_nslcmop_update["statusEnteredTime"] = time()
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                db_nsr_update["_admin.nslcmop"] = None
                # the failure happened in the middle of a RO or VCA action: ns is left at failed status
                if scale_process:
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                 exc)
        try:
            if db_nslcmop and db_nslcmop_update:
                self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
            if db_nsr:
                db_nsr_update["_admin.nslcmop"] = None
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                         "operationState": nslcmop_operation_state})
                # if cooldown_time:
                #     await asyncio.sleep(cooldown_time)
                # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
            except Exception as e:
                # the notification is best effort: a failure here must not change the operation result
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")