5d34b05f02c4737f8e2a0ac542a211e4374286d7
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import functools
24 import traceback
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 import ROclient
28 from lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase
29
30 from osm_common.dbbase import DbException
31 from osm_common.fsbase import FsException
32 from n2vc.vnf import N2VC, N2VCPrimitiveExecutionFailed
33
34 from copy import copy, deepcopy
35 from http import HTTPStatus
36 from time import time
37 from uuid import uuid4
38
39 __author__ = "Alfonso Tierno"
40
41
42 def get_iterable(in_dict, in_key):
43 """
44 Similar to <dict>.get(), but if value is None, False, ..., An empty tuple is returned instead
45 :param in_dict: a dictionary
46 :param in_key: the key to look for at in_dict
47 :return: in_dict[in_var] or () if it is None or not present
48 """
49 if not in_dict.get(in_key):
50 return ()
51 return in_dict[in_key]
52
53
54 def populate_dict(target_dict, key_list, value):
55 """
56 Upate target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value.
57 Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
58 :param target_dict: dictionary to be changed
59 :param key_list: list of keys to insert at target_dict
60 :param value:
61 :return: None
62 """
63 for key in key_list[0:-1]:
64 if key not in target_dict:
65 target_dict[key] = {}
66 target_dict = target_dict[key]
67 target_dict[key_list[-1]] = value
68
69
70 class NsLcm(LcmBase):
71 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
72 total_deploy_timeout = 2 * 3600 # global timeout for deployment
73 timeout_charm_delete = 10 * 60
74 timeout_primitive = 10 * 60 # timeout for primitive execution
75
76 def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
77 """
78 Init, Connect to database, filesystem storage, and messaging
79 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
80 :return: None
81 """
82 # logging
83 self.logger = logging.getLogger('lcm.ns')
84 self.loop = loop
85 self.lcm_tasks = lcm_tasks
86
87 super().__init__(db, msg, fs, self.logger)
88
89 self.ro_config = ro_config
90
91 self.n2vc = N2VC(
92 log=self.logger,
93 server=vca_config['host'],
94 port=vca_config['port'],
95 user=vca_config['user'],
96 secret=vca_config['secret'],
97 # TODO: This should point to the base folder where charms are stored,
98 # if there is a common one (like object storage). Otherwise, leave
99 # it unset and pass it via DeployCharms
100 # artifacts=vca_config[''],
101 artifacts=None,
102 )
103
104 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
105 """
106 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
107 :param vnfd: input vnfd
108 :param new_id: overrides vnf id if provided
109 :param additionalParams: Instantiation params for VNFs provided
110 :param nsrId: Id of the NSR
111 :return: copy of vnfd
112 """
113 try:
114 vnfd_RO = deepcopy(vnfd)
115 # remove unused by RO configuration, monitoring, scaling and internal keys
116 vnfd_RO.pop("_id", None)
117 vnfd_RO.pop("_admin", None)
118 vnfd_RO.pop("vnf-configuration", None)
119 vnfd_RO.pop("monitoring-param", None)
120 vnfd_RO.pop("scaling-group-descriptor", None)
121 if new_id:
122 vnfd_RO["id"] = new_id
123
124 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
125 for vdu in get_iterable(vnfd_RO, "vdu"):
126 cloud_init_file = None
127 if vdu.get("cloud-init-file"):
128 base_folder = vnfd["_admin"]["storage"]
129 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
130 vdu["cloud-init-file"])
131 with self.fs.file_open(cloud_init_file, "r") as ci_file:
132 cloud_init_content = ci_file.read()
133 vdu.pop("cloud-init-file", None)
134 elif vdu.get("cloud-init"):
135 cloud_init_content = vdu["cloud-init"]
136 else:
137 continue
138
139 env = Environment()
140 ast = env.parse(cloud_init_content)
141 mandatory_vars = meta.find_undeclared_variables(ast)
142 if mandatory_vars:
143 for var in mandatory_vars:
144 if not additionalParams or var not in additionalParams.keys():
145 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
146 "file, must be provided in the instantiation parameters inside the "
147 "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
148 template = Template(cloud_init_content)
149 cloud_init_content = template.render(additionalParams or {})
150 vdu["cloud-init"] = cloud_init_content
151
152 return vnfd_RO
153 except FsException as e:
154 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
155 format(vnfd["id"], vdu["id"], cloud_init_file, e))
156 except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
157 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
158 format(vnfd["id"], vdu["id"], e))
159
160 def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None):
161 """
162 Callback both for charm status change and task completion
163 :param model_name: Charm model name
164 :param application_name: Charm application name
165 :param status: Can be
166 - blocked: The unit needs manual intervention
167 - maintenance: The unit is actively deploying/configuring
168 - waiting: The unit is waiting for another charm to be ready
169 - active: The unit is deployed, configured, and ready
170 - error: The charm has failed and needs attention.
171 - terminated: The charm has been destroyed
172 - removing,
173 - removed
174 :param message: detailed message error
175 :param n2vc_info: dictionary with information shared with instantiate task. It contains:
176 nsr_id:
177 nslcmop_id:
178 lcmOperationType: currently "instantiate"
179 deployed: dictionary with {<application>: {operational-status: <status>, detailed-status: <text>}}
180 db_update: dictionary to be filled with the changes to be wrote to database with format key.key.key: value
181 n2vc_event: event used to notify instantiation task that some change has been produced
182 :param task: None for charm status change, or task for completion task callback
183 :return:
184 """
185 try:
186 nsr_id = n2vc_info["nsr_id"]
187 deployed = n2vc_info["deployed"]
188 db_nsr_update = n2vc_info["db_update"]
189 nslcmop_id = n2vc_info["nslcmop_id"]
190 ns_operation = n2vc_info["lcmOperationType"]
191 n2vc_event = n2vc_info["n2vc_event"]
192 logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id,
193 application_name)
194 for vca_index, vca_deployed in enumerate(deployed):
195 if not vca_deployed:
196 continue
197 if model_name == vca_deployed["model"] and application_name == vca_deployed["application"]:
198 break
199 else:
200 self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA")
201 return
202 if task:
203 if task.cancelled():
204 self.logger.debug(logging_text + " task Cancelled")
205 vca_deployed['operational-status'] = "error"
206 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
207 vca_deployed['detailed-status'] = "Task Cancelled"
208 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = "Task Cancelled"
209
210 elif task.done():
211 exc = task.exception()
212 if exc:
213 self.logger.error(logging_text + " task Exception={}".format(exc))
214 vca_deployed['operational-status'] = "error"
215 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error"
216 vca_deployed['detailed-status'] = str(exc)
217 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(exc)
218 else:
219 self.logger.debug(logging_text + " task Done")
220 # task is Done, but callback is still ongoing. So ignore
221 return
222 elif status:
223 self.logger.debug(logging_text + " Enter status={} message={}".format(status, message))
224 if vca_deployed['operational-status'] == status:
225 return # same status, ignore
226 vca_deployed['operational-status'] = status
227 db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = status
228 vca_deployed['detailed-status'] = str(message)
229 db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(message)
230 else:
231 self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True)
232 return
233 # wake up instantiate task
234 n2vc_event.set()
235 except Exception as e:
236 self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True)
237
238 def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
239 """
240 Creates a RO ns descriptor from OSM ns_instantiate params
241 :param ns_params: OSM instantiate params
242 :return: The RO ns descriptor
243 """
244 vim_2_RO = {}
245 wim_2_RO = {}
246 # TODO feature 1417: Check that no instantiation is set over PDU
247 # check if PDU forces a concrete vim-network-id and add it
248 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
249
250 def vim_account_2_RO(vim_account):
251 if vim_account in vim_2_RO:
252 return vim_2_RO[vim_account]
253
254 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
255 if db_vim["_admin"]["operationalState"] != "ENABLED":
256 raise LcmException("VIM={} is not available. operationalState={}".format(
257 vim_account, db_vim["_admin"]["operationalState"]))
258 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
259 vim_2_RO[vim_account] = RO_vim_id
260 return RO_vim_id
261
262 def wim_account_2_RO(wim_account):
263 if isinstance(wim_account, str):
264 if wim_account in wim_2_RO:
265 return wim_2_RO[wim_account]
266
267 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
268 if db_wim["_admin"]["operationalState"] != "ENABLED":
269 raise LcmException("WIM={} is not available. operationalState={}".format(
270 wim_account, db_wim["_admin"]["operationalState"]))
271 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
272 wim_2_RO[wim_account] = RO_wim_id
273 return RO_wim_id
274 else:
275 return wim_account
276
277 def ip_profile_2_RO(ip_profile):
278 RO_ip_profile = deepcopy((ip_profile))
279 if "dns-server" in RO_ip_profile:
280 if isinstance(RO_ip_profile["dns-server"], list):
281 RO_ip_profile["dns-address"] = []
282 for ds in RO_ip_profile.pop("dns-server"):
283 RO_ip_profile["dns-address"].append(ds['address'])
284 else:
285 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
286 if RO_ip_profile.get("ip-version") == "ipv4":
287 RO_ip_profile["ip-version"] = "IPv4"
288 if RO_ip_profile.get("ip-version") == "ipv6":
289 RO_ip_profile["ip-version"] = "IPv6"
290 if "dhcp-params" in RO_ip_profile:
291 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
292 return RO_ip_profile
293
294 if not ns_params:
295 return None
296 RO_ns_params = {
297 # "name": ns_params["nsName"],
298 # "description": ns_params.get("nsDescription"),
299 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
300 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
301 # "scenario": ns_params["nsdId"],
302 }
303 if n2vc_key_list:
304 for vnfd_ref, vnfd in vnfd_dict.items():
305 vdu_needed_access = []
306 mgmt_cp = None
307 if vnfd.get("vnf-configuration"):
308 if vnfd.get("mgmt-interface"):
309 if vnfd["mgmt-interface"].get("vdu-id"):
310 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
311 elif vnfd["mgmt-interface"].get("cp"):
312 mgmt_cp = vnfd["mgmt-interface"]["cp"]
313
314 for vdu in vnfd.get("vdu", ()):
315 if vdu.get("vdu-configuration"):
316 vdu_needed_access.append(vdu["id"])
317 elif mgmt_cp:
318 for vdu_interface in vdu.get("interface"):
319 if vdu_interface.get("external-connection-point-ref") and \
320 vdu_interface["external-connection-point-ref"] == mgmt_cp:
321 vdu_needed_access.append(vdu["id"])
322 mgmt_cp = None
323 break
324
325 if vdu_needed_access:
326 for vnf_member in nsd.get("constituent-vnfd"):
327 if vnf_member["vnfd-id-ref"] != vnfd_ref:
328 continue
329 for vdu in vdu_needed_access:
330 populate_dict(RO_ns_params,
331 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
332 n2vc_key_list)
333
334 if ns_params.get("vduImage"):
335 RO_ns_params["vduImage"] = ns_params["vduImage"]
336
337 if ns_params.get("ssh_keys"):
338 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
339 for vnf_params in get_iterable(ns_params, "vnf"):
340 for constituent_vnfd in nsd["constituent-vnfd"]:
341 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
342 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
343 break
344 else:
345 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
346 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
347 if vnf_params.get("vimAccountId"):
348 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "datacenter"),
349 vim_account_2_RO(vnf_params["vimAccountId"]))
350
351 for vdu_params in get_iterable(vnf_params, "vdu"):
352 # TODO feature 1417: check that this VDU exist and it is not a PDU
353 if vdu_params.get("volume"):
354 for volume_params in vdu_params["volume"]:
355 if volume_params.get("vim-volume-id"):
356 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
357 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
358 volume_params["vim-volume-id"])
359 if vdu_params.get("interface"):
360 for interface_params in vdu_params["interface"]:
361 if interface_params.get("ip-address"):
362 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
363 vdu_params["id"], "interfaces", interface_params["name"],
364 "ip_address"),
365 interface_params["ip-address"])
366 if interface_params.get("mac-address"):
367 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
368 vdu_params["id"], "interfaces", interface_params["name"],
369 "mac_address"),
370 interface_params["mac-address"])
371 if interface_params.get("floating-ip-required"):
372 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
373 vdu_params["id"], "interfaces", interface_params["name"],
374 "floating-ip"),
375 interface_params["floating-ip-required"])
376
377 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
378 if internal_vld_params.get("vim-network-name"):
379 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
380 internal_vld_params["name"], "vim-network-name"),
381 internal_vld_params["vim-network-name"])
382 if internal_vld_params.get("vim-network-id"):
383 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
384 internal_vld_params["name"], "vim-network-id"),
385 internal_vld_params["vim-network-id"])
386 if internal_vld_params.get("ip-profile"):
387 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
388 internal_vld_params["name"], "ip-profile"),
389 ip_profile_2_RO(internal_vld_params["ip-profile"]))
390
391 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
392 # look for interface
393 iface_found = False
394 for vdu_descriptor in vnf_descriptor["vdu"]:
395 for vdu_interface in vdu_descriptor["interface"]:
396 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
397 if icp_params.get("ip-address"):
398 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
399 vdu_descriptor["id"], "interfaces",
400 vdu_interface["name"], "ip_address"),
401 icp_params["ip-address"])
402
403 if icp_params.get("mac-address"):
404 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
405 vdu_descriptor["id"], "interfaces",
406 vdu_interface["name"], "mac_address"),
407 icp_params["mac-address"])
408 iface_found = True
409 break
410 if iface_found:
411 break
412 else:
413 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
414 "internal-vld:id-ref={} is not present at vnfd:internal-"
415 "connection-point".format(vnf_params["member-vnf-index"],
416 icp_params["id-ref"]))
417
418 for vld_params in get_iterable(ns_params, "vld"):
419 if "ip-profile" in vld_params:
420 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
421 ip_profile_2_RO(vld_params["ip-profile"]))
422
423 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
424 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
425 wim_account_2_RO(vld_params["wimAccountId"])),
426 if vld_params.get("vim-network-name"):
427 RO_vld_sites = []
428 if isinstance(vld_params["vim-network-name"], dict):
429 for vim_account, vim_net in vld_params["vim-network-name"].items():
430 RO_vld_sites.append({
431 "netmap-use": vim_net,
432 "datacenter": vim_account_2_RO(vim_account)
433 })
434 else: # isinstance str
435 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
436 if RO_vld_sites:
437 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
438 if vld_params.get("vim-network-id"):
439 RO_vld_sites = []
440 if isinstance(vld_params["vim-network-id"], dict):
441 for vim_account, vim_net in vld_params["vim-network-id"].items():
442 RO_vld_sites.append({
443 "netmap-use": vim_net,
444 "datacenter": vim_account_2_RO(vim_account)
445 })
446 else: # isinstance str
447 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
448 if RO_vld_sites:
449 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
450 if vld_params.get("ns-net"):
451 if isinstance(vld_params["ns-net"], dict):
452 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
453 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
454 if RO_vld_ns_net:
455 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
456 if "vnfd-connection-point-ref" in vld_params:
457 for cp_params in vld_params["vnfd-connection-point-ref"]:
458 # look for interface
459 for constituent_vnfd in nsd["constituent-vnfd"]:
460 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
461 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
462 break
463 else:
464 raise LcmException(
465 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
466 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
467 match_cp = False
468 for vdu_descriptor in vnf_descriptor["vdu"]:
469 for interface_descriptor in vdu_descriptor["interface"]:
470 if interface_descriptor.get("external-connection-point-ref") == \
471 cp_params["vnfd-connection-point-ref"]:
472 match_cp = True
473 break
474 if match_cp:
475 break
476 else:
477 raise LcmException(
478 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
479 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
480 cp_params["member-vnf-index-ref"],
481 cp_params["vnfd-connection-point-ref"],
482 vnf_descriptor["id"]))
483 if cp_params.get("ip-address"):
484 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
485 vdu_descriptor["id"], "interfaces",
486 interface_descriptor["name"], "ip_address"),
487 cp_params["ip-address"])
488 if cp_params.get("mac-address"):
489 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
490 vdu_descriptor["id"], "interfaces",
491 interface_descriptor["name"], "mac_address"),
492 cp_params["mac-address"])
493 return RO_ns_params
494
495 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
496 # make a copy to do not change
497 vdu_create = copy(vdu_create)
498 vdu_delete = copy(vdu_delete)
499
500 vdurs = db_vnfr.get("vdur")
501 if vdurs is None:
502 vdurs = []
503 vdu_index = len(vdurs)
504 while vdu_index:
505 vdu_index -= 1
506 vdur = vdurs[vdu_index]
507 if vdur.get("pdu-type"):
508 continue
509 vdu_id_ref = vdur["vdu-id-ref"]
510 if vdu_create and vdu_create.get(vdu_id_ref):
511 for index in range(0, vdu_create[vdu_id_ref]):
512 vdur = deepcopy(vdur)
513 vdur["_id"] = str(uuid4())
514 vdur["count-index"] += 1
515 vdurs.insert(vdu_index+1+index, vdur)
516 del vdu_create[vdu_id_ref]
517 if vdu_delete and vdu_delete.get(vdu_id_ref):
518 del vdurs[vdu_index]
519 vdu_delete[vdu_id_ref] -= 1
520 if not vdu_delete[vdu_id_ref]:
521 del vdu_delete[vdu_id_ref]
522 # check all operations are done
523 if vdu_create or vdu_delete:
524 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
525 vdu_create))
526 if vdu_delete:
527 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
528 vdu_delete))
529
530 vnfr_update = {"vdur": vdurs}
531 db_vnfr["vdur"] = vdurs
532 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
533
534 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
535 """
536 Updates database nsr with the RO info for the created vld
537 :param ns_update_nsr: dictionary to be filled with the updated info
538 :param db_nsr: content of db_nsr. This is also modified
539 :param nsr_desc_RO: nsr descriptor from RO
540 :return: Nothing, LcmException is raised on errors
541 """
542
543 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
544 for net_RO in get_iterable(nsr_desc_RO, "nets"):
545 if vld["id"] != net_RO.get("ns_net_osm_id"):
546 continue
547 vld["vim-id"] = net_RO.get("vim_net_id")
548 vld["name"] = net_RO.get("vim_name")
549 vld["status"] = net_RO.get("status")
550 vld["status-detailed"] = net_RO.get("error_msg")
551 ns_update_nsr["vld.{}".format(vld_index)] = vld
552 break
553 else:
554 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
555
556 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
557 """
558 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
559 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
560 :param nsr_desc_RO: nsr descriptor from RO
561 :return: Nothing, LcmException is raised on errors
562 """
563 for vnf_index, db_vnfr in db_vnfrs.items():
564 for vnf_RO in nsr_desc_RO["vnfs"]:
565 if vnf_RO["member_vnf_index"] != vnf_index:
566 continue
567 vnfr_update = {}
568 if vnf_RO.get("ip_address"):
569 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
570 elif not db_vnfr.get("ip-address"):
571 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
572
573 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
574 vdur_RO_count_index = 0
575 if vdur.get("pdu-type"):
576 continue
577 for vdur_RO in get_iterable(vnf_RO, "vms"):
578 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
579 continue
580 if vdur["count-index"] != vdur_RO_count_index:
581 vdur_RO_count_index += 1
582 continue
583 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
584 if vdur_RO.get("ip_address"):
585 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
586 else:
587 vdur["ip-address"] = None
588 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
589 vdur["name"] = vdur_RO.get("vim_name")
590 vdur["status"] = vdur_RO.get("status")
591 vdur["status-detailed"] = vdur_RO.get("error_msg")
592 for ifacer in get_iterable(vdur, "interfaces"):
593 for interface_RO in get_iterable(vdur_RO, "interfaces"):
594 if ifacer["name"] == interface_RO.get("internal_name"):
595 ifacer["ip-address"] = interface_RO.get("ip_address")
596 ifacer["mac-address"] = interface_RO.get("mac_address")
597 break
598 else:
599 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
600 "at RO info".format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
601 vnfr_update["vdur.{}".format(vdu_index)] = vdur
602 break
603 else:
604 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} at "
605 "RO info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
606
607 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
608 for net_RO in get_iterable(nsr_desc_RO, "nets"):
609 if vld["id"] != net_RO.get("vnf_net_osm_id"):
610 continue
611 vld["vim-id"] = net_RO.get("vim_net_id")
612 vld["name"] = net_RO.get("vim_name")
613 vld["status"] = net_RO.get("status")
614 vld["status-detailed"] = net_RO.get("error_msg")
615 vnfr_update["vld.{}".format(vld_index)] = vld
616 break
617 else:
618 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} at RO info".format(
619 vnf_index, vld["id"]))
620
621 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
622 break
623
624 else:
625 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(vnf_index))
626
627 async def instantiate(self, nsr_id, nslcmop_id):
628 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
629 self.logger.debug(logging_text + "Enter")
630 # get all needed from database
631 start_deploy = time()
632 db_nsr = None
633 db_nslcmop = None
634 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
635 db_nslcmop_update = {}
636 nslcmop_operation_state = None
637 db_vnfrs = {}
638 RO_descriptor_number = 0 # number of descriptors created at RO
639 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
640 n2vc_info = {}
641 exc = None
642 try:
643 step = "Getting nslcmop={} from db".format(nslcmop_id)
644 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
645 step = "Getting nsr={} from db".format(nsr_id)
646 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
647 ns_params = db_nslcmop.get("operationParams")
648 nsd = db_nsr["nsd"]
649 nsr_name = db_nsr["name"] # TODO short-name??
650
651 # look if previous tasks in process
652 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
653 if task_dependency:
654 step = db_nslcmop_update["detailed-status"] = \
655 "Waiting for related tasks to be completed: {}".format(task_name)
656 self.logger.debug(logging_text + step)
657 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
658 _, pending = await asyncio.wait(task_dependency, timeout=3600)
659 if pending:
660 raise LcmException("Timeout waiting related tasks to be completed")
661
662 step = "Getting vnfrs from db"
663 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
664 db_vnfds_ref = {}
665 db_vnfds = {}
666 for vnfr in db_vnfrs_list:
667 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
668 vnfd_id = vnfr["vnfd-id"]
669 vnfd_ref = vnfr["vnfd-ref"]
670 if vnfd_id not in db_vnfds:
671 step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
672 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
673 db_vnfds_ref[vnfd_ref] = vnfd
674 db_vnfds[vnfd_id] = vnfd
675
676 # Get or generates the _admin.deployed,VCA list
677 vca_deployed_list = None
678 if db_nsr["_admin"].get("deployed"):
679 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
680 if vca_deployed_list is None:
681 vca_deployed_list = []
682 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
683 elif isinstance(vca_deployed_list, dict):
684 # maintain backward compatibility. Change a dict to list at database
685 vca_deployed_list = list(vca_deployed_list.values())
686 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
687
688 db_nsr_update["detailed-status"] = "creating"
689 db_nsr_update["operational-status"] = "init"
690 if not db_nsr["_admin"].get("deployed") or not db_nsr["_admin"]["deployed"].get("RO") or \
691 not db_nsr["_admin"]["deployed"]["RO"].get("vnfd"):
692 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
693 db_nsr_update["_admin.deployed.RO.vnfd"] = []
694
695 RO = ROclient.ROClient(self.loop, **self.ro_config)
696
697 # set state to INSTANTIATED. When instantiated NBI will not delete directly
698 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
699 self.update_db_2("nsrs", nsr_id, db_nsr_update)
700
701 # get vnfds, instantiate at RO
702 for c_vnf in nsd.get("constituent-vnfd", ()):
703 member_vnf_index = c_vnf["member-vnf-index"]
704 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
705 vnfd_ref = vnfd["id"]
706 step = db_nsr_update["detailed-status"] = "Creating vnfd='{}' member-vnf-index='{}' at RO".format(
707 vnfd_ref, member_vnf_index)
708 # self.logger.debug(logging_text + step)
709 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
710 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
711 RO_descriptor_number += 1
712
713 # look position at deployed.RO.vnfd if not present it will be appended at the end
714 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
715 if vnf_deployed["member-vnf-index"] == member_vnf_index:
716 break
717 else:
718 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
719 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
720
721 # look if present
722 RO_update = {"member-vnf-index": member_vnf_index}
723 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
724 if vnfd_list:
725 RO_update["id"] = vnfd_list[0]["uuid"]
726 self.logger.debug(logging_text + "vnfd='{}' member-vnf-index='{}' exists at RO. Using RO_id={}".
727 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
728 else:
729 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
730 get("additionalParamsForVnf"), nsr_id)
731 desc = await RO.create("vnfd", descriptor=vnfd_RO)
732 RO_update["id"] = desc["uuid"]
733 self.logger.debug(logging_text + "vnfd='{}' member-vnf-index='{}' created at RO. RO_id={}".format(
734 vnfd_ref, member_vnf_index, desc["uuid"]))
735 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
736 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
737 self.update_db_2("nsrs", nsr_id, db_nsr_update)
738
739 # create nsd at RO
740 nsd_ref = nsd["id"]
741 step = db_nsr_update["detailed-status"] = "Creating nsd={} at RO".format(nsd_ref)
742 # self.logger.debug(logging_text + step)
743
744 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
745 RO_descriptor_number += 1
746 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
747 if nsd_list:
748 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
749 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
750 nsd_ref, RO_nsd_uuid))
751 else:
752 nsd_RO = deepcopy(nsd)
753 nsd_RO["id"] = RO_osm_nsd_id
754 nsd_RO.pop("_id", None)
755 nsd_RO.pop("_admin", None)
756 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
757 member_vnf_index = c_vnf["member-vnf-index"]
758 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
759 for c_vld in nsd_RO.get("vld", ()):
760 for cp in c_vld.get("vnfd-connection-point-ref", ()):
761 member_vnf_index = cp["member-vnf-index-ref"]
762 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
763
764 desc = await RO.create("nsd", descriptor=nsd_RO)
765 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
766 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
767 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
768 self.update_db_2("nsrs", nsr_id, db_nsr_update)
769
770 # Crate ns at RO
771 # if present use it unless in error status
772 RO_nsr_id = db_nsr["_admin"].get("deployed", {}).get("RO", {}).get("nsr_id")
773 if RO_nsr_id:
774 try:
775 step = db_nsr_update["detailed-status"] = "Looking for existing ns at RO"
776 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
777 desc = await RO.show("ns", RO_nsr_id)
778 except ROclient.ROClientException as e:
779 if e.http_code != HTTPStatus.NOT_FOUND:
780 raise
781 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
782 if RO_nsr_id:
783 ns_status, ns_status_info = RO.check_ns_status(desc)
784 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
785 if ns_status == "ERROR":
786 step = db_nsr_update["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
787 self.logger.debug(logging_text + step)
788 await RO.delete("ns", RO_nsr_id)
789 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
790 if not RO_nsr_id:
791 step = db_nsr_update["detailed-status"] = "Checking dependencies"
792 # self.logger.debug(logging_text + step)
793
794 # check if VIM is creating and wait look if previous tasks in process
795 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
796 if task_dependency:
797 step = "Waiting for related tasks to be completed: {}".format(task_name)
798 self.logger.debug(logging_text + step)
799 await asyncio.wait(task_dependency, timeout=3600)
800 if ns_params.get("vnf"):
801 for vnf in ns_params["vnf"]:
802 if "vimAccountId" in vnf:
803 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
804 vnf["vimAccountId"])
805 if task_dependency:
806 step = "Waiting for related tasks to be completed: {}".format(task_name)
807 self.logger.debug(logging_text + step)
808 await asyncio.wait(task_dependency, timeout=3600)
809
810 step = db_nsr_update["detailed-status"] = "Checking instantiation parameters"
811
812 # feature 1429. Add n2vc public key to needed VMs
813 n2vc_key = await self.n2vc.GetPublicKey()
814 RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, [n2vc_key])
815
816 step = db_nsr_update["detailed-status"] = "Creating ns at RO"
817 desc = await RO.create("ns", descriptor=RO_ns_params,
818 name=db_nsr["name"],
819 scenario=RO_nsd_uuid)
820 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
821 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
822 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
823 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
824 self.update_db_2("nsrs", nsr_id, db_nsr_update)
825
826 # wait until NS is ready
827 step = ns_status_detailed = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
828 detailed_status_old = None
829 self.logger.debug(logging_text + step)
830
831 while time() <= start_deploy + self.total_deploy_timeout:
832 desc = await RO.show("ns", RO_nsr_id)
833 ns_status, ns_status_info = RO.check_ns_status(desc)
834 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
835 if ns_status == "ERROR":
836 raise ROclient.ROClientException(ns_status_info)
837 elif ns_status == "BUILD":
838 detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
839 elif ns_status == "ACTIVE":
840 step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
841 try:
842 self.ns_update_vnfr(db_vnfrs, desc)
843 break
844 except LcmExceptionNoMgmtIP:
845 pass
846 else:
847 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
848 if detailed_status != detailed_status_old:
849 detailed_status_old = db_nsr_update["detailed-status"] = detailed_status
850 self.update_db_2("nsrs", nsr_id, db_nsr_update)
851 await asyncio.sleep(5, loop=self.loop)
852 else: # total_deploy_timeout
853 raise ROclient.ROClientException("Timeout waiting ns to be ready")
854
855 step = "Updating NSR"
856 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
857
858 db_nsr["detailed-status"] = "Configuring vnfr"
859 self.update_db_2("nsrs", nsr_id, db_nsr_update)
860
861 # The parameters we'll need to deploy a charm
862 number_to_configure = 0
863
864 def deploy_charm(vnf_index, vdu_id, vdu_name, vdu_count_index, charm_params, n2vc_info):
865 """An inner function to deploy the charm from either vnf or vdu
866 vnf_index is mandatory. vdu_id can be None for a vnf configuration or the id for vdu configuration
867 """
868 if not charm_params["rw_mgmt_ip"]:
869 raise LcmException("vnfd/vdu has not management ip address to configure it")
870 # Login to the VCA.
871 # if number_to_configure == 0:
872 # self.logger.debug("Logging into N2VC...")
873 # task = asyncio.ensure_future(self.n2vc.login())
874 # yield from asyncio.wait_for(task, 30.0)
875 # self.logger.debug("Logged into N2VC!")
876
877 # # await self.n2vc.login()
878
879 # Note: The charm needs to exist on disk at the location
880 # specified by charm_path.
881 base_folder = vnfd["_admin"]["storage"]
882 storage_params = self.fs.get_params()
883 charm_path = "{}{}/{}/charms/{}".format(
884 storage_params["path"],
885 base_folder["folder"],
886 base_folder["pkg-dir"],
887 proxy_charm
888 )
889
890 # ns_name will be ignored in the current version of N2VC
891 # but will be implemented for the next point release.
892 model_name = "default" # TODO bug 585 nsr_id
893 if vdu_id:
894 vdu_id_text = vdu_id + "-"
895 else:
896 vdu_id_text = "-"
897 application_name = self.n2vc.FormatApplicationName(nsr_name, vnf_index, vdu_id_text)
898
899 vca_index = len(vca_deployed_list)
900 # trunk name and add two char index at the end to ensure that it is unique. It is assumed no more than
901 # 26*26 charm in the same NS
902 application_name = application_name[0:48]
903 application_name += chr(97 + vca_index // 26) + chr(97 + vca_index % 26)
904 vca_deployed_ = {
905 "member-vnf-index": vnf_index,
906 "vdu_id": vdu_id,
907 "model": model_name,
908 "application": application_name,
909 "operational-status": "init",
910 "detailed-status": "",
911 "vnfd_id": vnfd_id,
912 "vdu_name": vdu_name,
913 "vdu_count_index": vdu_count_index,
914 }
915 vca_deployed_list.append(vca_deployed_)
916 db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = vca_deployed_
917 self.update_db_2("nsrs", nsr_id, db_nsr_update)
918
919 self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path,
920 proxy_charm))
921 if not n2vc_info:
922 n2vc_info["nsr_id"] = nsr_id
923 n2vc_info["nslcmop_id"] = nslcmop_id
924 n2vc_info["n2vc_event"] = asyncio.Event(loop=self.loop)
925 n2vc_info["lcmOperationType"] = "instantiate"
926 n2vc_info["deployed"] = vca_deployed_list
927 n2vc_info["db_update"] = db_nsr_update
928 task = asyncio.ensure_future(
929 self.n2vc.DeployCharms(
930 model_name, # The network service name
931 application_name, # The application name
932 vnfd, # The vnf descriptor
933 charm_path, # Path to charm
934 charm_params, # Runtime params, like mgmt ip
935 {}, # for native charms only
936 self.n2vc_callback, # Callback for status changes
937 n2vc_info, # Callback parameter
938 None, # Callback parameter (task)
939 )
940 )
941 task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None, None,
942 n2vc_info))
943 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "create_charm:" + application_name, task)
944
945 step = "Looking for needed vnfd to configure"
946 self.logger.debug(logging_text + step)
947
948 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
949 vnfd_id = c_vnf["vnfd-id-ref"]
950 vnf_index = str(c_vnf["member-vnf-index"])
951 vnfd = db_vnfds_ref[vnfd_id]
952
953 # Get additional parameters
954 vnfr_params = {}
955 if db_vnfrs[vnf_index].get("additionalParamsForVnf"):
956 vnfr_params = db_vnfrs[vnf_index]["additionalParamsForVnf"].copy()
957 for k, v in vnfr_params.items():
958 if isinstance(v, str) and v.startswith("!!yaml "):
959 vnfr_params[k] = yaml.safe_load(v[7:])
960
961 # Check if this VNF has a charm configuration
962 vnf_config = vnfd.get("vnf-configuration")
963 if vnf_config and vnf_config.get("juju"):
964 proxy_charm = vnf_config["juju"]["charm"]
965
966 if proxy_charm:
967 step = "connecting to N2VC to configure vnf {}".format(vnf_index)
968 vnfr_params["rw_mgmt_ip"] = db_vnfrs[vnf_index]["ip-address"]
969 charm_params = {
970 "user_values": vnfr_params,
971 "rw_mgmt_ip": db_vnfrs[vnf_index]["ip-address"],
972 "initial-config-primitive": vnf_config.get('initial-config-primitive') or {}
973 }
974
975 # Login to the VCA. If there are multiple calls to login(),
976 # subsequent calls will be a nop and return immediately.
977 await self.n2vc.login()
978 deploy_charm(vnf_index, None, None, None, charm_params, n2vc_info)
979 number_to_configure += 1
980
981 # Deploy charms for each VDU that supports one.
982 for vdu_index, vdu in enumerate(get_iterable(vnfd, 'vdu')):
983 vdu_config = vdu.get('vdu-configuration')
984 proxy_charm = None
985
986 if vdu_config and vdu_config.get("juju"):
987 proxy_charm = vdu_config["juju"]["charm"]
988
989 if proxy_charm:
990 step = "connecting to N2VC to configure vdu {} from vnf {}".format(vdu["id"], vnf_index)
991 await self.n2vc.login()
992 vdur = db_vnfrs[vnf_index]["vdur"][vdu_index]
993 # TODO for the moment only first vdu_id contains a charm deployed
994 if vdur["vdu-id-ref"] != vdu["id"]:
995 raise LcmException("Mismatch vdur {}, vdu {} at index {} for vnf {}"
996 .format(vdur["vdu-id-ref"], vdu["id"], vdu_index, vnf_index))
997 vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
998 charm_params = {
999 "user_values": vnfr_params,
1000 "rw_mgmt_ip": vdur["ip-address"],
1001 "initial-config-primitive": vdu_config.get('initial-config-primitive') or {}
1002 }
1003 deploy_charm(vnf_index, vdu["id"], vdur.get("name"), vdur["count-index"],
1004 charm_params, n2vc_info)
1005 number_to_configure += 1
1006
1007 db_nsr_update["operational-status"] = "running"
1008 configuration_failed = False
1009 if number_to_configure:
1010 old_status = "configuring: init: {}".format(number_to_configure)
1011 db_nsr_update["config-status"] = old_status
1012 db_nsr_update["detailed-status"] = old_status
1013 db_nslcmop_update["detailed-status"] = old_status
1014
1015 # wait until all are configured.
1016 while time() <= start_deploy + self.total_deploy_timeout:
1017 if db_nsr_update:
1018 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1019 if db_nslcmop_update:
1020 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1021 # TODO add a fake task that set n2vc_event after some time
1022 await n2vc_info["n2vc_event"].wait()
1023 n2vc_info["n2vc_event"].clear()
1024 all_active = True
1025 status_map = {}
1026 n2vc_error_text = [] # contain text error list. If empty no one is in error status
1027 now = time()
1028 for vca_deployed in vca_deployed_list:
1029 vca_status = vca_deployed["operational-status"]
1030 if vca_status not in status_map:
1031 # Initialize it
1032 status_map[vca_status] = 0
1033 status_map[vca_status] += 1
1034
1035 if vca_status == "active":
1036 vca_deployed.pop("time_first_error", None)
1037 vca_deployed.pop("status_first_error", None)
1038 continue
1039
1040 all_active = False
1041 if vca_status in ("error", "blocked"):
1042 vca_deployed["detailed-status-error"] = vca_deployed["detailed-status"]
1043 # if not first time in this status error
1044 if not vca_deployed.get("time_first_error"):
1045 vca_deployed["time_first_error"] = now
1046 continue
1047 if vca_deployed.get("time_first_error") and \
1048 now <= vca_deployed["time_first_error"] + self.timeout_vca_on_error:
1049 n2vc_error_text.append("member_vnf_index={} vdu_id={} {}: {}"
1050 .format(vca_deployed["member-vnf-index"],
1051 vca_deployed["vdu_id"], vca_status,
1052 vca_deployed["detailed-status-error"]))
1053
1054 if all_active:
1055 break
1056 elif n2vc_error_text:
1057 db_nsr_update["config-status"] = "failed"
1058 error_text = "fail configuring " + ";".join(n2vc_error_text)
1059 db_nsr_update["detailed-status"] = error_text
1060 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
1061 db_nslcmop_update["detailed-status"] = error_text
1062 db_nslcmop_update["statusEnteredTime"] = time()
1063 configuration_failed = True
1064 break
1065 else:
1066 cs = "configuring: "
1067 separator = ""
1068 for status, num in status_map.items():
1069 cs += separator + "{}: {}".format(status, num)
1070 separator = ", "
1071 if old_status != cs:
1072 db_nsr_update["config-status"] = cs
1073 db_nsr_update["detailed-status"] = cs
1074 db_nslcmop_update["detailed-status"] = cs
1075 old_status = cs
1076 else: # total_deploy_timeout
1077 raise LcmException("Timeout waiting ns to be configured")
1078
1079 if not configuration_failed:
1080 # all is done
1081 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1082 db_nslcmop_update["statusEnteredTime"] = time()
1083 db_nslcmop_update["detailed-status"] = "done"
1084 db_nsr_update["config-status"] = "configured"
1085 db_nsr_update["detailed-status"] = "done"
1086
1087 return
1088
1089 except (ROclient.ROClientException, DbException, LcmException) as e:
1090 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
1091 exc = e
1092 except asyncio.CancelledError:
1093 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1094 exc = "Operation was cancelled"
1095 except Exception as e:
1096 exc = traceback.format_exc()
1097 self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
1098 exc_info=True)
1099 finally:
1100 if exc:
1101 if db_nsr:
1102 db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
1103 db_nsr_update["operational-status"] = "failed"
1104 if db_nslcmop:
1105 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1106 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1107 db_nslcmop_update["statusEnteredTime"] = time()
1108 try:
1109 if db_nsr:
1110 db_nsr_update["_admin.nslcmop"] = None
1111 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1112 if db_nslcmop_update:
1113 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1114 except DbException as e:
1115 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1116 if nslcmop_operation_state:
1117 try:
1118 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1119 "operationState": nslcmop_operation_state},
1120 loop=self.loop)
1121 except Exception as e:
1122 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1123
1124 self.logger.debug(logging_text + "Exit")
1125 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
1126
1127 async def _destroy_charm(self, model, application):
1128 """
1129 Order N2VC destroy a charm
1130 :param model:
1131 :param application:
1132 :return: True if charm does not exist. False if it exist
1133 """
1134 if not await self.n2vc.HasApplication(model, application):
1135 return True # Already removed
1136 await self.n2vc.RemoveCharms(model, application)
1137 return False
1138
1139 async def _wait_charm_destroyed(self, model, application, timeout):
1140 """
1141 Wait until charm does not exist
1142 :param model:
1143 :param application:
1144 :param timeout:
1145 :return: True if not exist, False if timeout
1146 """
1147 while True:
1148 if not await self.n2vc.HasApplication(model, application):
1149 return True
1150 if timeout < 0:
1151 return False
1152 await asyncio.sleep(10)
1153 timeout -= 10
1154
1155 async def terminate(self, nsr_id, nslcmop_id):
1156 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
1157 self.logger.debug(logging_text + "Enter")
1158 db_nsr = None
1159 db_nslcmop = None
1160 exc = None
1161 failed_detail = [] # annotates all failed error messages
1162 vca_time_destroy = None # time of where destroy charm order
1163 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1164 db_nslcmop_update = {}
1165 nslcmop_operation_state = None
1166 autoremove = False # autoremove after terminated
1167 try:
1168 step = "Getting nslcmop={} from db".format(nslcmop_id)
1169 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1170 step = "Getting nsr={} from db".format(nsr_id)
1171 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1172 # nsd = db_nsr["nsd"]
1173 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed"))
1174 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
1175 return
1176 # #TODO check if VIM is creating and wait
1177 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
1178
1179 db_nsr_update["operational-status"] = "terminating"
1180 db_nsr_update["config-status"] = "terminating"
1181
1182 if nsr_deployed and nsr_deployed.get("VCA"):
1183 try:
1184 step = "Scheduling configuration charms removing"
1185 db_nsr_update["detailed-status"] = "Deleting charms"
1186 self.logger.debug(logging_text + step)
1187 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1188 # for backward compatibility
1189 if isinstance(nsr_deployed["VCA"], dict):
1190 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
1191 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
1192 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1193
1194 for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
1195 if vca_deployed:
1196 if await self._destroy_charm(vca_deployed['model'], vca_deployed["application"]):
1197 vca_deployed.clear()
1198 db_nsr["_admin.deployed.VCA.{}".format(vca_index)] = None
1199 else:
1200 vca_time_destroy = time()
1201 except Exception as e:
1202 self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
1203
1204 # remove from RO
1205 RO_fail = False
1206 RO = ROclient.ROClient(self.loop, **self.ro_config)
1207
1208 # Delete ns
1209 RO_nsr_id = RO_delete_action = None
1210 if nsr_deployed and nsr_deployed.get("RO"):
1211 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
1212 RO_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
1213 try:
1214 if RO_nsr_id:
1215 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = "Deleting ns at RO"
1216 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1217 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1218 self.logger.debug(logging_text + step)
1219 desc = await RO.delete("ns", RO_nsr_id)
1220 RO_delete_action = desc["action_id"]
1221 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = RO_delete_action
1222 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1223 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1224 if RO_delete_action:
1225 # wait until NS is deleted from VIM
1226 step = detailed_status = "Waiting ns deleted from VIM. RO_id={} RO_delete_action={}".\
1227 format(RO_nsr_id, RO_delete_action)
1228 detailed_status_old = None
1229 self.logger.debug(logging_text + step)
1230
1231 delete_timeout = 20 * 60 # 20 minutes
1232 while delete_timeout > 0:
1233 desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
1234 extra_item_id=RO_delete_action)
1235 ns_status, ns_status_info = RO.check_action_status(desc)
1236 if ns_status == "ERROR":
1237 raise ROclient.ROClientException(ns_status_info)
1238 elif ns_status == "BUILD":
1239 detailed_status = step + "; {}".format(ns_status_info)
1240 elif ns_status == "ACTIVE":
1241 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1242 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1243 break
1244 else:
1245 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
1246 if detailed_status != detailed_status_old:
1247 detailed_status_old = db_nslcmop_update["detailed-status"] = \
1248 db_nsr_update["detailed-status"] = detailed_status
1249 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1250 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1251 await asyncio.sleep(5, loop=self.loop)
1252 delete_timeout -= 5
1253 else: # delete_timeout <= 0:
1254 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
1255
1256 except ROclient.ROClientException as e:
1257 if e.http_code == 404: # not found
1258 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1259 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1260 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1261 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
1262 elif e.http_code == 409: # conflict
1263 failed_detail.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
1264 self.logger.debug(logging_text + failed_detail[-1])
1265 RO_fail = True
1266 else:
1267 failed_detail.append("RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
1268 self.logger.error(logging_text + failed_detail[-1])
1269 RO_fail = True
1270
1271 # Delete nsd
1272 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("nsd_id"):
1273 RO_nsd_id = nsr_deployed["RO"]["nsd_id"]
1274 try:
1275 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
1276 "Deleting nsd at RO"
1277 await RO.delete("nsd", RO_nsd_id)
1278 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
1279 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
1280 except ROclient.ROClientException as e:
1281 if e.http_code == 404: # not found
1282 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
1283 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
1284 elif e.http_code == 409: # conflict
1285 failed_detail.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
1286 self.logger.debug(logging_text + failed_detail[-1])
1287 RO_fail = True
1288 else:
1289 failed_detail.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
1290 self.logger.error(logging_text + failed_detail[-1])
1291 RO_fail = True
1292
1293 if not RO_fail and nsr_deployed and nsr_deployed.get("RO") and nsr_deployed["RO"].get("vnfd"):
1294 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
1295 if not vnf_deployed or not vnf_deployed["id"]:
1296 continue
1297 try:
1298 RO_vnfd_id = vnf_deployed["id"]
1299 step = db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] =\
1300 "Deleting member-vnf-index={} RO_vnfd_id={} from RO".format(
1301 vnf_deployed["member-vnf-index"], RO_vnfd_id)
1302 await RO.delete("vnfd", RO_vnfd_id)
1303 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
1304 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
1305 except ROclient.ROClientException as e:
1306 if e.http_code == 404: # not found
1307 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
1308 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
1309 elif e.http_code == 409: # conflict
1310 failed_detail.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
1311 self.logger.debug(logging_text + failed_detail[-1])
1312 else:
1313 failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
1314 self.logger.error(logging_text + failed_detail[-1])
1315
1316 # wait until charm deleted
1317 if vca_time_destroy:
1318 db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = step = \
1319 "Waiting for deletion of configuration charms"
1320 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1321 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1322 for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]):
1323 if not vca_deployed:
1324 continue
1325 step = "Waiting for deletion of charm application_name={}".format(vca_deployed["application"])
1326 timeout = self.timeout_charm_delete - int(time() - vca_time_destroy)
1327 if not await self._wait_charm_destroyed(vca_deployed['model'], vca_deployed["application"],
1328 timeout):
1329 failed_detail.append("VCA[application_name={}] Deletion timeout".format(
1330 vca_deployed["application"]))
1331 else:
1332 db_nsr["_admin.deployed.VCA.{}".format(vca_index)] = None
1333
1334 if failed_detail:
1335 self.logger.error(logging_text + " ;".join(failed_detail))
1336 db_nsr_update["operational-status"] = "failed"
1337 db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
1338 db_nslcmop_update["detailed-status"] = "; ".join(failed_detail)
1339 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1340 db_nslcmop_update["statusEnteredTime"] = time()
1341 else:
1342 db_nsr_update["operational-status"] = "terminated"
1343 db_nsr_update["detailed-status"] = "Done"
1344 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
1345 db_nslcmop_update["detailed-status"] = "Done"
1346 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1347 db_nslcmop_update["statusEnteredTime"] = time()
1348 if db_nslcmop["operationParams"].get("autoremove"):
1349 autoremove = True
1350
1351 except (ROclient.ROClientException, DbException) as e:
1352 self.logger.error(logging_text + "Exit Exception {}".format(e))
1353 exc = e
1354 except asyncio.CancelledError:
1355 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1356 exc = "Operation was cancelled"
1357 except Exception as e:
1358 exc = traceback.format_exc()
1359 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1360 finally:
1361 if exc and db_nslcmop:
1362 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1363 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1364 db_nslcmop_update["statusEnteredTime"] = time()
1365 try:
1366 if db_nslcmop and db_nslcmop_update:
1367 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1368 if db_nsr:
1369 db_nsr_update["_admin.nslcmop"] = None
1370 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1371 except DbException as e:
1372 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1373 if nslcmop_operation_state:
1374 try:
1375 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1376 "operationState": nslcmop_operation_state,
1377 "autoremove": autoremove},
1378 loop=self.loop)
1379 except Exception as e:
1380 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1381 self.logger.debug(logging_text + "Exit")
1382 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
1383
1384 @staticmethod
1385 def _map_primitive_params(primitive_desc, params, instantiation_params):
1386 """
1387 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
1388 The default-value is used. If it is between < > it look for a value at instantiation_params
1389 :param primitive_desc: portion of VNFD/NSD that describes primitive
1390 :param params: Params provided by user
1391 :param instantiation_params: Instantiation params provided by user
1392 :return: a dictionary with the calculated params
1393 """
1394 calculated_params = {}
1395 for parameter in primitive_desc.get("parameter", ()):
1396 param_name = parameter["name"]
1397 if param_name in params:
1398 calculated_params[param_name] = params[param_name]
1399 elif "default-value" in parameter:
1400 calculated_params[param_name] = parameter["default-value"]
1401 if isinstance(parameter["default-value"], str) and parameter["default-value"].startswith("<") and \
1402 parameter["default-value"].endswith(">"):
1403 if parameter["default-value"][1:-1] in instantiation_params:
1404 calculated_params[param_name] = instantiation_params[parameter["default-value"][1:-1]]
1405 else:
1406 raise LcmException("Parameter {} needed to execute primitive {} not provided".
1407 format(parameter["default-value"], primitive_desc["name"]))
1408 else:
1409 raise LcmException("Parameter {} needed to execute primitive {} not provided".
1410 format(param_name, primitive_desc["name"]))
1411
1412 if isinstance(calculated_params[param_name], (dict, list, tuple)):
1413 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
1414 width=256)
1415 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
1416 calculated_params[param_name] = calculated_params[param_name][7:]
1417 return calculated_params
1418
1419 async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index,
1420 primitive, primitive_params):
1421 start_primitive_time = time()
1422 try:
1423 for vca_deployed in db_deployed["VCA"]:
1424 if not vca_deployed:
1425 continue
1426 if member_vnf_index != vca_deployed["member-vnf-index"] or vdu_id != vca_deployed["vdu_id"]:
1427 continue
1428 if vdu_name and vdu_name != vca_deployed["vdu_name"]:
1429 continue
1430 if vdu_count_index and vdu_count_index != vca_deployed["vdu_count_index"]:
1431 continue
1432 break
1433 else:
1434 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not "
1435 "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index))
1436 model_name = vca_deployed.get("model")
1437 application_name = vca_deployed.get("application")
1438 if not model_name or not application_name:
1439 raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not "
1440 "model or application name" .format(member_vnf_index, vdu_id, vdu_name,
1441 vdu_count_index))
1442 if vca_deployed["operational-status"] != "active":
1443 raise LcmException("charm for member_vnf_index={} vdu_id={} operational_status={} not 'active'".format(
1444 member_vnf_index, vdu_id, vca_deployed["operational-status"]))
1445 callback = None # self.n2vc_callback
1446 callback_args = () # [db_nsr, db_nslcmop, member_vnf_index, None]
1447 await self.n2vc.login()
1448 primitive_id = await self.n2vc.ExecutePrimitive(
1449 model_name,
1450 application_name,
1451 primitive,
1452 callback,
1453 *callback_args,
1454 **primitive_params
1455 )
1456 while time() - start_primitive_time < self.timeout_primitive:
1457 primitive_result_ = await self.n2vc.GetPrimitiveStatus(model_name, primitive_id)
1458 if primitive_result_ in ("running", "pending"):
1459 pass
1460 elif primitive_result_ in ("completed", "failed"):
1461 primitive_result = "COMPLETED" if primitive_result_ == "completed" else "FAILED"
1462 detailed_result = await self.n2vc.GetPrimitiveOutput(model_name, primitive_id)
1463 break
1464 else:
1465 detailed_result = "Invalid N2VC.GetPrimitiveStatus = {} obtained".format(primitive_result_)
1466 primitive_result = "FAILED"
1467 break
1468 await asyncio.sleep(5)
1469 else:
1470 raise LcmException("timeout after {} seconds".format(self.timeout_primitive))
1471 return primitive_result, detailed_result
1472 except (N2VCPrimitiveExecutionFailed, LcmException) as e:
1473 return "FAILED", str(e)
1474
1475 async def action(self, nsr_id, nslcmop_id):
1476 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
1477 self.logger.debug(logging_text + "Enter")
1478 # get all needed from database
1479 db_nsr = None
1480 db_nslcmop = None
1481 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1482 db_nslcmop_update = {}
1483 nslcmop_operation_state = None
1484 exc = None
1485 try:
1486 step = "Getting information from database"
1487 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1488 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1489
1490 nsr_deployed = db_nsr["_admin"].get("deployed")
1491 vnf_index = db_nslcmop["operationParams"]["member_vnf_index"]
1492 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
1493 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
1494 vdu_name = db_nslcmop["operationParams"].get("vdu_name")
1495
1496 step = "Getting vnfr from database"
1497 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
1498 step = "Getting vnfd from database"
1499 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
1500
1501 # look if previous tasks in process
1502 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
1503 if task_dependency:
1504 step = db_nslcmop_update["detailed-status"] = \
1505 "Waiting for related tasks to be completed: {}".format(task_name)
1506 self.logger.debug(logging_text + step)
1507 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1508 _, pending = await asyncio.wait(task_dependency, timeout=3600)
1509 if pending:
1510 raise LcmException("Timeout waiting related tasks to be completed")
1511
1512 # for backward compatibility
1513 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
1514 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
1515 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
1516 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1517
1518 primitive = db_nslcmop["operationParams"]["primitive"]
1519 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
1520
1521 # look for primitive
1522 config_primitive_desc = None
1523 if vdu_id:
1524 for vdu in get_iterable(db_vnfd, "vdu"):
1525 if vdu_id == vdu["id"]:
1526 for config_primitive in vdu.get("vdu-configuration", {}).get("config-primitive", ()):
1527 if config_primitive["name"] == primitive:
1528 config_primitive_desc = config_primitive
1529 break
1530 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1531 if config_primitive["name"] == primitive:
1532 config_primitive_desc = config_primitive
1533 break
1534 if not config_primitive_desc:
1535 raise LcmException("Primitive {} not found at vnf-configuration:config-primitive or vdu:"
1536 "vdu-configuration:config-primitive".format(primitive))
1537
1538 vnfr_params = {}
1539 if db_vnfr.get("additionalParamsForVnf"):
1540 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
1541
1542 # TODO check if ns is in a proper status
1543 result, result_detail = await self._ns_execute_primitive(
1544 nsr_deployed, vnf_index, vdu_id, vdu_name, vdu_count_index, primitive,
1545 self._map_primitive_params(config_primitive_desc, primitive_params, vnfr_params))
1546 db_nslcmop_update["detailed-status"] = result_detail
1547 db_nslcmop_update["operationState"] = nslcmop_operation_state = result
1548 db_nslcmop_update["statusEnteredTime"] = time()
1549 self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
1550 return # database update is called inside finally
1551
1552 except (DbException, LcmException) as e:
1553 self.logger.error(logging_text + "Exit Exception {}".format(e))
1554 exc = e
1555 except asyncio.CancelledError:
1556 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1557 exc = "Operation was cancelled"
1558 except Exception as e:
1559 exc = traceback.format_exc()
1560 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1561 finally:
1562 if exc and db_nslcmop:
1563 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1564 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1565 db_nslcmop_update["statusEnteredTime"] = time()
1566 try:
1567 if db_nslcmop_update:
1568 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1569 if db_nsr:
1570 db_nsr_update["_admin.nslcmop"] = None
1571 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1572 except DbException as e:
1573 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1574 self.logger.debug(logging_text + "Exit")
1575 if nslcmop_operation_state:
1576 try:
1577 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1578 "operationState": nslcmop_operation_state},
1579 loop=self.loop)
1580 except Exception as e:
1581 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1582 self.logger.debug(logging_text + "Exit")
1583 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
1584
1585 async def scale(self, nsr_id, nslcmop_id):
1586 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
1587 self.logger.debug(logging_text + "Enter")
1588 # get all needed from database
1589 db_nsr = None
1590 db_nslcmop = None
1591 db_nslcmop_update = {}
1592 nslcmop_operation_state = None
1593 db_nsr_update = {"_admin.nslcmop": nslcmop_id}
1594 exc = None
1595 # in case of error, indicates what part of scale was failed to put nsr at error status
1596 scale_process = None
1597 old_operational_status = ""
1598 old_config_status = ""
1599 vnfr_scaled = False
1600 try:
1601 step = "Getting nslcmop from database"
1602 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1603 step = "Getting nsr from database"
1604 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1605
1606 old_operational_status = db_nsr["operational-status"]
1607 old_config_status = db_nsr["config-status"]
1608
1609 # look if previous tasks in process
1610 task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
1611 if task_dependency:
1612 step = db_nslcmop_update["detailed-status"] = \
1613 "Waiting for related tasks to be completed: {}".format(task_name)
1614 self.logger.debug(logging_text + step)
1615 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1616 _, pending = await asyncio.wait(task_dependency, timeout=3600)
1617 if pending:
1618 raise LcmException("Timeout waiting related tasks to be completed")
1619
1620 step = "Parsing scaling parameters"
1621 db_nsr_update["operational-status"] = "scaling"
1622 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1623 nsr_deployed = db_nsr["_admin"].get("deployed")
1624 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
1625 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
1626 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
1627 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
1628 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
1629
1630 # for backward compatibility
1631 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
1632 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
1633 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
1634 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1635
1636 step = "Getting vnfr from database"
1637 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
1638 step = "Getting vnfd from database"
1639 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
1640 step = "Getting scaling-group-descriptor"
1641 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
1642 if scaling_descriptor["name"] == scaling_group:
1643 break
1644 else:
1645 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
1646 "at vnfd:scaling-group-descriptor".format(scaling_group))
1647 # cooldown_time = 0
1648 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
1649 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
1650 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
1651 # break
1652
1653 # TODO check if ns is in a proper status
1654 step = "Sending scale order to RO"
1655 nb_scale_op = 0
1656 if not db_nsr["_admin"].get("scaling-group"):
1657 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
1658 admin_scale_index = 0
1659 else:
1660 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
1661 if admin_scale_info["name"] == scaling_group:
1662 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
1663 break
1664 else: # not found, set index one plus last element and add new entry with the name
1665 admin_scale_index += 1
1666 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
1667 RO_scaling_info = []
1668 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
1669 if scaling_type == "SCALE_OUT":
1670 # count if max-instance-count is reached
1671 if "max-instance-count" in scaling_descriptor and scaling_descriptor["max-instance-count"] is not None:
1672 max_instance_count = int(scaling_descriptor["max-instance-count"])
1673 if nb_scale_op >= max_instance_count:
1674 raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
1675 " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1676 nb_scale_op = nb_scale_op + 1
1677 vdu_scaling_info["scaling_direction"] = "OUT"
1678 vdu_scaling_info["vdu-create"] = {}
1679 for vdu_scale_info in scaling_descriptor["vdu"]:
1680 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1681 "type": "create", "count": vdu_scale_info.get("count", 1)})
1682 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1683 elif scaling_type == "SCALE_IN":
1684 # count if min-instance-count is reached
1685 min_instance_count = 0
1686 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
1687 min_instance_count = int(scaling_descriptor["min-instance-count"])
1688 if nb_scale_op <= min_instance_count:
1689 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
1690 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
1691 nb_scale_op = nb_scale_op - 1
1692 vdu_scaling_info["scaling_direction"] = "IN"
1693 vdu_scaling_info["vdu-delete"] = {}
1694 for vdu_scale_info in scaling_descriptor["vdu"]:
1695 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
1696 "type": "delete", "count": vdu_scale_info.get("count", 1)})
1697 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
1698
1699 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
1700 vdu_create = vdu_scaling_info.get("vdu-create")
1701 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
1702 if vdu_scaling_info["scaling_direction"] == "IN":
1703 for vdur in reversed(db_vnfr["vdur"]):
1704 if vdu_delete.get(vdur["vdu-id-ref"]):
1705 vdu_delete[vdur["vdu-id-ref"]] -= 1
1706 vdu_scaling_info["vdu"].append({
1707 "name": vdur["name"],
1708 "vdu_id": vdur["vdu-id-ref"],
1709 "interface": []
1710 })
1711 for interface in vdur["interfaces"]:
1712 vdu_scaling_info["vdu"][-1]["interface"].append({
1713 "name": interface["name"],
1714 "ip_address": interface["ip-address"],
1715 "mac_address": interface.get("mac-address"),
1716 })
1717 vdu_delete = vdu_scaling_info.pop("vdu-delete")
1718
1719 # execute primitive service PRE-SCALING
1720 step = "Executing pre-scale vnf-config-primitive"
1721 if scaling_descriptor.get("scaling-config-action"):
1722 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1723 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "pre-scale-in" \
1724 and scaling_type == "SCALE_IN":
1725 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1726 step = db_nslcmop_update["detailed-status"] = \
1727 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
1728
1729 # look for primitive
1730 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1731 if config_primitive["name"] == vnf_config_primitive:
1732 break
1733 else:
1734 raise LcmException(
1735 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
1736 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
1737 "primitive".format(scaling_group, config_primitive))
1738
1739 vnfr_params = {"<VDU_SCALE_INFO>": vdu_scaling_info}
1740 if db_vnfr.get("additionalParamsForVnf"):
1741 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
1742
1743 scale_process = "VCA"
1744 db_nsr_update["config-status"] = "configuring pre-scaling"
1745 result, result_detail = await self._ns_execute_primitive(
1746 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive,
1747 self._map_primitive_params(config_primitive, {}, vnfr_params))
1748 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1749 vnf_config_primitive, result, result_detail))
1750 if result == "FAILED":
1751 raise LcmException(result_detail)
1752 db_nsr_update["config-status"] = old_config_status
1753 scale_process = None
1754
1755 if RO_scaling_info:
1756 scale_process = "RO"
1757 RO = ROclient.ROClient(self.loop, **self.ro_config)
1758 RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
1759 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
1760 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
1761 # wait until ready
1762 RO_nslcmop_id = RO_desc["instance_action_id"]
1763 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
1764
1765 RO_task_done = False
1766 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
1767 detailed_status_old = None
1768 self.logger.debug(logging_text + step)
1769
1770 deployment_timeout = 1 * 3600 # One hour
1771 while deployment_timeout > 0:
1772 if not RO_task_done:
1773 desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
1774 extra_item_id=RO_nslcmop_id)
1775 ns_status, ns_status_info = RO.check_action_status(desc)
1776 if ns_status == "ERROR":
1777 raise ROclient.ROClientException(ns_status_info)
1778 elif ns_status == "BUILD":
1779 detailed_status = step + "; {}".format(ns_status_info)
1780 elif ns_status == "ACTIVE":
1781 RO_task_done = True
1782 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
1783 self.logger.debug(logging_text + step)
1784 else:
1785 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
1786 else:
1787 desc = await RO.show("ns", RO_nsr_id)
1788 ns_status, ns_status_info = RO.check_ns_status(desc)
1789 if ns_status == "ERROR":
1790 raise ROclient.ROClientException(ns_status_info)
1791 elif ns_status == "BUILD":
1792 detailed_status = step + "; {}".format(ns_status_info)
1793 elif ns_status == "ACTIVE":
1794 step = detailed_status = \
1795 "Waiting for management IP address reported by the VIM. Updating VNFRs"
1796 if not vnfr_scaled:
1797 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
1798 vnfr_scaled = True
1799 try:
1800 desc = await RO.show("ns", RO_nsr_id)
1801 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
1802 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
1803 break
1804 except LcmExceptionNoMgmtIP:
1805 pass
1806 else:
1807 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1808 if detailed_status != detailed_status_old:
1809 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
1810 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1811
1812 await asyncio.sleep(5, loop=self.loop)
1813 deployment_timeout -= 5
1814 if deployment_timeout <= 0:
1815 raise ROclient.ROClientException("Timeout waiting ns to be ready")
1816
1817 # update VDU_SCALING_INFO with the obtained ip_addresses
1818 if vdu_scaling_info["scaling_direction"] == "OUT":
1819 for vdur in reversed(db_vnfr["vdur"]):
1820 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
1821 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
1822 vdu_scaling_info["vdu"].append({
1823 "name": vdur["name"],
1824 "vdu_id": vdur["vdu-id-ref"],
1825 "interface": []
1826 })
1827 for interface in vdur["interfaces"]:
1828 vdu_scaling_info["vdu"][-1]["interface"].append({
1829 "name": interface["name"],
1830 "ip_address": interface["ip-address"],
1831 "mac_address": interface.get("mac-address"),
1832 })
1833 del vdu_scaling_info["vdu-create"]
1834
1835 scale_process = None
1836 if db_nsr_update:
1837 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1838
1839 # execute primitive service POST-SCALING
1840 step = "Executing post-scale vnf-config-primitive"
1841 if scaling_descriptor.get("scaling-config-action"):
1842 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
1843 if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "post-scale-out" \
1844 and scaling_type == "SCALE_OUT":
1845 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
1846 step = db_nslcmop_update["detailed-status"] = \
1847 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
1848
1849 vnfr_params = {"<VDU_SCALE_INFO>": vdu_scaling_info}
1850 if db_vnfr.get("additionalParamsForVnf"):
1851 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
1852
1853 # look for primitive
1854 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
1855 if config_primitive["name"] == vnf_config_primitive:
1856 break
1857 else:
1858 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
1859 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
1860 "match any vnf-configuration:config-primitive".format(scaling_group,
1861 config_primitive))
1862 scale_process = "VCA"
1863 db_nsr_update["config-status"] = "configuring post-scaling"
1864
1865 result, result_detail = await self._ns_execute_primitive(
1866 nsr_deployed, vnf_index, None, None, None, vnf_config_primitive,
1867 self._map_primitive_params(config_primitive, {}, vnfr_params))
1868 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
1869 vnf_config_primitive, result, result_detail))
1870 if result == "FAILED":
1871 raise LcmException(result_detail)
1872 db_nsr_update["config-status"] = old_config_status
1873 scale_process = None
1874
1875 db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
1876 db_nslcmop_update["statusEnteredTime"] = time()
1877 db_nslcmop_update["detailed-status"] = "done"
1878 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
1879 db_nsr_update["operational-status"] = old_operational_status
1880 db_nsr_update["config-status"] = old_config_status
1881 return
1882 except (ROclient.ROClientException, DbException, LcmException) as e:
1883 self.logger.error(logging_text + "Exit Exception {}".format(e))
1884 exc = e
1885 except asyncio.CancelledError:
1886 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
1887 exc = "Operation was cancelled"
1888 except Exception as e:
1889 exc = traceback.format_exc()
1890 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
1891 finally:
1892 if exc:
1893 if db_nslcmop:
1894 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
1895 db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
1896 db_nslcmop_update["statusEnteredTime"] = time()
1897 if db_nsr:
1898 db_nsr_update["operational-status"] = old_operational_status
1899 db_nsr_update["config-status"] = old_config_status
1900 db_nsr_update["detailed-status"] = ""
1901 db_nsr_update["_admin.nslcmop"] = None
1902 if scale_process:
1903 if "VCA" in scale_process:
1904 db_nsr_update["config-status"] = "failed"
1905 if "RO" in scale_process:
1906 db_nsr_update["operational-status"] = "failed"
1907 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
1908 exc)
1909 try:
1910 if db_nslcmop and db_nslcmop_update:
1911 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
1912 if db_nsr:
1913 db_nsr_update["_admin.nslcmop"] = None
1914 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1915 except DbException as e:
1916 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1917 if nslcmop_operation_state:
1918 try:
1919 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
1920 "operationState": nslcmop_operation_state},
1921 loop=self.loop)
1922 # if cooldown_time:
1923 # await asyncio.sleep(cooldown_time)
1924 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
1925 except Exception as e:
1926 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
1927 self.logger.debug(logging_text + "Exit")
1928 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")