Fix bug 1432: No support for alternative images for sol006
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from osm_lcm.data_utils.nsd import get_vnf_profiles
31 from osm_lcm.data_utils.vnfd import get_vnf_configuration, get_vdu_list, get_vdu_profile, \
32 get_ee_sorted_initial_config_primitive_list, get_ee_sorted_terminate_config_primitive_list, \
33 get_kdu_list, get_virtual_link_profiles, get_vdu, get_vdu_configuration, get_kdu_configuration, \
34 get_vdu_index, get_scaling_aspect, get_number_of_instances
35 from osm_lcm.data_utils.list_utils import find_in_list
36 from osm_lcm.data_utils.vnfr import get_osm_params
37 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
38 from osm_lcm.data_utils.database.vim_account import VimAccountDB
39 from n2vc.k8s_helm_conn import K8sHelmConnector
40 from n2vc.k8s_helm3_conn import K8sHelm3Connector
41 from n2vc.k8s_juju_conn import K8sJujuConnector
42
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45
46 from osm_lcm.data_utils.database.database import Database
47 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
48
49 from n2vc.n2vc_juju_conn import N2VCJujuConnector
50 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
51
52 from osm_lcm.lcm_helm_conn import LCMHelmConn
53
54 from copy import copy, deepcopy
55 from time import time
56 from uuid import uuid4
57
58 from random import randint
59
60 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
61
62
63 class NsLcm(LcmBase):
64 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
65 timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
66 timeout_ns_terminate = 1800 # default global timeout for un deployment a ns
67 timeout_charm_delete = 10 * 60
68 timeout_primitive = 30 * 60 # timeout for primitive execution
69 timeout_progress_primitive = 10 * 60 # timeout for some progress in a primitive execution
70
71 SUBOPERATION_STATUS_NOT_FOUND = -1
72 SUBOPERATION_STATUS_NEW = -2
73 SUBOPERATION_STATUS_SKIP = -3
74 task_name_deploy_vca = "Deploying VCA"
75
76 def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
77 """
78 Init, Connect to database, filesystem storage, and messaging
79 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
80 :return: None
81 """
82 super().__init__(
83 msg=msg,
84 logger=logging.getLogger('lcm.ns')
85 )
86
87 self.db = Database().instance.db
88 self.fs = Filesystem().instance.fs
89 self.loop = loop
90 self.lcm_tasks = lcm_tasks
91 self.timeout = config["timeout"]
92 self.ro_config = config["ro_config"]
93 self.ng_ro = config["ro_config"].get("ng")
94 self.vca_config = config["VCA"].copy()
95
96 # create N2VC connector
97 self.n2vc = N2VCJujuConnector(
98 log=self.logger,
99 loop=self.loop,
100 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
101 username=self.vca_config.get('user', None),
102 vca_config=self.vca_config,
103 on_update_db=self._on_update_n2vc_db,
104 fs=self.fs,
105 db=self.db
106 )
107
108 self.conn_helm_ee = LCMHelmConn(
109 log=self.logger,
110 loop=self.loop,
111 url=None,
112 username=None,
113 vca_config=self.vca_config,
114 on_update_db=self._on_update_n2vc_db
115 )
116
117 self.k8sclusterhelm2 = K8sHelmConnector(
118 kubectl_command=self.vca_config.get("kubectlpath"),
119 helm_command=self.vca_config.get("helmpath"),
120 log=self.logger,
121 on_update_db=None,
122 fs=self.fs,
123 db=self.db
124 )
125
126 self.k8sclusterhelm3 = K8sHelm3Connector(
127 kubectl_command=self.vca_config.get("kubectlpath"),
128 helm_command=self.vca_config.get("helm3path"),
129 fs=self.fs,
130 log=self.logger,
131 db=self.db,
132 on_update_db=None,
133 )
134
135 self.k8sclusterjuju = K8sJujuConnector(
136 kubectl_command=self.vca_config.get("kubectlpath"),
137 juju_command=self.vca_config.get("jujupath"),
138 log=self.logger,
139 loop=self.loop,
140 on_update_db=None,
141 vca_config=self.vca_config,
142 fs=self.fs,
143 db=self.db
144 )
145
146 self.k8scluster_map = {
147 "helm-chart": self.k8sclusterhelm2,
148 "helm-chart-v3": self.k8sclusterhelm3,
149 "chart": self.k8sclusterhelm3,
150 "juju-bundle": self.k8sclusterjuju,
151 "juju": self.k8sclusterjuju,
152 }
153
154 self.vca_map = {
155 "lxc_proxy_charm": self.n2vc,
156 "native_charm": self.n2vc,
157 "k8s_proxy_charm": self.n2vc,
158 "helm": self.conn_helm_ee,
159 "helm-v3": self.conn_helm_ee
160 }
161
162 self.prometheus = prometheus
163
164 # create RO client
165 self.RO = NgRoClient(self.loop, **self.ro_config)
166
167 @staticmethod
168 def increment_ip_mac(ip_mac, vm_index=1):
169 if not isinstance(ip_mac, str):
170 return ip_mac
171 try:
172 # try with ipv4 look for last dot
173 i = ip_mac.rfind(".")
174 if i > 0:
175 i += 1
176 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
177 # try with ipv6 or mac look for last colon. Operate in hex
178 i = ip_mac.rfind(":")
179 if i > 0:
180 i += 1
181 # format in hex, len can be 2 for mac or 4 for ipv6
182 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(ip_mac[:i], int(ip_mac[i:], 16) + vm_index)
183 except Exception:
184 pass
185 return None
186
187 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
188
189 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
190
191 try:
192 # TODO filter RO descriptor fields...
193
194 # write to database
195 db_dict = dict()
196 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
197 db_dict['deploymentStatus'] = ro_descriptor
198 self.update_db_2("nsrs", nsrs_id, db_dict)
199
200 except Exception as e:
201 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
202
203 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
204
205 # remove last dot from path (if exists)
206 if path.endswith('.'):
207 path = path[:-1]
208
209 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
210 # .format(table, filter, path, updated_data))
211
212 try:
213
214 nsr_id = filter.get('_id')
215
216 # read ns record from database
217 nsr = self.db.get_one(table='nsrs', q_filter=filter)
218 current_ns_status = nsr.get('nsState')
219
220 # get vca status for NS
221 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
222
223 # vcaStatus
224 db_dict = dict()
225 db_dict['vcaStatus'] = status_dict
226
227 # update configurationStatus for this VCA
228 try:
229 vca_index = int(path[path.rfind(".")+1:])
230
231 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
232 vca_status = vca_list[vca_index].get('status')
233
234 configuration_status_list = nsr.get('configurationStatus')
235 config_status = configuration_status_list[vca_index].get('status')
236
237 if config_status == 'BROKEN' and vca_status != 'failed':
238 db_dict['configurationStatus'][vca_index] = 'READY'
239 elif config_status != 'BROKEN' and vca_status == 'failed':
240 db_dict['configurationStatus'][vca_index] = 'BROKEN'
241 except Exception as e:
242 # not update configurationStatus
243 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
244
245 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
246 # if nsState = 'DEGRADED' check if all is OK
247 is_degraded = False
248 if current_ns_status in ('READY', 'DEGRADED'):
249 error_description = ''
250 # check machines
251 if status_dict.get('machines'):
252 for machine_id in status_dict.get('machines'):
253 machine = status_dict.get('machines').get(machine_id)
254 # check machine agent-status
255 if machine.get('agent-status'):
256 s = machine.get('agent-status').get('status')
257 if s != 'started':
258 is_degraded = True
259 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
260 # check machine instance status
261 if machine.get('instance-status'):
262 s = machine.get('instance-status').get('status')
263 if s != 'running':
264 is_degraded = True
265 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
266 # check applications
267 if status_dict.get('applications'):
268 for app_id in status_dict.get('applications'):
269 app = status_dict.get('applications').get(app_id)
270 # check application status
271 if app.get('status'):
272 s = app.get('status').get('status')
273 if s != 'active':
274 is_degraded = True
275 error_description += 'application {} status={} ; '.format(app_id, s)
276
277 if error_description:
278 db_dict['errorDescription'] = error_description
279 if current_ns_status == 'READY' and is_degraded:
280 db_dict['nsState'] = 'DEGRADED'
281 if current_ns_status == 'DEGRADED' and not is_degraded:
282 db_dict['nsState'] = 'READY'
283
284 # write to database
285 self.update_db_2("nsrs", nsr_id, db_dict)
286
287 except (asyncio.CancelledError, asyncio.TimeoutError):
288 raise
289 except Exception as e:
290 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
291
292 @staticmethod
293 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
294 try:
295 env = Environment(undefined=StrictUndefined)
296 template = env.from_string(cloud_init_text)
297 return template.render(additional_params or {})
298 except UndefinedError as e:
299 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
300 "file, must be provided in the instantiation parameters inside the "
301 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id))
302 except (TemplateError, TemplateNotFound) as e:
303 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
304 format(vnfd_id, vdu_id, e))
305
306 def _get_vdu_cloud_init_content(self, vdu, vnfd):
307 cloud_init_content = cloud_init_file = None
308 try:
309 if vdu.get("cloud-init-file"):
310 base_folder = vnfd["_admin"]["storage"]
311 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
312 vdu["cloud-init-file"])
313 with self.fs.file_open(cloud_init_file, "r") as ci_file:
314 cloud_init_content = ci_file.read()
315 elif vdu.get("cloud-init"):
316 cloud_init_content = vdu["cloud-init"]
317
318 return cloud_init_content
319 except FsException as e:
320 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
321 format(vnfd["id"], vdu["id"], cloud_init_file, e))
322
323 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
324 vdur = next(vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"])
325 additional_params = vdur.get("additionalParams")
326 return parse_yaml_strings(additional_params)
327
328 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
329 """
330 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
331 :param vnfd: input vnfd
332 :param new_id: overrides vnf id if provided
333 :param additionalParams: Instantiation params for VNFs provided
334 :param nsrId: Id of the NSR
335 :return: copy of vnfd
336 """
337 vnfd_RO = deepcopy(vnfd)
338 # remove unused by RO configuration, monitoring, scaling and internal keys
339 vnfd_RO.pop("_id", None)
340 vnfd_RO.pop("_admin", None)
341 vnfd_RO.pop("vnf-configuration", None)
342 vnfd_RO.pop("monitoring-param", None)
343 vnfd_RO.pop("scaling-group-descriptor", None)
344 vnfd_RO.pop("kdu", None)
345 vnfd_RO.pop("k8s-cluster", None)
346 if new_id:
347 vnfd_RO["id"] = new_id
348
349 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
350 for vdu in get_iterable(vnfd_RO, "vdu"):
351 vdu.pop("cloud-init-file", None)
352 vdu.pop("cloud-init", None)
353 return vnfd_RO
354
355 @staticmethod
356 def ip_profile_2_RO(ip_profile):
357 RO_ip_profile = deepcopy(ip_profile)
358 if "dns-server" in RO_ip_profile:
359 if isinstance(RO_ip_profile["dns-server"], list):
360 RO_ip_profile["dns-address"] = []
361 for ds in RO_ip_profile.pop("dns-server"):
362 RO_ip_profile["dns-address"].append(ds['address'])
363 else:
364 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
365 if RO_ip_profile.get("ip-version") == "ipv4":
366 RO_ip_profile["ip-version"] = "IPv4"
367 if RO_ip_profile.get("ip-version") == "ipv6":
368 RO_ip_profile["ip-version"] = "IPv6"
369 if "dhcp-params" in RO_ip_profile:
370 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
371 return RO_ip_profile
372
373 def _get_ro_vim_id_for_vim_account(self, vim_account):
374 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
375 if db_vim["_admin"]["operationalState"] != "ENABLED":
376 raise LcmException("VIM={} is not available. operationalState={}".format(
377 vim_account, db_vim["_admin"]["operationalState"]))
378 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
379 return RO_vim_id
380
381 def get_ro_wim_id_for_wim_account(self, wim_account):
382 if isinstance(wim_account, str):
383 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
384 if db_wim["_admin"]["operationalState"] != "ENABLED":
385 raise LcmException("WIM={} is not available. operationalState={}".format(
386 wim_account, db_wim["_admin"]["operationalState"]))
387 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
388 return RO_wim_id
389 else:
390 return wim_account
391
392 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
393
394 db_vdu_push_list = []
395 db_update = {"_admin.modified": time()}
396 if vdu_create:
397 for vdu_id, vdu_count in vdu_create.items():
398 vdur = next((vdur for vdur in reversed(db_vnfr["vdur"]) if vdur["vdu-id-ref"] == vdu_id), None)
399 if not vdur:
400 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
401 format(vdu_id))
402
403 for count in range(vdu_count):
404 vdur_copy = deepcopy(vdur)
405 vdur_copy["status"] = "BUILD"
406 vdur_copy["status-detailed"] = None
407 vdur_copy["ip-address"]: None
408 vdur_copy["_id"] = str(uuid4())
409 vdur_copy["count-index"] += count + 1
410 vdur_copy["id"] = "{}-{}".format(vdur_copy["vdu-id-ref"], vdur_copy["count-index"])
411 vdur_copy.pop("vim_info", None)
412 for iface in vdur_copy["interfaces"]:
413 if iface.get("fixed-ip"):
414 iface["ip-address"] = self.increment_ip_mac(iface["ip-address"], count+1)
415 else:
416 iface.pop("ip-address", None)
417 if iface.get("fixed-mac"):
418 iface["mac-address"] = self.increment_ip_mac(iface["mac-address"], count+1)
419 else:
420 iface.pop("mac-address", None)
421 iface.pop("mgmt_vnf", None) # only first vdu can be managment of vnf
422 db_vdu_push_list.append(vdur_copy)
423 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
424 if vdu_delete:
425 for vdu_id, vdu_count in vdu_delete.items():
426 if mark_delete:
427 indexes_to_delete = [iv[0] for iv in enumerate(db_vnfr["vdur"]) if iv[1]["vdu-id-ref"] == vdu_id]
428 db_update.update({"vdur.{}.status".format(i): "DELETING" for i in indexes_to_delete[-vdu_count:]})
429 else:
430 # it must be deleted one by one because common.db does not allow otherwise
431 vdus_to_delete = [v for v in reversed(db_vnfr["vdur"]) if v["vdu-id-ref"] == vdu_id]
432 for vdu in vdus_to_delete[:vdu_count]:
433 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, None, pull={"vdur": {"_id": vdu["_id"]}})
434 db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
435 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
436 # modify passed dictionary db_vnfr
437 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
438 db_vnfr["vdur"] = db_vnfr_["vdur"]
439
440 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
441 """
442 Updates database nsr with the RO info for the created vld
443 :param ns_update_nsr: dictionary to be filled with the updated info
444 :param db_nsr: content of db_nsr. This is also modified
445 :param nsr_desc_RO: nsr descriptor from RO
446 :return: Nothing, LcmException is raised on errors
447 """
448
449 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
450 for net_RO in get_iterable(nsr_desc_RO, "nets"):
451 if vld["id"] != net_RO.get("ns_net_osm_id"):
452 continue
453 vld["vim-id"] = net_RO.get("vim_net_id")
454 vld["name"] = net_RO.get("vim_name")
455 vld["status"] = net_RO.get("status")
456 vld["status-detailed"] = net_RO.get("error_msg")
457 ns_update_nsr["vld.{}".format(vld_index)] = vld
458 break
459 else:
460 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
461
462 def set_vnfr_at_error(self, db_vnfrs, error_text):
463 try:
464 for db_vnfr in db_vnfrs.values():
465 vnfr_update = {"status": "ERROR"}
466 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
467 if "status" not in vdur:
468 vdur["status"] = "ERROR"
469 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
470 if error_text:
471 vdur["status-detailed"] = str(error_text)
472 vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = "ERROR"
473 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
474 except DbException as e:
475 self.logger.error("Cannot update vnf. {}".format(e))
476
477 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
478 """
479 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
480 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
481 :param nsr_desc_RO: nsr descriptor from RO
482 :return: Nothing, LcmException is raised on errors
483 """
484 for vnf_index, db_vnfr in db_vnfrs.items():
485 for vnf_RO in nsr_desc_RO["vnfs"]:
486 if vnf_RO["member_vnf_index"] != vnf_index:
487 continue
488 vnfr_update = {}
489 if vnf_RO.get("ip_address"):
490 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
491 elif not db_vnfr.get("ip-address"):
492 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
493 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
494
495 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
496 vdur_RO_count_index = 0
497 if vdur.get("pdu-type"):
498 continue
499 for vdur_RO in get_iterable(vnf_RO, "vms"):
500 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
501 continue
502 if vdur["count-index"] != vdur_RO_count_index:
503 vdur_RO_count_index += 1
504 continue
505 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
506 if vdur_RO.get("ip_address"):
507 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
508 else:
509 vdur["ip-address"] = None
510 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
511 vdur["name"] = vdur_RO.get("vim_name")
512 vdur["status"] = vdur_RO.get("status")
513 vdur["status-detailed"] = vdur_RO.get("error_msg")
514 for ifacer in get_iterable(vdur, "interfaces"):
515 for interface_RO in get_iterable(vdur_RO, "interfaces"):
516 if ifacer["name"] == interface_RO.get("internal_name"):
517 ifacer["ip-address"] = interface_RO.get("ip_address")
518 ifacer["mac-address"] = interface_RO.get("mac_address")
519 break
520 else:
521 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
522 "from VIM info"
523 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
524 vnfr_update["vdur.{}".format(vdu_index)] = vdur
525 break
526 else:
527 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
528 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
529
530 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
531 for net_RO in get_iterable(nsr_desc_RO, "nets"):
532 if vld["id"] != net_RO.get("vnf_net_osm_id"):
533 continue
534 vld["vim-id"] = net_RO.get("vim_net_id")
535 vld["name"] = net_RO.get("vim_name")
536 vld["status"] = net_RO.get("status")
537 vld["status-detailed"] = net_RO.get("error_msg")
538 vnfr_update["vld.{}".format(vld_index)] = vld
539 break
540 else:
541 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
542 vnf_index, vld["id"]))
543
544 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
545 break
546
547 else:
548 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
549
550 def _get_ns_config_info(self, nsr_id):
551 """
552 Generates a mapping between vnf,vdu elements and the N2VC id
553 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
554 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
555 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
556 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
557 """
558 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
559 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
560 mapping = {}
561 ns_config_info = {"osm-config-mapping": mapping}
562 for vca in vca_deployed_list:
563 if not vca["member-vnf-index"]:
564 continue
565 if not vca["vdu_id"]:
566 mapping[vca["member-vnf-index"]] = vca["application"]
567 else:
568 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
569 vca["application"]
570 return ns_config_info
571
572 async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
573 n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
574
575 db_vims = {}
576
577 def get_vim_account(vim_account_id):
578 nonlocal db_vims
579 if vim_account_id in db_vims:
580 return db_vims[vim_account_id]
581 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
582 db_vims[vim_account_id] = db_vim
583 return db_vim
584
585 # modify target_vld info with instantiation parameters
586 def parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn):
587 if vld_params.get("ip-profile"):
588 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params["ip-profile"]
589 if vld_params.get("provider-network"):
590 target_vld["vim_info"][target_vim]["provider_network"] = vld_params["provider-network"]
591 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
592 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params["provider-network"]["sdn-ports"]
593 if vld_params.get("wimAccountId"):
594 target_wim = "wim:{}".format(vld_params["wimAccountId"])
595 target_vld["vim_info"][target_wim] = {}
596 for param in ("vim-network-name", "vim-network-id"):
597 if vld_params.get(param):
598 if isinstance(vld_params[param], dict):
599 for vim, vim_net in vld_params[param]:
600 other_target_vim = "vim:" + vim
601 populate_dict(target_vld["vim_info"], (other_target_vim, param.replace("-", "_")), vim_net)
602 else: # isinstance str
603 target_vld["vim_info"][target_vim][param.replace("-", "_")] = vld_params[param]
604 if vld_params.get("common_id"):
605 target_vld["common_id"] = vld_params.get("common_id")
606
607 nslcmop_id = db_nslcmop["_id"]
608 target = {
609 "name": db_nsr["name"],
610 "ns": {"vld": []},
611 "vnf": [],
612 "image": deepcopy(db_nsr["image"]),
613 "flavor": deepcopy(db_nsr["flavor"]),
614 "action_id": nslcmop_id,
615 "cloud_init_content": {},
616 }
617 for image in target["image"]:
618 image["vim_info"] = {}
619 for flavor in target["flavor"]:
620 flavor["vim_info"] = {}
621
622 if db_nslcmop.get("lcmOperationType") != "instantiate":
623 # get parameters of instantiation:
624 db_nslcmop_instantiate = self.db.get_list("nslcmops", {"nsInstanceId": db_nslcmop["nsInstanceId"],
625 "lcmOperationType": "instantiate"})[-1]
626 ns_params = db_nslcmop_instantiate.get("operationParams")
627 else:
628 ns_params = db_nslcmop.get("operationParams")
629 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
630 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
631
632 cp2target = {}
633 for vld_index, vld in enumerate(db_nsr.get("vld")):
634 target_vim = "vim:{}".format(ns_params["vimAccountId"])
635 target_vld = {
636 "id": vld["id"],
637 "name": vld["name"],
638 "mgmt-network": vld.get("mgmt-network", False),
639 "type": vld.get("type"),
640 "vim_info": {
641 target_vim: {
642 "vim_network_name": vld.get("vim-network-name"),
643 "vim_account_id": ns_params["vimAccountId"]
644 }
645 }
646 }
647 # check if this network needs SDN assist
648 if vld.get("pci-interfaces"):
649 db_vim = VimAccountDB.get_vim_account_with_id(target_vld["vim_info"][0]["vim_account_id"])
650 sdnc_id = db_vim["config"].get("sdn-controller")
651 if sdnc_id:
652 target_vld["vim_info"].append({"sdnc_id": sdnc_id})
653
654 nsd_vnf_profiles = get_vnf_profiles(nsd)
655 for nsd_vnf_profile in nsd_vnf_profiles:
656 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
657 if cp["virtual-link-profile-id"] == vld["id"]:
658 cp2target["member_vnf:{}.{}".format(
659 cp["constituent-cpd-id"][0]["constituent-base-element-id"],
660 cp["constituent-cpd-id"][0]["constituent-cpd-id"]
661 )] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
662
663 # check at nsd descriptor, if there is an ip-profile
664 vld_params = {}
665 virtual_link_profiles = get_virtual_link_profiles(nsd)
666
667 for vlp in virtual_link_profiles:
668 ip_profile = find_in_list(nsd["ip-profiles"],
669 lambda profile: profile["name"] == vlp["ip-profile-ref"])
670 vld_params["ip-profile"] = ip_profile["ip-profile-params"]
671 # update vld_params with instantiation params
672 vld_instantiation_params = find_in_list(get_iterable(ns_params, "vld"),
673 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]))
674 if vld_instantiation_params:
675 vld_params.update(vld_instantiation_params)
676 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
677 target["ns"]["vld"].append(target_vld)
678
679 for vnfr in db_vnfrs.values():
680 vnfd = find_in_list(db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"])
681 vnf_params = find_in_list(get_iterable(ns_params, "vnf"),
682 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"])
683 target_vnf = deepcopy(vnfr)
684 target_vim = "vim:{}".format(vnfr["vim-account-id"])
685 for vld in target_vnf.get("vld", ()):
686 # check if connected to a ns.vld, to fill target'
687 vnf_cp = find_in_list(vnfd.get("int-virtual-link-desc", ()),
688 lambda cpd: cpd.get("id") == vld["id"])
689 if vnf_cp:
690 ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
691 if cp2target.get(ns_cp):
692 vld["target"] = cp2target[ns_cp]
693
694 vld["vim_info"] = {target_vim: {"vim_network_name": vld.get("vim-network-name")}}
695 # check if this network needs SDN assist
696 target_sdn = None
697 if vld.get("pci-interfaces"):
698 db_vim = get_vim_account(vnfr["vim-account-id"])
699 sdnc_id = db_vim["config"].get("sdn-controller")
700 if sdnc_id:
701 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
702 target_sdn = "sdn:{}".format(sdnc_id)
703 vld["vim_info"][target_sdn] = {
704 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
705
706 # check at vnfd descriptor, if there is an ip-profile
707 vld_params = {}
708 vnfd_vlp = find_in_list(
709 get_virtual_link_profiles(vnfd),
710 lambda a_link_profile: a_link_profile["id"] == vld["id"]
711 )
712 if vnfd_vlp and vnfd_vlp.get("virtual-link-protocol-data") and \
713 vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data"):
714 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"]["l3-protocol-data"]
715 ip_profile_dest_data = {}
716 if "ip-version" in ip_profile_source_data:
717 ip_profile_dest_data["ip-version"] = ip_profile_source_data["ip-version"]
718 if "cidr" in ip_profile_source_data:
719 ip_profile_dest_data["subnet-address"] = ip_profile_source_data["cidr"]
720 if "gateway-ip" in ip_profile_source_data:
721 ip_profile_dest_data["gateway-address"] = ip_profile_source_data["gateway-ip"]
722 if "dhcp-enabled" in ip_profile_source_data:
723 ip_profile_dest_data["dhcp-params"] = {
724 "enabled": ip_profile_source_data["dhcp-enabled"]
725 }
726
727 vld_params["ip-profile"] = ip_profile_dest_data
728 # update vld_params with instantiation params
729 if vnf_params:
730 vld_instantiation_params = find_in_list(get_iterable(vnf_params, "internal-vld"),
731 lambda i_vld: i_vld["name"] == vld["id"])
732 if vld_instantiation_params:
733 vld_params.update(vld_instantiation_params)
734 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
735
736 vdur_list = []
737 for vdur in target_vnf.get("vdur", ()):
738 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
739 continue # This vdu must not be created
740 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
741
742 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
743
744 if ssh_keys_all:
745 vdu_configuration = get_vdu_configuration(vnfd, vdur["vdu-id-ref"])
746 vnf_configuration = get_vnf_configuration(vnfd)
747 if vdu_configuration and vdu_configuration.get("config-access") and \
748 vdu_configuration.get("config-access").get("ssh-access"):
749 vdur["ssh-keys"] = ssh_keys_all
750 vdur["ssh-access-required"] = vdu_configuration["config-access"]["ssh-access"]["required"]
751 elif vnf_configuration and vnf_configuration.get("config-access") and \
752 vnf_configuration.get("config-access").get("ssh-access") and \
753 any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
754 vdur["ssh-keys"] = ssh_keys_all
755 vdur["ssh-access-required"] = vnf_configuration["config-access"]["ssh-access"]["required"]
756 elif ssh_keys_instantiation and \
757 find_in_list(vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")):
758 vdur["ssh-keys"] = ssh_keys_instantiation
759
760 self.logger.debug("NS > vdur > {}".format(vdur))
761
762 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
763 # cloud-init
764 if vdud.get("cloud-init-file"):
765 vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
766 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
767 if vdur["cloud-init"] not in target["cloud_init_content"]:
768 base_folder = vnfd["_admin"]["storage"]
769 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
770 vdud.get("cloud-init-file"))
771 with self.fs.file_open(cloud_init_file, "r") as ci_file:
772 target["cloud_init_content"][vdur["cloud-init"]] = ci_file.read()
773 elif vdud.get("cloud-init"):
774 vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"]))
775 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
776 target["cloud_init_content"][vdur["cloud-init"]] = vdud["cloud-init"]
777 vdur["additionalParams"] = vdur.get("additionalParams") or {}
778 deploy_params_vdu = self._format_additional_params(vdur.get("additionalParams") or {})
779 deploy_params_vdu["OSM"] = get_osm_params(vnfr, vdur["vdu-id-ref"], vdur["count-index"])
780 vdur["additionalParams"] = deploy_params_vdu
781
782 # flavor
783 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
784 if target_vim not in ns_flavor["vim_info"]:
785 ns_flavor["vim_info"][target_vim] = {}
786
787 # deal with images
788 # in case alternative images are provided we must check if they should be applied
789 # for the vim_type, modify the vim_type taking into account
790 ns_image_id = int(vdur["ns-image-id"])
791 if vdur.get("alt-image-ids"):
792 db_vim = get_vim_account(vnfr["vim-account-id"])
793 vim_type = db_vim["vim_type"]
794 for alt_image_id in vdur.get("alt-image-ids"):
795 ns_alt_image = target["image"][int(alt_image_id)]
796 if vim_type == ns_alt_image.get("vim-type"):
797 # must use alternative image
798 self.logger.debug("use alternative image id: {}".format(alt_image_id))
799 ns_image_id = alt_image_id
800 vdur["ns-image-id"] = ns_image_id
801 break
802 ns_image = target["image"][int(ns_image_id)]
803 if target_vim not in ns_image["vim_info"]:
804 ns_image["vim_info"][target_vim] = {}
805
806 vdur["vim_info"] = {target_vim: {}}
807 # instantiation parameters
808 # if vnf_params:
809 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
810 # vdud["id"]), None)
811 vdur_list.append(vdur)
812 target_vnf["vdur"] = vdur_list
813 target["vnf"].append(target_vnf)
814
815 desc = await self.RO.deploy(nsr_id, target)
816 self.logger.debug("RO return > {}".format(desc))
817 action_id = desc["action_id"]
818 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
819
820 # Updating NSR
821 db_nsr_update = {
822 "_admin.deployed.RO.operational-status": "running",
823 "detailed-status": " ".join(stage)
824 }
825 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
826 self.update_db_2("nsrs", nsr_id, db_nsr_update)
827 self._write_op_status(nslcmop_id, stage)
828 self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
829 return
830
831 async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id=None, start_time=None, timeout=600, stage=None):
832 detailed_status_old = None
833 db_nsr_update = {}
834 start_time = start_time or time()
835 while time() <= start_time + timeout:
836 desc_status = await self.RO.status(nsr_id, action_id)
837 self.logger.debug("Wait NG RO > {}".format(desc_status))
838 if desc_status["status"] == "FAILED":
839 raise NgRoException(desc_status["details"])
840 elif desc_status["status"] == "BUILD":
841 if stage:
842 stage[2] = "VIM: ({})".format(desc_status["details"])
843 elif desc_status["status"] == "DONE":
844 if stage:
845 stage[2] = "Deployed at VIM"
846 break
847 else:
848 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status["status"])
849 if stage and nslcmop_id and stage[2] != detailed_status_old:
850 detailed_status_old = stage[2]
851 db_nsr_update["detailed-status"] = " ".join(stage)
852 self.update_db_2("nsrs", nsr_id, db_nsr_update)
853 self._write_op_status(nslcmop_id, stage)
854 await asyncio.sleep(15, loop=self.loop)
855 else: # timeout_ns_deploy
856 raise NgRoException("Timeout waiting ns to deploy")
857
858 async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
859 db_nsr_update = {}
860 failed_detail = []
861 action_id = None
862 start_deploy = time()
863 try:
864 target = {
865 "ns": {"vld": []},
866 "vnf": [],
867 "image": [],
868 "flavor": [],
869 "action_id": nslcmop_id
870 }
871 desc = await self.RO.deploy(nsr_id, target)
872 action_id = desc["action_id"]
873 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
874 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
875 self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))
876
877 # wait until done
878 delete_timeout = 20 * 60 # 20 minutes
879 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)
880
881 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
882 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
883 # delete all nsr
884 await self.RO.delete(nsr_id)
885 except Exception as e:
886 if isinstance(e, NgRoException) and e.http_code == 404: # not found
887 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
888 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
889 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
890 self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
891 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
892 failed_detail.append("delete conflict: {}".format(e))
893 self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
894 else:
895 failed_detail.append("delete error: {}".format(e))
896 self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))
897
898 if failed_detail:
899 stage[2] = "Error deleting from VIM"
900 else:
901 stage[2] = "Deleted from VIM"
902 db_nsr_update["detailed-status"] = " ".join(stage)
903 self.update_db_2("nsrs", nsr_id, db_nsr_update)
904 self._write_op_status(nslcmop_id, stage)
905
906 if failed_detail:
907 raise LcmException("; ".join(failed_detail))
908 return
909
910 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
911 n2vc_key_list, stage):
912 """
913 Instantiate at RO
914 :param logging_text: preffix text to use at logging
915 :param nsr_id: nsr identity
916 :param nsd: database content of ns descriptor
917 :param db_nsr: database content of ns record
918 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
919 :param db_vnfrs:
920 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
921 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
922 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
923 :return: None or exception
924 """
925 try:
926 start_deploy = time()
927 ns_params = db_nslcmop.get("operationParams")
928 if ns_params and ns_params.get("timeout_ns_deploy"):
929 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
930 else:
931 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
932
933 # Check for and optionally request placement optimization. Database will be updated if placement activated
934 stage[2] = "Waiting for Placement."
935 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
936 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
937 for vnfr in db_vnfrs.values():
938 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
939 break
940 else:
941 ns_params["vimAccountId"] == vnfr["vim-account-id"]
942
943 return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
944 db_vnfds, n2vc_key_list, stage, start_deploy, timeout_ns_deploy)
945 except Exception as e:
946 stage[2] = "ERROR deploying at VIM"
947 self.set_vnfr_at_error(db_vnfrs, str(e))
948 self.logger.error("Error deploying at VIM {}".format(e),
949 exc_info=not isinstance(e, (ROclient.ROClientException, LcmException, DbException,
950 NgRoException)))
951 raise
952
953 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
954 """
955 Wait for kdu to be up, get ip address
956 :param logging_text: prefix use for logging
957 :param nsr_id:
958 :param vnfr_id:
959 :param kdu_name:
960 :return: IP address
961 """
962
963 # self.logger.debug(logging_text + "Starting wait_kdu_up")
964 nb_tries = 0
965
966 while nb_tries < 360:
967 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
968 kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
969 if not kdur:
970 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
971 if kdur.get("status"):
972 if kdur["status"] in ("READY", "ENABLED"):
973 return kdur.get("ip-address")
974 else:
975 raise LcmException("target KDU={} is in error state".format(kdu_name))
976
977 await asyncio.sleep(10, loop=self.loop)
978 nb_tries += 1
979 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
980
981 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
982 """
983 Wait for ip addres at RO, and optionally, insert public key in virtual machine
984 :param logging_text: prefix use for logging
985 :param nsr_id:
986 :param vnfr_id:
987 :param vdu_id:
988 :param vdu_index:
989 :param pub_key: public ssh key to inject, None to skip
990 :param user: user to apply the public ssh key
991 :return: IP address
992 """
993
994 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
995 ro_nsr_id = None
996 ip_address = None
997 nb_tries = 0
998 target_vdu_id = None
999 ro_retries = 0
1000
1001 while True:
1002
1003 ro_retries += 1
1004 if ro_retries >= 360: # 1 hour
1005 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
1006
1007 await asyncio.sleep(10, loop=self.loop)
1008
1009 # get ip address
1010 if not target_vdu_id:
1011 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1012
1013 if not vdu_id: # for the VNF case
1014 if db_vnfr.get("status") == "ERROR":
1015 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1016 ip_address = db_vnfr.get("ip-address")
1017 if not ip_address:
1018 continue
1019 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
1020 else: # VDU case
1021 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
1022 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
1023
1024 if not vdur and len(db_vnfr.get("vdur", ())) == 1: # If only one, this should be the target vdu
1025 vdur = db_vnfr["vdur"][0]
1026 if not vdur:
1027 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
1028 vdu_index))
1029 # New generation RO stores information at "vim_info"
1030 ng_ro_status = None
1031 target_vim = None
1032 if vdur.get("vim_info"):
1033 target_vim = next(t for t in vdur["vim_info"]) # there should be only one key
1034 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1035 if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE" or ng_ro_status == "ACTIVE":
1036 ip_address = vdur.get("ip-address")
1037 if not ip_address:
1038 continue
1039 target_vdu_id = vdur["vdu-id-ref"]
1040 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1041 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1042
1043 if not target_vdu_id:
1044 continue
1045
1046 # inject public key into machine
1047 if pub_key and user:
1048 self.logger.debug(logging_text + "Inserting RO key")
1049 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1050 if vdur.get("pdu-type"):
1051 self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
1052 return ip_address
1053 try:
1054 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
1055 if self.ng_ro:
1056 target = {"action": {"action": "inject_ssh_key", "key": pub_key, "user": user},
1057 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1058 }
1059 desc = await self.RO.deploy(nsr_id, target)
1060 action_id = desc["action_id"]
1061 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1062 break
1063 else:
1064 # wait until NS is deployed at RO
1065 if not ro_nsr_id:
1066 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1067 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
1068 if not ro_nsr_id:
1069 continue
1070 result_dict = await self.RO.create_action(
1071 item="ns",
1072 item_id_name=ro_nsr_id,
1073 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
1074 )
1075 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1076 if not result_dict or not isinstance(result_dict, dict):
1077 raise LcmException("Unknown response from RO when injecting key")
1078 for result in result_dict.values():
1079 if result.get("vim_result") == 200:
1080 break
1081 else:
1082 raise ROclient.ROClientException("error injecting key: {}".format(
1083 result.get("description")))
1084 break
1085 except NgRoException as e:
1086 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1087 except ROclient.ROClientException as e:
1088 if not nb_tries:
1089 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
1090 format(e, 20*10))
1091 nb_tries += 1
1092 if nb_tries >= 20:
1093 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1094 else:
1095 break
1096
1097 return ip_address
1098
1099 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1100 """
1101 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1102 """
1103 my_vca = vca_deployed_list[vca_index]
1104 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1105 # vdu or kdu: no dependencies
1106 return
1107 timeout = 300
1108 while timeout >= 0:
1109 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1110 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1111 configuration_status_list = db_nsr["configurationStatus"]
1112 for index, vca_deployed in enumerate(configuration_status_list):
1113 if index == vca_index:
1114 # myself
1115 continue
1116 if not my_vca.get("member-vnf-index") or \
1117 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1118 internal_status = configuration_status_list[index].get("status")
1119 if internal_status == 'READY':
1120 continue
1121 elif internal_status == 'BROKEN':
1122 raise LcmException("Configuration aborted because dependent charm/s has failed")
1123 else:
1124 break
1125 else:
1126 # no dependencies, return
1127 return
1128 await asyncio.sleep(10)
1129 timeout -= 1
1130
1131 raise LcmException("Configuration aborted because dependent charm/s timeout")
1132
1133 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
1134 config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
1135 ee_config_descriptor):
1136 nsr_id = db_nsr["_id"]
1137 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1138 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1139 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1140 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1141 db_dict = {
1142 'collection': 'nsrs',
1143 'filter': {'_id': nsr_id},
1144 'path': db_update_entry
1145 }
1146 step = ""
1147 try:
1148
1149 element_type = 'NS'
1150 element_under_configuration = nsr_id
1151
1152 vnfr_id = None
1153 if db_vnfr:
1154 vnfr_id = db_vnfr["_id"]
1155 osm_config["osm"]["vnf_id"] = vnfr_id
1156
1157 namespace = "{nsi}.{ns}".format(
1158 nsi=nsi_id if nsi_id else "",
1159 ns=nsr_id)
1160
1161 if vnfr_id:
1162 element_type = 'VNF'
1163 element_under_configuration = vnfr_id
1164 namespace += ".{}".format(vnfr_id)
1165 if vdu_id:
1166 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1167 element_type = 'VDU'
1168 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1169 osm_config["osm"]["vdu_id"] = vdu_id
1170 elif kdu_name:
1171 namespace += ".{}".format(kdu_name)
1172 element_type = 'KDU'
1173 element_under_configuration = kdu_name
1174 osm_config["osm"]["kdu_name"] = kdu_name
1175
1176 # Get artifact path
1177 artifact_path = "{}/{}/{}/{}".format(
1178 base_folder["folder"],
1179 base_folder["pkg-dir"],
1180 "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1181 vca_name
1182 )
1183
1184 self.logger.debug("Artifact path > {}".format(artifact_path))
1185
1186 # get initial_config_primitive_list that applies to this element
1187 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1188
1189 self.logger.debug("Initial config primitive list > {}".format(initial_config_primitive_list))
1190
1191 # add config if not present for NS charm
1192 ee_descriptor_id = ee_config_descriptor.get("id")
1193 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1194 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list,
1195 vca_deployed, ee_descriptor_id)
1196
1197 self.logger.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list))
1198 # n2vc_redesign STEP 3.1
1199 # find old ee_id if exists
1200 ee_id = vca_deployed.get("ee_id")
1201
1202 vim_account_id = (
1203 deep_get(db_vnfr, ("vim-account-id",)) or
1204 deep_get(deploy_params, ("OSM", "vim_account_id"))
1205 )
1206 vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id)
1207 vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id)
1208 # create or register execution environment in VCA
1209 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1210
1211 self._write_configuration_status(
1212 nsr_id=nsr_id,
1213 vca_index=vca_index,
1214 status='CREATING',
1215 element_under_configuration=element_under_configuration,
1216 element_type=element_type
1217 )
1218
1219 step = "create execution environment"
1220 self.logger.debug(logging_text + step)
1221
1222 ee_id = None
1223 credentials = None
1224 if vca_type == "k8s_proxy_charm":
1225 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1226 charm_name=artifact_path[artifact_path.rfind("/") + 1:],
1227 namespace=namespace,
1228 artifact_path=artifact_path,
1229 db_dict=db_dict,
1230 cloud_name=vca_k8s_cloud,
1231 credential_name=vca_k8s_cloud_credential,
1232 )
1233 elif vca_type == "helm" or vca_type == "helm-v3":
1234 ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
1235 namespace=namespace,
1236 reuse_ee_id=ee_id,
1237 db_dict=db_dict,
1238 config=osm_config,
1239 artifact_path=artifact_path,
1240 vca_type=vca_type
1241 )
1242 else:
1243 ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
1244 namespace=namespace,
1245 reuse_ee_id=ee_id,
1246 db_dict=db_dict,
1247 cloud_name=vca_cloud,
1248 credential_name=vca_cloud_credential,
1249 )
1250
1251 elif vca_type == "native_charm":
1252 step = "Waiting to VM being up and getting IP address"
1253 self.logger.debug(logging_text + step)
1254 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1255 user=None, pub_key=None)
1256 credentials = {"hostname": rw_mgmt_ip}
1257 # get username
1258 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1259 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1260 # merged. Meanwhile let's get username from initial-config-primitive
1261 if not username and initial_config_primitive_list:
1262 for config_primitive in initial_config_primitive_list:
1263 for param in config_primitive.get("parameter", ()):
1264 if param["name"] == "ssh-username":
1265 username = param["value"]
1266 break
1267 if not username:
1268 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1269 "'config-access.ssh-access.default-user'")
1270 credentials["username"] = username
1271 # n2vc_redesign STEP 3.2
1272
1273 self._write_configuration_status(
1274 nsr_id=nsr_id,
1275 vca_index=vca_index,
1276 status='REGISTERING',
1277 element_under_configuration=element_under_configuration,
1278 element_type=element_type
1279 )
1280
1281 step = "register execution environment {}".format(credentials)
1282 self.logger.debug(logging_text + step)
1283 ee_id = await self.vca_map[vca_type].register_execution_environment(
1284 credentials=credentials,
1285 namespace=namespace,
1286 db_dict=db_dict,
1287 cloud_name=vca_cloud,
1288 credential_name=vca_cloud_credential,
1289 )
1290
1291 # for compatibility with MON/POL modules, the need model and application name at database
1292 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1293 ee_id_parts = ee_id.split('.')
1294 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1295 if len(ee_id_parts) >= 2:
1296 model_name = ee_id_parts[0]
1297 application_name = ee_id_parts[1]
1298 db_nsr_update[db_update_entry + "model"] = model_name
1299 db_nsr_update[db_update_entry + "application"] = application_name
1300
1301 # n2vc_redesign STEP 3.3
1302 step = "Install configuration Software"
1303
1304 self._write_configuration_status(
1305 nsr_id=nsr_id,
1306 vca_index=vca_index,
1307 status='INSTALLING SW',
1308 element_under_configuration=element_under_configuration,
1309 element_type=element_type,
1310 other_update=db_nsr_update
1311 )
1312
1313 # TODO check if already done
1314 self.logger.debug(logging_text + step)
1315 config = None
1316 if vca_type == "native_charm":
1317 config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
1318 if config_primitive:
1319 config = self._map_primitive_params(
1320 config_primitive,
1321 {},
1322 deploy_params
1323 )
1324 num_units = 1
1325 if vca_type == "lxc_proxy_charm":
1326 if element_type == "NS":
1327 num_units = db_nsr.get("config-units") or 1
1328 elif element_type == "VNF":
1329 num_units = db_vnfr.get("config-units") or 1
1330 elif element_type == "VDU":
1331 for v in db_vnfr["vdur"]:
1332 if vdu_id == v["vdu-id-ref"]:
1333 num_units = v.get("config-units") or 1
1334 break
1335 if vca_type != "k8s_proxy_charm":
1336 await self.vca_map[vca_type].install_configuration_sw(
1337 ee_id=ee_id,
1338 artifact_path=artifact_path,
1339 db_dict=db_dict,
1340 config=config,
1341 num_units=num_units,
1342 )
1343
1344 # write in db flag of configuration_sw already installed
1345 self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
1346
1347 # add relations for this VCA (wait for other peers related with this VCA)
1348 await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
1349 vca_index=vca_index, vca_type=vca_type)
1350
1351 # if SSH access is required, then get execution environment SSH public
1352 # if native charm we have waited already to VM be UP
1353 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1354 pub_key = None
1355 user = None
1356 # self.logger.debug("get ssh key block")
1357 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1358 # self.logger.debug("ssh key needed")
1359 # Needed to inject a ssh key
1360 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1361 step = "Install configuration Software, getting public ssh key"
1362 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1363
1364 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1365 else:
1366 # self.logger.debug("no need to get ssh key")
1367 step = "Waiting to VM being up and getting IP address"
1368 self.logger.debug(logging_text + step)
1369
1370 # n2vc_redesign STEP 5.1
1371 # wait for RO (ip-address) Insert pub_key into VM
1372 if vnfr_id:
1373 if kdu_name:
1374 rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
1375 else:
1376 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
1377 vdu_index, user=user, pub_key=pub_key)
1378 else:
1379 rw_mgmt_ip = None # This is for a NS configuration
1380
1381 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1382
1383 # store rw_mgmt_ip in deploy params for later replacement
1384 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1385
1386 # n2vc_redesign STEP 6 Execute initial config primitive
1387 step = 'execute initial config primitive'
1388
1389 # wait for dependent primitives execution (NS -> VNF -> VDU)
1390 if initial_config_primitive_list:
1391 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1392
1393 # stage, in function of element type: vdu, kdu, vnf or ns
1394 my_vca = vca_deployed_list[vca_index]
1395 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1396 # VDU or KDU
1397 stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1398 elif my_vca.get("member-vnf-index"):
1399 # VNF
1400 stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1401 else:
1402 # NS
1403 stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1404
1405 self._write_configuration_status(
1406 nsr_id=nsr_id,
1407 vca_index=vca_index,
1408 status='EXECUTING PRIMITIVE'
1409 )
1410
1411 self._write_op_status(
1412 op_id=nslcmop_id,
1413 stage=stage
1414 )
1415
1416 check_if_terminated_needed = True
1417 for initial_config_primitive in initial_config_primitive_list:
1418 # adding information on the vca_deployed if it is a NS execution environment
1419 if not vca_deployed["member-vnf-index"]:
1420 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1421 # TODO check if already done
1422 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1423
1424 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1425 self.logger.debug(logging_text + step)
1426 await self.vca_map[vca_type].exec_primitive(
1427 ee_id=ee_id,
1428 primitive_name=initial_config_primitive["name"],
1429 params_dict=primitive_params_,
1430 db_dict=db_dict
1431 )
1432 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1433 if check_if_terminated_needed:
1434 if config_descriptor.get('terminate-config-primitive'):
1435 self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
1436 check_if_terminated_needed = False
1437
1438 # TODO register in database that primitive is done
1439
1440 # STEP 7 Configure metrics
1441 if vca_type == "helm" or vca_type == "helm-v3":
1442 prometheus_jobs = await self.add_prometheus_metrics(
1443 ee_id=ee_id,
1444 artifact_path=artifact_path,
1445 ee_config_descriptor=ee_config_descriptor,
1446 vnfr_id=vnfr_id,
1447 nsr_id=nsr_id,
1448 target_ip=rw_mgmt_ip,
1449 )
1450 if prometheus_jobs:
1451 self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
1452
1453 step = "instantiated at VCA"
1454 self.logger.debug(logging_text + step)
1455
1456 self._write_configuration_status(
1457 nsr_id=nsr_id,
1458 vca_index=vca_index,
1459 status='READY'
1460 )
1461
1462 except Exception as e: # TODO not use Exception but N2VC exception
1463 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1464 if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
1465 self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
1466 self._write_configuration_status(
1467 nsr_id=nsr_id,
1468 vca_index=vca_index,
1469 status='BROKEN'
1470 )
1471 raise LcmException("{} {}".format(step, e)) from e
1472
1473 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1474 error_description: str = None, error_detail: str = None, other_update: dict = None):
1475 """
1476 Update db_nsr fields.
1477 :param nsr_id:
1478 :param ns_state:
1479 :param current_operation:
1480 :param current_operation_id:
1481 :param error_description:
1482 :param error_detail:
1483 :param other_update: Other required changes at database if provided, will be cleared
1484 :return:
1485 """
1486 try:
1487 db_dict = other_update or {}
1488 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1489 db_dict["_admin.current-operation"] = current_operation_id
1490 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1491 db_dict["currentOperation"] = current_operation
1492 db_dict["currentOperationID"] = current_operation_id
1493 db_dict["errorDescription"] = error_description
1494 db_dict["errorDetail"] = error_detail
1495
1496 if ns_state:
1497 db_dict["nsState"] = ns_state
1498 self.update_db_2("nsrs", nsr_id, db_dict)
1499 except DbException as e:
1500 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1501
1502 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1503 operation_state: str = None, other_update: dict = None):
1504 try:
1505 db_dict = other_update or {}
1506 db_dict['queuePosition'] = queuePosition
1507 if isinstance(stage, list):
1508 db_dict['stage'] = stage[0]
1509 db_dict['detailed-status'] = " ".join(stage)
1510 elif stage is not None:
1511 db_dict['stage'] = str(stage)
1512
1513 if error_message is not None:
1514 db_dict['errorMessage'] = error_message
1515 if operation_state is not None:
1516 db_dict['operationState'] = operation_state
1517 db_dict["statusEnteredTime"] = time()
1518 self.update_db_2("nslcmops", op_id, db_dict)
1519 except DbException as e:
1520 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1521
1522 def _write_all_config_status(self, db_nsr: dict, status: str):
1523 try:
1524 nsr_id = db_nsr["_id"]
1525 # configurationStatus
1526 config_status = db_nsr.get('configurationStatus')
1527 if config_status:
1528 db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
1529 enumerate(config_status) if v}
1530 # update status
1531 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1532
1533 except DbException as e:
1534 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1535
1536 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1537 element_under_configuration: str = None, element_type: str = None,
1538 other_update: dict = None):
1539
1540 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1541 # .format(vca_index, status))
1542
1543 try:
1544 db_path = 'configurationStatus.{}.'.format(vca_index)
1545 db_dict = other_update or {}
1546 if status:
1547 db_dict[db_path + 'status'] = status
1548 if element_under_configuration:
1549 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1550 if element_type:
1551 db_dict[db_path + 'elementType'] = element_type
1552 self.update_db_2("nsrs", nsr_id, db_dict)
1553 except DbException as e:
1554 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1555 .format(status, nsr_id, vca_index, e))
1556
1557 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
1558 """
1559 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
1560 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
1561 Database is used because the result can be obtained from a different LCM worker in case of HA.
1562 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1563 :param db_nslcmop: database content of nslcmop
1564 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1565 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
1566 computed 'vim-account-id'
1567 """
1568 modified = False
1569 nslcmop_id = db_nslcmop['_id']
1570 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
1571 if placement_engine == "PLA":
1572 self.logger.debug(logging_text + "Invoke and wait for placement optimization")
1573 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
1574 db_poll_interval = 5
1575 wait = db_poll_interval * 10
1576 pla_result = None
1577 while not pla_result and wait >= 0:
1578 await asyncio.sleep(db_poll_interval)
1579 wait -= db_poll_interval
1580 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1581 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
1582
1583 if not pla_result:
1584 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
1585
1586 for pla_vnf in pla_result['vnf']:
1587 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
1588 if not pla_vnf.get('vimAccountId') or not vnfr:
1589 continue
1590 modified = True
1591 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1592 # Modifies db_vnfrs
1593 vnfr["vim-account-id"] = pla_vnf['vimAccountId']
1594 return modified
1595
1596 def update_nsrs_with_pla_result(self, params):
1597 try:
1598 nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
1599 self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
1600 except Exception as e:
1601 self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1602
1603 async def instantiate(self, nsr_id, nslcmop_id):
1604 """
1605
1606 :param nsr_id: ns instance to deploy
1607 :param nslcmop_id: operation to run
1608 :return:
1609 """
1610
1611 # Try to lock HA task here
1612 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1613 if not task_is_locked_by_me:
1614 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1615 return
1616
1617 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1618 self.logger.debug(logging_text + "Enter")
1619
1620 # get all needed from database
1621
1622 # database nsrs record
1623 db_nsr = None
1624
1625 # database nslcmops record
1626 db_nslcmop = None
1627
1628 # update operation on nsrs
1629 db_nsr_update = {}
1630 # update operation on nslcmops
1631 db_nslcmop_update = {}
1632
1633 nslcmop_operation_state = None
1634 db_vnfrs = {} # vnf's info indexed by member-index
1635 # n2vc_info = {}
1636 tasks_dict_info = {} # from task to info text
1637 exc = None
1638 error_list = []
1639 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1640 # ^ stage, step, VIM progress
1641 try:
1642 # wait for any previous tasks in process
1643 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1644
1645 stage[1] = "Sync filesystem from database."
1646 self.fs.sync() # TODO, make use of partial sync, only for the needed packages
1647
1648 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1649 stage[1] = "Reading from database."
1650 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1651 db_nsr_update["detailed-status"] = "creating"
1652 db_nsr_update["operational-status"] = "init"
1653 self._write_ns_status(
1654 nsr_id=nsr_id,
1655 ns_state="BUILDING",
1656 current_operation="INSTANTIATING",
1657 current_operation_id=nslcmop_id,
1658 other_update=db_nsr_update
1659 )
1660 self._write_op_status(
1661 op_id=nslcmop_id,
1662 stage=stage,
1663 queuePosition=0
1664 )
1665
1666 # read from db: operation
1667 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
1668 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1669 ns_params = db_nslcmop.get("operationParams")
1670 if ns_params and ns_params.get("timeout_ns_deploy"):
1671 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1672 else:
1673 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1674
1675 # read from db: ns
1676 stage[1] = "Getting nsr={} from db.".format(nsr_id)
1677 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1678 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
1679 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
1680 db_nsr["nsd"] = nsd
1681 # nsr_name = db_nsr["name"] # TODO short-name??
1682
1683 # read from db: vnf's of this ns
1684 stage[1] = "Getting vnfrs from db."
1685 self.logger.debug(logging_text + stage[1])
1686 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1687
1688 # read from db: vnfd's for every vnf
1689 db_vnfds = [] # every vnfd data
1690
1691 # for each vnf in ns, read vnfd
1692 for vnfr in db_vnfrs_list:
1693 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
1694 vnfd_id = vnfr["vnfd-id"]
1695 vnfd_ref = vnfr["vnfd-ref"]
1696
1697 # if we haven't this vnfd, read it from db
1698 if vnfd_id not in db_vnfds:
1699 # read from db
1700 stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
1701 self.logger.debug(logging_text + stage[1])
1702 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1703
1704 # store vnfd
1705 db_vnfds.append(vnfd)
1706
1707 # Get or generates the _admin.deployed.VCA list
1708 vca_deployed_list = None
1709 if db_nsr["_admin"].get("deployed"):
1710 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1711 if vca_deployed_list is None:
1712 vca_deployed_list = []
1713 configuration_status_list = []
1714 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1715 db_nsr_update["configurationStatus"] = configuration_status_list
1716 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1717 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1718 elif isinstance(vca_deployed_list, dict):
1719 # maintain backward compatibility. Change a dict to list at database
1720 vca_deployed_list = list(vca_deployed_list.values())
1721 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1722 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1723
1724 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1725 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1726 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1727
1728 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1729 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1730 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1731 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})
1732
1733 # n2vc_redesign STEP 2 Deploy Network Scenario
1734 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1735 self._write_op_status(
1736 op_id=nslcmop_id,
1737 stage=stage
1738 )
1739
1740 stage[1] = "Deploying KDUs."
1741 # self.logger.debug(logging_text + "Before deploy_kdus")
1742 # Call to deploy_kdus in case exists the "vdu:kdu" param
1743 await self.deploy_kdus(
1744 logging_text=logging_text,
1745 nsr_id=nsr_id,
1746 nslcmop_id=nslcmop_id,
1747 db_vnfrs=db_vnfrs,
1748 db_vnfds=db_vnfds,
1749 task_instantiation_info=tasks_dict_info,
1750 )
1751
1752 stage[1] = "Getting VCA public key."
1753 # n2vc_redesign STEP 1 Get VCA public ssh-key
1754 # feature 1429. Add n2vc public key to needed VMs
1755 n2vc_key = self.n2vc.get_public_key()
1756 n2vc_key_list = [n2vc_key]
1757 if self.vca_config.get("public_key"):
1758 n2vc_key_list.append(self.vca_config["public_key"])
1759
1760 stage[1] = "Deploying NS at VIM."
1761 task_ro = asyncio.ensure_future(
1762 self.instantiate_RO(
1763 logging_text=logging_text,
1764 nsr_id=nsr_id,
1765 nsd=nsd,
1766 db_nsr=db_nsr,
1767 db_nslcmop=db_nslcmop,
1768 db_vnfrs=db_vnfrs,
1769 db_vnfds=db_vnfds,
1770 n2vc_key_list=n2vc_key_list,
1771 stage=stage
1772 )
1773 )
1774 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1775 tasks_dict_info[task_ro] = "Deploying at VIM"
1776
1777 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1778 stage[1] = "Deploying Execution Environments."
1779 self.logger.debug(logging_text + stage[1])
1780
1781 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1782 for vnf_profile in get_vnf_profiles(nsd):
1783 vnfd_id = vnf_profile["vnfd-id"]
1784 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
1785 member_vnf_index = str(vnf_profile["id"])
1786 db_vnfr = db_vnfrs[member_vnf_index]
1787 base_folder = vnfd["_admin"]["storage"]
1788 vdu_id = None
1789 vdu_index = 0
1790 vdu_name = None
1791 kdu_name = None
1792
1793 # Get additional parameters
1794 deploy_params = {"OSM": get_osm_params(db_vnfr)}
1795 if db_vnfr.get("additionalParamsForVnf"):
1796 deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))
1797
1798 descriptor_config = get_vnf_configuration(vnfd)
1799 if descriptor_config:
1800 self._deploy_n2vc(
1801 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1802 db_nsr=db_nsr,
1803 db_vnfr=db_vnfr,
1804 nslcmop_id=nslcmop_id,
1805 nsr_id=nsr_id,
1806 nsi_id=nsi_id,
1807 vnfd_id=vnfd_id,
1808 vdu_id=vdu_id,
1809 kdu_name=kdu_name,
1810 member_vnf_index=member_vnf_index,
1811 vdu_index=vdu_index,
1812 vdu_name=vdu_name,
1813 deploy_params=deploy_params,
1814 descriptor_config=descriptor_config,
1815 base_folder=base_folder,
1816 task_instantiation_info=tasks_dict_info,
1817 stage=stage
1818 )
1819
1820 # Deploy charms for each VDU that supports one.
1821 for vdud in get_vdu_list(vnfd):
1822 vdu_id = vdud["id"]
1823 descriptor_config = get_vdu_configuration(vnfd, vdu_id)
1824 vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)
1825
1826 if vdur.get("additionalParams"):
1827 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
1828 else:
1829 deploy_params_vdu = deploy_params
1830 deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
1831 vdud_count = get_vdu_profile(vnfd, vdu_id).get("max-number-of-instances", 1)
1832
1833 self.logger.debug("VDUD > {}".format(vdud))
1834 self.logger.debug("Descriptor config > {}".format(descriptor_config))
1835 if descriptor_config:
1836 vdu_name = None
1837 kdu_name = None
1838 for vdu_index in range(vdud_count):
1839 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1840 self._deploy_n2vc(
1841 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1842 member_vnf_index, vdu_id, vdu_index),
1843 db_nsr=db_nsr,
1844 db_vnfr=db_vnfr,
1845 nslcmop_id=nslcmop_id,
1846 nsr_id=nsr_id,
1847 nsi_id=nsi_id,
1848 vnfd_id=vnfd_id,
1849 vdu_id=vdu_id,
1850 kdu_name=kdu_name,
1851 member_vnf_index=member_vnf_index,
1852 vdu_index=vdu_index,
1853 vdu_name=vdu_name,
1854 deploy_params=deploy_params_vdu,
1855 descriptor_config=descriptor_config,
1856 base_folder=base_folder,
1857 task_instantiation_info=tasks_dict_info,
1858 stage=stage
1859 )
1860 for kdud in get_kdu_list(vnfd):
1861 kdu_name = kdud["name"]
1862 descriptor_config = kdud.get('kdu-configuration')
1863 if descriptor_config:
1864 vdu_id = None
1865 vdu_index = 0
1866 vdu_name = None
1867 kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
1868 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
1869 if kdur.get("additionalParams"):
1870 deploy_params_kdu = parse_yaml_strings(kdur["additionalParams"])
1871
1872 self._deploy_n2vc(
1873 logging_text=logging_text,
1874 db_nsr=db_nsr,
1875 db_vnfr=db_vnfr,
1876 nslcmop_id=nslcmop_id,
1877 nsr_id=nsr_id,
1878 nsi_id=nsi_id,
1879 vnfd_id=vnfd_id,
1880 vdu_id=vdu_id,
1881 kdu_name=kdu_name,
1882 member_vnf_index=member_vnf_index,
1883 vdu_index=vdu_index,
1884 vdu_name=vdu_name,
1885 deploy_params=deploy_params_kdu,
1886 descriptor_config=descriptor_config,
1887 base_folder=base_folder,
1888 task_instantiation_info=tasks_dict_info,
1889 stage=stage
1890 )
1891
1892 # Check if this NS has a charm configuration
1893 descriptor_config = nsd.get("ns-configuration")
1894 if descriptor_config and descriptor_config.get("juju"):
1895 vnfd_id = None
1896 db_vnfr = None
1897 member_vnf_index = None
1898 vdu_id = None
1899 kdu_name = None
1900 vdu_index = 0
1901 vdu_name = None
1902
1903 # Get additional parameters
1904 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
1905 if db_nsr.get("additionalParamsForNs"):
1906 deploy_params.update(parse_yaml_strings(db_nsr["additionalParamsForNs"].copy()))
1907 base_folder = nsd["_admin"]["storage"]
1908 self._deploy_n2vc(
1909 logging_text=logging_text,
1910 db_nsr=db_nsr,
1911 db_vnfr=db_vnfr,
1912 nslcmop_id=nslcmop_id,
1913 nsr_id=nsr_id,
1914 nsi_id=nsi_id,
1915 vnfd_id=vnfd_id,
1916 vdu_id=vdu_id,
1917 kdu_name=kdu_name,
1918 member_vnf_index=member_vnf_index,
1919 vdu_index=vdu_index,
1920 vdu_name=vdu_name,
1921 deploy_params=deploy_params,
1922 descriptor_config=descriptor_config,
1923 base_folder=base_folder,
1924 task_instantiation_info=tasks_dict_info,
1925 stage=stage
1926 )
1927
1928 # rest of staff will be done at finally
1929
1930 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
1931 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
1932 exc = e
1933 except asyncio.CancelledError:
1934 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
1935 exc = "Operation was cancelled"
1936 except Exception as e:
1937 exc = traceback.format_exc()
1938 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
1939 finally:
1940 if exc:
1941 error_list.append(str(exc))
1942 try:
1943 # wait for pending tasks
1944 if tasks_dict_info:
1945 stage[1] = "Waiting for instantiate pending tasks."
1946 self.logger.debug(logging_text + stage[1])
1947 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
1948 stage, nslcmop_id, nsr_id=nsr_id)
1949 stage[1] = stage[2] = ""
1950 except asyncio.CancelledError:
1951 error_list.append("Cancelled")
1952 # TODO cancel all tasks
1953 except Exception as exc:
1954 error_list.append(str(exc))
1955
1956 # update operation-status
1957 db_nsr_update["operational-status"] = "running"
1958 # let's begin with VCA 'configured' status (later we can change it)
1959 db_nsr_update["config-status"] = "configured"
1960 for task, task_name in tasks_dict_info.items():
1961 if not task.done() or task.cancelled() or task.exception():
1962 if task_name.startswith(self.task_name_deploy_vca):
1963 # A N2VC task is pending
1964 db_nsr_update["config-status"] = "failed"
1965 else:
1966 # RO or KDU task is pending
1967 db_nsr_update["operational-status"] = "failed"
1968
1969 # update status at database
1970 if error_list:
1971 error_detail = ". ".join(error_list)
1972 self.logger.error(logging_text + error_detail)
1973 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
1974 error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])
1975
1976 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
1977 db_nslcmop_update["detailed-status"] = error_detail
1978 nslcmop_operation_state = "FAILED"
1979 ns_state = "BROKEN"
1980 else:
1981 error_detail = None
1982 error_description_nsr = error_description_nslcmop = None
1983 ns_state = "READY"
1984 db_nsr_update["detailed-status"] = "Done"
1985 db_nslcmop_update["detailed-status"] = "Done"
1986 nslcmop_operation_state = "COMPLETED"
1987
1988 if db_nsr:
1989 self._write_ns_status(
1990 nsr_id=nsr_id,
1991 ns_state=ns_state,
1992 current_operation="IDLE",
1993 current_operation_id=None,
1994 error_description=error_description_nsr,
1995 error_detail=error_detail,
1996 other_update=db_nsr_update
1997 )
1998 self._write_op_status(
1999 op_id=nslcmop_id,
2000 stage="",
2001 error_message=error_description_nslcmop,
2002 operation_state=nslcmop_operation_state,
2003 other_update=db_nslcmop_update,
2004 )
2005
2006 if nslcmop_operation_state:
2007 try:
2008 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2009 "operationState": nslcmop_operation_state},
2010 loop=self.loop)
2011 except Exception as e:
2012 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2013
2014 self.logger.debug(logging_text + "Exit")
2015 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2016
2017 async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
2018 timeout: int = 3600, vca_type: str = None) -> bool:
2019
2020 # steps:
2021 # 1. find all relations for this VCA
2022 # 2. wait for other peers related
2023 # 3. add relations
2024
2025 try:
2026 vca_type = vca_type or "lxc_proxy_charm"
2027
2028 # STEP 1: find all relations for this VCA
2029
2030 # read nsr record
2031 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2032 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2033
2034 # this VCA data
2035 my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
2036
2037 # read all ns-configuration relations
2038 ns_relations = list()
2039 db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
2040 if db_ns_relations:
2041 for r in db_ns_relations:
2042 # check if this VCA is in the relation
2043 if my_vca.get('member-vnf-index') in\
2044 (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2045 ns_relations.append(r)
2046
2047 # read all vnf-configuration relations
2048 vnf_relations = list()
2049 db_vnfd_list = db_nsr.get('vnfd-id')
2050 if db_vnfd_list:
2051 for vnfd in db_vnfd_list:
2052 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2053 db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
2054 if db_vnf_relations:
2055 for r in db_vnf_relations:
2056 # check if this VCA is in the relation
2057 if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2058 vnf_relations.append(r)
2059
2060 # if no relations, terminate
2061 if not ns_relations and not vnf_relations:
2062 self.logger.debug(logging_text + ' No relations')
2063 return True
2064
2065 self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
2066
2067 # add all relations
2068 start = time()
2069 while True:
2070 # check timeout
2071 now = time()
2072 if now - start >= timeout:
2073 self.logger.error(logging_text + ' : timeout adding relations')
2074 return False
2075
2076 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2077 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2078
2079 # for each defined NS relation, find the VCA's related
2080 for r in ns_relations.copy():
2081 from_vca_ee_id = None
2082 to_vca_ee_id = None
2083 from_vca_endpoint = None
2084 to_vca_endpoint = None
2085 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2086 for vca in vca_list:
2087 if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
2088 and vca.get('config_sw_installed'):
2089 from_vca_ee_id = vca.get('ee_id')
2090 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2091 if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
2092 and vca.get('config_sw_installed'):
2093 to_vca_ee_id = vca.get('ee_id')
2094 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2095 if from_vca_ee_id and to_vca_ee_id:
2096 # add relation
2097 await self.vca_map[vca_type].add_relation(
2098 ee_id_1=from_vca_ee_id,
2099 ee_id_2=to_vca_ee_id,
2100 endpoint_1=from_vca_endpoint,
2101 endpoint_2=to_vca_endpoint)
2102 # remove entry from relations list
2103 ns_relations.remove(r)
2104 else:
2105 # check failed peers
2106 try:
2107 vca_status_list = db_nsr.get('configurationStatus')
2108 if vca_status_list:
2109 for i in range(len(vca_list)):
2110 vca = vca_list[i]
2111 vca_status = vca_status_list[i]
2112 if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
2113 if vca_status.get('status') == 'BROKEN':
2114 # peer broken: remove relation from list
2115 ns_relations.remove(r)
2116 if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
2117 if vca_status.get('status') == 'BROKEN':
2118 # peer broken: remove relation from list
2119 ns_relations.remove(r)
2120 except Exception:
2121 # ignore
2122 pass
2123
2124 # for each defined VNF relation, find the VCA's related
2125 for r in vnf_relations.copy():
2126 from_vca_ee_id = None
2127 to_vca_ee_id = None
2128 from_vca_endpoint = None
2129 to_vca_endpoint = None
2130 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2131 for vca in vca_list:
2132 key_to_check = "vdu_id"
2133 if vca.get("vdu_id") is None:
2134 key_to_check = "vnfd_id"
2135 if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
2136 from_vca_ee_id = vca.get('ee_id')
2137 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2138 if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
2139 to_vca_ee_id = vca.get('ee_id')
2140 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2141 if from_vca_ee_id and to_vca_ee_id:
2142 # add relation
2143 await self.vca_map[vca_type].add_relation(
2144 ee_id_1=from_vca_ee_id,
2145 ee_id_2=to_vca_ee_id,
2146 endpoint_1=from_vca_endpoint,
2147 endpoint_2=to_vca_endpoint)
2148 # remove entry from relations list
2149 vnf_relations.remove(r)
2150 else:
2151 # check failed peers
2152 try:
2153 vca_status_list = db_nsr.get('configurationStatus')
2154 if vca_status_list:
2155 for i in range(len(vca_list)):
2156 vca = vca_list[i]
2157 vca_status = vca_status_list[i]
2158 if vca.get('vdu_id') == r.get('entities')[0].get('id'):
2159 if vca_status.get('status') == 'BROKEN':
2160 # peer broken: remove relation from list
2161 vnf_relations.remove(r)
2162 if vca.get('vdu_id') == r.get('entities')[1].get('id'):
2163 if vca_status.get('status') == 'BROKEN':
2164 # peer broken: remove relation from list
2165 vnf_relations.remove(r)
2166 except Exception:
2167 # ignore
2168 pass
2169
2170 # wait for next try
2171 await asyncio.sleep(5.0)
2172
2173 if not ns_relations and not vnf_relations:
2174 self.logger.debug('Relations added')
2175 break
2176
2177 return True
2178
2179 except Exception as e:
2180 self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
2181 return False
2182
2183 async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
2184 vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
2185
2186 try:
2187 k8sclustertype = k8s_instance_info["k8scluster-type"]
2188 # Instantiate kdu
2189 db_dict_install = {"collection": "nsrs",
2190 "filter": {"_id": nsr_id},
2191 "path": nsr_db_path}
2192
2193 kdu_instance = await self.k8scluster_map[k8sclustertype].install(
2194 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2195 kdu_model=k8s_instance_info["kdu-model"],
2196 atomic=True,
2197 params=k8params,
2198 db_dict=db_dict_install,
2199 timeout=timeout,
2200 kdu_name=k8s_instance_info["kdu-name"],
2201 namespace=k8s_instance_info["namespace"])
2202 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
2203
2204 # Obtain services to obtain management service ip
2205 services = await self.k8scluster_map[k8sclustertype].get_services(
2206 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2207 kdu_instance=kdu_instance,
2208 namespace=k8s_instance_info["namespace"])
2209
2210 # Obtain management service info (if exists)
2211 vnfr_update_dict = {}
2212 if services:
2213 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
2214 mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
2215 for mgmt_service in mgmt_services:
2216 for service in services:
2217 if service["name"].startswith(mgmt_service["name"]):
2218 # Mgmt service found, Obtain service ip
2219 ip = service.get("external_ip", service.get("cluster_ip"))
2220 if isinstance(ip, list) and len(ip) == 1:
2221 ip = ip[0]
2222
2223 vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
2224
2225 # Check if must update also mgmt ip at the vnf
2226 service_external_cp = mgmt_service.get("external-connection-point-ref")
2227 if service_external_cp:
2228 if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
2229 vnfr_update_dict["ip-address"] = ip
2230
2231 break
2232 else:
2233 self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
2234
2235 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
2236 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
2237
2238 kdu_config = kdud.get("kdu-configuration")
2239 if kdu_config and kdu_config.get("initial-config-primitive") and kdu_config.get("juju") is None:
2240 initial_config_primitive_list = kdu_config.get("initial-config-primitive")
2241 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
2242
2243 for initial_config_primitive in initial_config_primitive_list:
2244 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})
2245
2246 await asyncio.wait_for(
2247 self.k8scluster_map[k8sclustertype].exec_primitive(
2248 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2249 kdu_instance=kdu_instance,
2250 primitive_name=initial_config_primitive["name"],
2251 params=primitive_params_, db_dict={}),
2252 timeout=timeout)
2253
2254 except Exception as e:
2255 # Prepare update db with error and raise exception
2256 try:
2257 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
2258 self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
2259 except Exception:
2260 # ignore to keep original exception
2261 pass
2262 # reraise original error
2263 raise
2264
2265 return kdu_instance
2266
2267 async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
2268 # Launch kdus if present in the descriptor
2269
2270 k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
2271
2272 async def _get_cluster_id(cluster_id, cluster_type):
2273 nonlocal k8scluster_id_2_uuic
2274 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
2275 return k8scluster_id_2_uuic[cluster_type][cluster_id]
2276
2277 # check if K8scluster is creating and wait look if previous tasks in process
2278 task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
2279 if task_dependency:
2280 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
2281 self.logger.debug(logging_text + text)
2282 await asyncio.wait(task_dependency, timeout=3600)
2283
2284 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
2285 if not db_k8scluster:
2286 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
2287
2288 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
2289 if not k8s_id:
2290 if cluster_type == "helm-chart-v3":
2291 try:
2292 # backward compatibility for existing clusters that have not been initialized for helm v3
2293 k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
2294 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
2295 reuse_cluster_uuid=cluster_id)
2296 db_k8scluster_update = {}
2297 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
2298 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
2299 db_k8scluster_update["_admin.helm-chart-v3.created"] = uninstall_sw
2300 db_k8scluster_update["_admin.helm-chart-v3.operationalState"] = "ENABLED"
2301 self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
2302 except Exception as e:
2303 self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
2304 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
2305 cluster_type))
2306 else:
2307 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
2308 format(cluster_id, cluster_type))
2309 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
2310 return k8s_id
2311
2312 logging_text += "Deploy kdus: "
2313 step = ""
2314 try:
2315 db_nsr_update = {"_admin.deployed.K8s": []}
2316 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2317
2318 index = 0
2319 updated_cluster_list = []
2320 updated_v3_cluster_list = []
2321
2322 for vnfr_data in db_vnfrs.values():
2323 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
2324 # Step 0: Prepare and set parameters
2325 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
2326 vnfd_id = vnfr_data.get('vnfd-id')
2327 vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
2328 kdud = next(kdud for kdud in vnfd_with_id["kdu"] if kdud["name"] == kdur["kdu-name"])
2329 namespace = kdur.get("k8s-namespace")
2330 if kdur.get("helm-chart"):
2331 kdumodel = kdur["helm-chart"]
2332 # Default version: helm3, if helm-version is v2 assign v2
2333 k8sclustertype = "helm-chart-v3"
2334 self.logger.debug("kdur: {}".format(kdur))
2335 if kdur.get("helm-version") and kdur.get("helm-version") == "v2":
2336 k8sclustertype = "helm-chart"
2337 elif kdur.get("juju-bundle"):
2338 kdumodel = kdur["juju-bundle"]
2339 k8sclustertype = "juju-bundle"
2340 else:
2341 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2342 "juju-bundle. Maybe an old NBI version is running".
2343 format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
2344 # check if kdumodel is a file and exists
2345 try:
2346 vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
2347 storage = deep_get(vnfd_with_id, ('_admin', 'storage'))
2348 if storage and storage.get('pkg-dir'): # may be not present if vnfd has not artifacts
2349 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
2350 filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
2351 kdumodel)
2352 if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
2353 kdumodel = self.fs.path + filename
2354 except (asyncio.TimeoutError, asyncio.CancelledError):
2355 raise
2356 except Exception: # it is not a file
2357 pass
2358
2359 k8s_cluster_id = kdur["k8s-cluster"]["id"]
2360 step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
2361 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
2362
2363 # Synchronize repos
2364 if (k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list)\
2365 or (k8sclustertype == "helm-chart-v3" and cluster_uuid not in updated_v3_cluster_list):
2366 del_repo_list, added_repo_dict = await asyncio.ensure_future(
2367 self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
2368 if del_repo_list or added_repo_dict:
2369 if k8sclustertype == "helm-chart":
2370 unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
2371 updated = {'_admin.helm_charts_added.' +
2372 item: name for item, name in added_repo_dict.items()}
2373 updated_cluster_list.append(cluster_uuid)
2374 elif k8sclustertype == "helm-chart-v3":
2375 unset = {'_admin.helm_charts_v3_added.' + item: None for item in del_repo_list}
2376 updated = {'_admin.helm_charts_v3_added.' +
2377 item: name for item, name in added_repo_dict.items()}
2378 updated_v3_cluster_list.append(cluster_uuid)
2379 self.logger.debug(logging_text + "repos synchronized on k8s cluster "
2380 "'{}' to_delete: {}, to_add: {}".
2381 format(k8s_cluster_id, del_repo_list, added_repo_dict))
2382 self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
2383
2384 # Instantiate kdu
2385 step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
2386 kdur["kdu-name"], k8s_cluster_id)
2387 k8s_instance_info = {"kdu-instance": None,
2388 "k8scluster-uuid": cluster_uuid,
2389 "k8scluster-type": k8sclustertype,
2390 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
2391 "kdu-name": kdur["kdu-name"],
2392 "kdu-model": kdumodel,
2393 "namespace": namespace}
2394 db_path = "_admin.deployed.K8s.{}".format(index)
2395 db_nsr_update[db_path] = k8s_instance_info
2396 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2397 vnfd_with_id = find_in_list(db_vnfds, lambda vnf: vnf["_id"] == vnfd_id)
2398 task = asyncio.ensure_future(
2399 self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, vnfd_with_id,
2400 k8s_instance_info, k8params=desc_params, timeout=600))
2401 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
2402 task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
2403
2404 index += 1
2405
2406 except (LcmException, asyncio.CancelledError):
2407 raise
2408 except Exception as e:
2409 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
2410 if isinstance(e, (N2VCException, DbException)):
2411 self.logger.error(logging_text + msg)
2412 else:
2413 self.logger.critical(logging_text + msg, exc_info=True)
2414 raise LcmException(msg)
2415 finally:
2416 if db_nsr_update:
2417 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2418
2419 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2420 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2421 base_folder, task_instantiation_info, stage):
2422 # launch instantiate_N2VC in a asyncio task and register task object
2423 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2424 # if not found, create one entry and update database
2425 # fill db_nsr._admin.deployed.VCA.<index>
2426
2427 self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
2428 if descriptor_config.get("juju"): # There is one execution envioronment of type juju
2429 ee_list = [descriptor_config]
2430 elif descriptor_config.get("execution-environment-list"):
2431 ee_list = descriptor_config.get("execution-environment-list")
2432 else: # other types as script are not supported
2433 ee_list = []
2434
2435 for ee_item in ee_list:
2436 self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
2437 ee_item.get("helm-chart")))
2438 ee_descriptor_id = ee_item.get("id")
2439 if ee_item.get("juju"):
2440 vca_name = ee_item['juju'].get('charm')
2441 vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
2442 if ee_item['juju'].get('cloud') == "k8s":
2443 vca_type = "k8s_proxy_charm"
2444 elif ee_item['juju'].get('proxy') is False:
2445 vca_type = "native_charm"
2446 elif ee_item.get("helm-chart"):
2447 vca_name = ee_item['helm-chart']
2448 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
2449 vca_type = "helm"
2450 else:
2451 vca_type = "helm-v3"
2452 else:
2453 self.logger.debug(logging_text + "skipping non juju neither charm configuration")
2454 continue
2455
2456 vca_index = -1
2457 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2458 if not vca_deployed:
2459 continue
2460 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2461 vca_deployed.get("vdu_id") == vdu_id and \
2462 vca_deployed.get("kdu_name") == kdu_name and \
2463 vca_deployed.get("vdu_count_index", 0) == vdu_index and \
2464 vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
2465 break
2466 else:
2467 # not found, create one.
2468 target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
2469 if vdu_id:
2470 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
2471 elif kdu_name:
2472 target += "/kdu/{}".format(kdu_name)
2473 vca_deployed = {
2474 "target_element": target,
2475 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
2476 "member-vnf-index": member_vnf_index,
2477 "vdu_id": vdu_id,
2478 "kdu_name": kdu_name,
2479 "vdu_count_index": vdu_index,
2480 "operational-status": "init", # TODO revise
2481 "detailed-status": "", # TODO revise
2482 "step": "initial-deploy", # TODO revise
2483 "vnfd_id": vnfd_id,
2484 "vdu_name": vdu_name,
2485 "type": vca_type,
2486 "ee_descriptor_id": ee_descriptor_id
2487 }
2488 vca_index += 1
2489
2490 # create VCA and configurationStatus in db
2491 db_dict = {
2492 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2493 "configurationStatus.{}".format(vca_index): dict()
2494 }
2495 self.update_db_2("nsrs", nsr_id, db_dict)
2496
2497 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2498
2499 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
2500 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
2501 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
2502
2503 # Launch task
2504 task_n2vc = asyncio.ensure_future(
2505 self.instantiate_N2VC(
2506 logging_text=logging_text,
2507 vca_index=vca_index,
2508 nsi_id=nsi_id,
2509 db_nsr=db_nsr,
2510 db_vnfr=db_vnfr,
2511 vdu_id=vdu_id,
2512 kdu_name=kdu_name,
2513 vdu_index=vdu_index,
2514 deploy_params=deploy_params,
2515 config_descriptor=descriptor_config,
2516 base_folder=base_folder,
2517 nslcmop_id=nslcmop_id,
2518 stage=stage,
2519 vca_type=vca_type,
2520 vca_name=vca_name,
2521 ee_config_descriptor=ee_item
2522 )
2523 )
2524 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2525 task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
2526 member_vnf_index or "", vdu_id or "")
2527
2528 @staticmethod
2529 def _create_nslcmop(nsr_id, operation, params):
2530 """
2531 Creates a ns-lcm-opp content to be stored at database.
2532 :param nsr_id: internal id of the instance
2533 :param operation: instantiate, terminate, scale, action, ...
2534 :param params: user parameters for the operation
2535 :return: dictionary following SOL005 format
2536 """
2537 # Raise exception if invalid arguments
2538 if not (nsr_id and operation and params):
2539 raise LcmException(
2540 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2541 now = time()
2542 _id = str(uuid4())
2543 nslcmop = {
2544 "id": _id,
2545 "_id": _id,
2546 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2547 "operationState": "PROCESSING",
2548 "statusEnteredTime": now,
2549 "nsInstanceId": nsr_id,
2550 "lcmOperationType": operation,
2551 "startTime": now,
2552 "isAutomaticInvocation": False,
2553 "operationParams": params,
2554 "isCancelPending": False,
2555 "links": {
2556 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2557 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2558 }
2559 }
2560 return nslcmop
2561
2562 def _format_additional_params(self, params):
2563 params = params or {}
2564 for key, value in params.items():
2565 if str(value).startswith("!!yaml "):
2566 params[key] = yaml.safe_load(value[7:])
2567 return params
2568
2569 def _get_terminate_primitive_params(self, seq, vnf_index):
2570 primitive = seq.get('name')
2571 primitive_params = {}
2572 params = {
2573 "member_vnf_index": vnf_index,
2574 "primitive": primitive,
2575 "primitive_params": primitive_params,
2576 }
2577 desc_params = {}
2578 return self._map_primitive_params(seq, params, desc_params)
2579
2580 # sub-operations
2581
2582 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
2583 op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
2584 if op.get('operationState') == 'COMPLETED':
2585 # b. Skip sub-operation
2586 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2587 return self.SUBOPERATION_STATUS_SKIP
2588 else:
2589 # c. retry executing sub-operation
2590 # The sub-operation exists, and operationState != 'COMPLETED'
2591 # Update operationState = 'PROCESSING' to indicate a retry.
2592 operationState = 'PROCESSING'
2593 detailed_status = 'In progress'
2594 self._update_suboperation_status(
2595 db_nslcmop, op_index, operationState, detailed_status)
2596 # Return the sub-operation index
2597 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2598 # with arguments extracted from the sub-operation
2599 return op_index
2600
2601 # Find a sub-operation where all keys in a matching dictionary must match
2602 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2603 def _find_suboperation(self, db_nslcmop, match):
2604 if db_nslcmop and match:
2605 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2606 for i, op in enumerate(op_list):
2607 if all(op.get(k) == match[k] for k in match):
2608 return i
2609 return self.SUBOPERATION_STATUS_NOT_FOUND
2610
2611 # Update status for a sub-operation given its index
2612 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2613 # Update DB for HA tasks
2614 q_filter = {'_id': db_nslcmop['_id']}
2615 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2616 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2617 self.db.set_one("nslcmops",
2618 q_filter=q_filter,
2619 update_dict=update_dict,
2620 fail_on_empty=False)
2621
2622 # Add sub-operation, return the index of the added sub-operation
2623 # Optionally, set operationState, detailed-status, and operationType
2624 # Status and type are currently set for 'scale' sub-operations:
2625 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2626 # 'detailed-status' : status message
2627 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2628 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2629 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2630 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2631 RO_nsr_id=None, RO_scaling_info=None):
2632 if not db_nslcmop:
2633 return self.SUBOPERATION_STATUS_NOT_FOUND
2634 # Get the "_admin.operations" list, if it exists
2635 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2636 op_list = db_nslcmop_admin.get('operations')
2637 # Create or append to the "_admin.operations" list
2638 new_op = {'member_vnf_index': vnf_index,
2639 'vdu_id': vdu_id,
2640 'vdu_count_index': vdu_count_index,
2641 'primitive': primitive,
2642 'primitive_params': mapped_primitive_params}
2643 if operationState:
2644 new_op['operationState'] = operationState
2645 if detailed_status:
2646 new_op['detailed-status'] = detailed_status
2647 if operationType:
2648 new_op['lcmOperationType'] = operationType
2649 if RO_nsr_id:
2650 new_op['RO_nsr_id'] = RO_nsr_id
2651 if RO_scaling_info:
2652 new_op['RO_scaling_info'] = RO_scaling_info
2653 if not op_list:
2654 # No existing operations, create key 'operations' with current operation as first list element
2655 db_nslcmop_admin.update({'operations': [new_op]})
2656 op_list = db_nslcmop_admin.get('operations')
2657 else:
2658 # Existing operations, append operation to list
2659 op_list.append(new_op)
2660
2661 db_nslcmop_update = {'_admin.operations': op_list}
2662 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2663 op_index = len(op_list) - 1
2664 return op_index
2665
2666 # Helper methods for scale() sub-operations
2667
2668 # pre-scale/post-scale:
2669 # Check for 3 different cases:
2670 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2671 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2672 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2673 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2674 operationType, RO_nsr_id=None, RO_scaling_info=None):
2675 # Find this sub-operation
2676 if RO_nsr_id and RO_scaling_info:
2677 operationType = 'SCALE-RO'
2678 match = {
2679 'member_vnf_index': vnf_index,
2680 'RO_nsr_id': RO_nsr_id,
2681 'RO_scaling_info': RO_scaling_info,
2682 }
2683 else:
2684 match = {
2685 'member_vnf_index': vnf_index,
2686 'primitive': vnf_config_primitive,
2687 'primitive_params': primitive_params,
2688 'lcmOperationType': operationType
2689 }
2690 op_index = self._find_suboperation(db_nslcmop, match)
2691 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
2692 # a. New sub-operation
2693 # The sub-operation does not exist, add it.
2694 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2695 # The following parameters are set to None for all kind of scaling:
2696 vdu_id = None
2697 vdu_count_index = None
2698 vdu_name = None
2699 if RO_nsr_id and RO_scaling_info:
2700 vnf_config_primitive = None
2701 primitive_params = None
2702 else:
2703 RO_nsr_id = None
2704 RO_scaling_info = None
2705 # Initial status for sub-operation
2706 operationState = 'PROCESSING'
2707 detailed_status = 'In progress'
2708 # Add sub-operation for pre/post-scaling (zero or more operations)
2709 self._add_suboperation(db_nslcmop,
2710 vnf_index,
2711 vdu_id,
2712 vdu_count_index,
2713 vdu_name,
2714 vnf_config_primitive,
2715 primitive_params,
2716 operationState,
2717 detailed_status,
2718 operationType,
2719 RO_nsr_id,
2720 RO_scaling_info)
2721 return self.SUBOPERATION_STATUS_NEW
2722 else:
2723 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2724 # or op_index (operationState != 'COMPLETED')
2725 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
2726
2727 # Function to return execution_environment id
2728
2729 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2730 # TODO vdu_index_count
2731 for vca in vca_deployed_list:
2732 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2733 return vca["ee_id"]
2734
2735 async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
2736 vca_index, destroy_ee=True, exec_primitives=True):
2737 """
2738 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
2739 :param logging_text:
2740 :param db_nslcmop:
2741 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
2742 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
2743 :param vca_index: index in the database _admin.deployed.VCA
2744 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
2745 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
2746 not executed properly
2747 :return: None or exception
2748 """
2749
2750 self.logger.debug(
2751 logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
2752 vca_index, vca_deployed, config_descriptor, destroy_ee
2753 )
2754 )
2755
2756 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
2757
2758 # execute terminate_primitives
2759 if exec_primitives:
2760 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
2761 config_descriptor.get("terminate-config-primitive"), vca_deployed.get("ee_descriptor_id"))
2762 vdu_id = vca_deployed.get("vdu_id")
2763 vdu_count_index = vca_deployed.get("vdu_count_index")
2764 vdu_name = vca_deployed.get("vdu_name")
2765 vnf_index = vca_deployed.get("member-vnf-index")
2766 if terminate_primitives and vca_deployed.get("needed_terminate"):
2767 for seq in terminate_primitives:
2768 # For each sequence in list, get primitive and call _ns_execute_primitive()
2769 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2770 vnf_index, seq.get("name"))
2771 self.logger.debug(logging_text + step)
2772 # Create the primitive for each sequence, i.e. "primitive": "touch"
2773 primitive = seq.get('name')
2774 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2775
2776 # Add sub-operation
2777 self._add_suboperation(db_nslcmop,
2778 vnf_index,
2779 vdu_id,
2780 vdu_count_index,
2781 vdu_name,
2782 primitive,
2783 mapped_primitive_params)
2784 # Sub-operations: Call _ns_execute_primitive() instead of action()
2785 try:
2786 result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
2787 mapped_primitive_params,
2788 vca_type=vca_type)
2789 except LcmException:
2790 # this happens when VCA is not deployed. In this case it is not needed to terminate
2791 continue
2792 result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2793 if result not in result_ok:
2794 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
2795 "error {}".format(seq.get("name"), vnf_index, result_detail))
2796 # set that this VCA do not need terminated
2797 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
2798 self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
2799
2800 if vca_deployed.get("prometheus_jobs") and self.prometheus:
2801 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
2802
2803 if destroy_ee:
2804 await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
2805
2806 async def _delete_all_N2VC(self, db_nsr: dict):
2807 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
2808 namespace = "." + db_nsr["_id"]
2809 try:
2810 await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
2811 except N2VCNotFound: # already deleted. Skip
2812 pass
2813 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
2814
2815 async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
2816 """
2817 Terminates a deployment from RO
2818 :param logging_text:
2819 :param nsr_deployed: db_nsr._admin.deployed
2820 :param nsr_id:
2821 :param nslcmop_id:
2822 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
2823 this method will update only the index 2, but it will write on database the concatenated content of the list
2824 :return:
2825 """
2826 db_nsr_update = {}
2827 failed_detail = []
2828 ro_nsr_id = ro_delete_action = None
2829 if nsr_deployed and nsr_deployed.get("RO"):
2830 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
2831 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2832 try:
2833 if ro_nsr_id:
2834 stage[2] = "Deleting ns from VIM."
2835 db_nsr_update["detailed-status"] = " ".join(stage)
2836 self._write_op_status(nslcmop_id, stage)
2837 self.logger.debug(logging_text + stage[2])
2838 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2839 self._write_op_status(nslcmop_id, stage)
2840 desc = await self.RO.delete("ns", ro_nsr_id)
2841 ro_delete_action = desc["action_id"]
2842 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
2843 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2844 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2845 if ro_delete_action:
2846 # wait until NS is deleted from VIM
2847 stage[2] = "Waiting ns deleted from VIM."
2848 detailed_status_old = None
2849 self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
2850 ro_delete_action))
2851 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2852 self._write_op_status(nslcmop_id, stage)
2853
2854 delete_timeout = 20 * 60 # 20 minutes
2855 while delete_timeout > 0:
2856 desc = await self.RO.show(
2857 "ns",
2858 item_id_name=ro_nsr_id,
2859 extra_item="action",
2860 extra_item_id=ro_delete_action)
2861
2862 # deploymentStatus
2863 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
2864
2865 ns_status, ns_status_info = self.RO.check_action_status(desc)
2866 if ns_status == "ERROR":
2867 raise ROclient.ROClientException(ns_status_info)
2868 elif ns_status == "BUILD":
2869 stage[2] = "Deleting from VIM {}".format(ns_status_info)
2870 elif ns_status == "ACTIVE":
2871 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2872 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2873 break
2874 else:
2875 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2876 if stage[2] != detailed_status_old:
2877 detailed_status_old = stage[2]
2878 db_nsr_update["detailed-status"] = " ".join(stage)
2879 self._write_op_status(nslcmop_id, stage)
2880 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2881 await asyncio.sleep(5, loop=self.loop)
2882 delete_timeout -= 5
2883 else: # delete_timeout <= 0:
2884 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
2885
2886 except Exception as e:
2887 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2888 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2889 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2890 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2891 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2892 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
2893 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2894 failed_detail.append("delete conflict: {}".format(e))
2895 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
2896 else:
2897 failed_detail.append("delete error: {}".format(e))
2898 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
2899
2900 # Delete nsd
2901 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
2902 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
2903 try:
2904 stage[2] = "Deleting nsd from RO."
2905 db_nsr_update["detailed-status"] = " ".join(stage)
2906 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2907 self._write_op_status(nslcmop_id, stage)
2908 await self.RO.delete("nsd", ro_nsd_id)
2909 self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
2910 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2911 except Exception as e:
2912 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2913 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2914 self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
2915 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2916 failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
2917 self.logger.debug(logging_text + failed_detail[-1])
2918 else:
2919 failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
2920 self.logger.error(logging_text + failed_detail[-1])
2921
2922 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
2923 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2924 if not vnf_deployed or not vnf_deployed["id"]:
2925 continue
2926 try:
2927 ro_vnfd_id = vnf_deployed["id"]
2928 stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
2929 vnf_deployed["member-vnf-index"], ro_vnfd_id)
2930 db_nsr_update["detailed-status"] = " ".join(stage)
2931 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2932 self._write_op_status(nslcmop_id, stage)
2933 await self.RO.delete("vnfd", ro_vnfd_id)
2934 self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
2935 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2936 except Exception as e:
2937 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2938 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2939 self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
2940 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2941 failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
2942 self.logger.debug(logging_text + failed_detail[-1])
2943 else:
2944 failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
2945 self.logger.error(logging_text + failed_detail[-1])
2946
2947 if failed_detail:
2948 stage[2] = "Error deleting from VIM"
2949 else:
2950 stage[2] = "Deleted from VIM"
2951 db_nsr_update["detailed-status"] = " ".join(stage)
2952 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2953 self._write_op_status(nslcmop_id, stage)
2954
2955 if failed_detail:
2956 raise LcmException("; ".join(failed_detail))
2957
2958 async def terminate(self, nsr_id, nslcmop_id):
2959 # Try to lock HA task here
2960 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2961 if not task_is_locked_by_me:
2962 return
2963
2964 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
2965 self.logger.debug(logging_text + "Enter")
2966 timeout_ns_terminate = self.timeout_ns_terminate
2967 db_nsr = None
2968 db_nslcmop = None
2969 operation_params = None
2970 exc = None
2971 error_list = [] # annotates all failed error messages
2972 db_nslcmop_update = {}
2973 autoremove = False # autoremove after terminated
2974 tasks_dict_info = {}
2975 db_nsr_update = {}
2976 stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
2977 # ^ contains [stage, step, VIM-status]
2978 try:
2979 # wait for any previous tasks in process
2980 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
2981
2982 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2983 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2984 operation_params = db_nslcmop.get("operationParams") or {}
2985 if operation_params.get("timeout_ns_terminate"):
2986 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
2987 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2988 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2989
2990 db_nsr_update["operational-status"] = "terminating"
2991 db_nsr_update["config-status"] = "terminating"
2992 self._write_ns_status(
2993 nsr_id=nsr_id,
2994 ns_state="TERMINATING",
2995 current_operation="TERMINATING",
2996 current_operation_id=nslcmop_id,
2997 other_update=db_nsr_update
2998 )
2999 self._write_op_status(
3000 op_id=nslcmop_id,
3001 queuePosition=0,
3002 stage=stage
3003 )
3004 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
3005 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
3006 return
3007
3008 stage[1] = "Getting vnf descriptors from db."
3009 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
3010 db_vnfds_from_id = {}
3011 db_vnfds_from_member_index = {}
3012 # Loop over VNFRs
3013 for vnfr in db_vnfrs_list:
3014 vnfd_id = vnfr["vnfd-id"]
3015 if vnfd_id not in db_vnfds_from_id:
3016 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
3017 db_vnfds_from_id[vnfd_id] = vnfd
3018 db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]
3019
3020 # Destroy individual execution environments when there are terminating primitives.
3021 # Rest of EE will be deleted at once
3022 # TODO - check before calling _destroy_N2VC
3023 # if not operation_params.get("skip_terminate_primitives"):#
3024 # or not vca.get("needed_terminate"):
3025 stage[0] = "Stage 2/3 execute terminating primitives."
3026 self.logger.debug(logging_text + stage[0])
3027 stage[1] = "Looking execution environment that needs terminate."
3028 self.logger.debug(logging_text + stage[1])
3029
3030 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
3031 config_descriptor = None
3032 if not vca or not vca.get("ee_id"):
3033 continue
3034 if not vca.get("member-vnf-index"):
3035 # ns
3036 config_descriptor = db_nsr.get("ns-configuration")
3037 elif vca.get("vdu_id"):
3038 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3039 vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
3040 if vdud:
3041 config_descriptor = vdud.get("vdu-configuration")
3042 elif vca.get("kdu_name"):
3043 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3044 kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
3045 if kdud:
3046 config_descriptor = kdud.get("kdu-configuration")
3047 else:
3048 config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
3049 vca_type = vca.get("type")
3050 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3051 vca.get("needed_terminate"))
3052 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3053 # pending native charms
3054 destroy_ee = True if vca_type in ("helm", "helm-v3", "native_charm") else False
3055 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3056 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3057 task = asyncio.ensure_future(
3058 self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
3059 destroy_ee, exec_terminate_primitives))
3060 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3061
3062 # wait for pending tasks of terminate primitives
3063 if tasks_dict_info:
3064 self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
3065 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3066 min(self.timeout_charm_delete, timeout_ns_terminate),
3067 stage, nslcmop_id)
3068 tasks_dict_info.clear()
3069 if error_list:
3070 return # raise LcmException("; ".join(error_list))
3071
3072 # remove All execution environments at once
3073 stage[0] = "Stage 3/3 delete all."
3074
3075 if nsr_deployed.get("VCA"):
3076 stage[1] = "Deleting all execution environments."
3077 self.logger.debug(logging_text + stage[1])
3078 task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
3079 timeout=self.timeout_charm_delete))
3080 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3081 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
3082
3083 # Delete from k8scluster
3084 stage[1] = "Deleting KDUs."
3085 self.logger.debug(logging_text + stage[1])
3086 # print(nsr_deployed)
3087 for kdu in get_iterable(nsr_deployed, "K8s"):
3088 if not kdu or not kdu.get("kdu-instance"):
3089 continue
3090 kdu_instance = kdu.get("kdu-instance")
3091 if kdu.get("k8scluster-type") in self.k8scluster_map:
3092 task_delete_kdu_instance = asyncio.ensure_future(
3093 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
3094 cluster_uuid=kdu.get("k8scluster-uuid"),
3095 kdu_instance=kdu_instance))
3096 else:
3097 self.logger.error(logging_text + "Unknown k8s deployment type {}".
3098 format(kdu.get("k8scluster-type")))
3099 continue
3100 tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
3101
3102 # remove from RO
3103 stage[1] = "Deleting ns from VIM."
3104 if self.ng_ro:
3105 task_delete_ro = asyncio.ensure_future(
3106 self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3107 else:
3108 task_delete_ro = asyncio.ensure_future(
3109 self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3110 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
3111
3112 # rest of staff will be done at finally
3113
3114 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
3115 self.logger.error(logging_text + "Exit Exception {}".format(e))
3116 exc = e
3117 except asyncio.CancelledError:
3118 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
3119 exc = "Operation was cancelled"
3120 except Exception as e:
3121 exc = traceback.format_exc()
3122 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
3123 finally:
3124 if exc:
3125 error_list.append(str(exc))
3126 try:
3127 # wait for pending tasks
3128 if tasks_dict_info:
3129 stage[1] = "Waiting for terminate pending tasks."
3130 self.logger.debug(logging_text + stage[1])
3131 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
3132 stage, nslcmop_id)
3133 stage[1] = stage[2] = ""
3134 except asyncio.CancelledError:
3135 error_list.append("Cancelled")
3136 # TODO cancell all tasks
3137 except Exception as exc:
3138 error_list.append(str(exc))
3139 # update status at database
3140 if error_list:
3141 error_detail = "; ".join(error_list)
3142 # self.logger.error(logging_text + error_detail)
3143 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
3144 error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])
3145
3146 db_nsr_update["operational-status"] = "failed"
3147 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
3148 db_nslcmop_update["detailed-status"] = error_detail
3149 nslcmop_operation_state = "FAILED"
3150 ns_state = "BROKEN"
3151 else:
3152 error_detail = None
3153 error_description_nsr = error_description_nslcmop = None
3154 ns_state = "NOT_INSTANTIATED"
3155 db_nsr_update["operational-status"] = "terminated"
3156 db_nsr_update["detailed-status"] = "Done"
3157 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
3158 db_nslcmop_update["detailed-status"] = "Done"
3159 nslcmop_operation_state = "COMPLETED"
3160
3161 if db_nsr:
3162 self._write_ns_status(
3163 nsr_id=nsr_id,
3164 ns_state=ns_state,
3165 current_operation="IDLE",
3166 current_operation_id=None,
3167 error_description=error_description_nsr,
3168 error_detail=error_detail,
3169 other_update=db_nsr_update
3170 )
3171 self._write_op_status(
3172 op_id=nslcmop_id,
3173 stage="",
3174 error_message=error_description_nslcmop,
3175 operation_state=nslcmop_operation_state,
3176 other_update=db_nslcmop_update,
3177 )
3178 if ns_state == "NOT_INSTANTIATED":
3179 try:
3180 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
3181 except DbException as e:
3182 self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3183 format(nsr_id, e))
3184 if operation_params:
3185 autoremove = operation_params.get("autoremove", False)
3186 if nslcmop_operation_state:
3187 try:
3188 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3189 "operationState": nslcmop_operation_state,
3190 "autoremove": autoremove},
3191 loop=self.loop)
3192 except Exception as e:
3193 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3194
3195 self.logger.debug(logging_text + "Exit")
3196 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3197
3198 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
3199 time_start = time()
3200 error_detail_list = []
3201 error_list = []
3202 pending_tasks = list(created_tasks_info.keys())
3203 num_tasks = len(pending_tasks)
3204 num_done = 0
3205 stage[1] = "{}/{}.".format(num_done, num_tasks)
3206 self._write_op_status(nslcmop_id, stage)
3207 while pending_tasks:
3208 new_error = None
3209 _timeout = timeout + time_start - time()
3210 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
3211 return_when=asyncio.FIRST_COMPLETED)
3212 num_done += len(done)
3213 if not done: # Timeout
3214 for task in pending_tasks:
3215 new_error = created_tasks_info[task] + ": Timeout"
3216 error_detail_list.append(new_error)
3217 error_list.append(new_error)
3218 break
3219 for task in done:
3220 if task.cancelled():
3221 exc = "Cancelled"
3222 else:
3223 exc = task.exception()
3224 if exc:
3225 if isinstance(exc, asyncio.TimeoutError):
3226 exc = "Timeout"
3227 new_error = created_tasks_info[task] + ": {}".format(exc)
3228 error_list.append(created_tasks_info[task])
3229 error_detail_list.append(new_error)
3230 if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
3231 K8sException, NgRoException)):
3232 self.logger.error(logging_text + new_error)
3233 else:
3234 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
3235 self.logger.error(logging_text + created_tasks_info[task] + " " + exc_traceback)
3236 else:
3237 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
3238 stage[1] = "{}/{}.".format(num_done, num_tasks)
3239 if new_error:
3240 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
3241 if nsr_id: # update also nsr
3242 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
3243 "errorDetail": ". ".join(error_detail_list)})
3244 self._write_op_status(nslcmop_id, stage)
3245 return error_detail_list
3246
3247 @staticmethod
3248 def _map_primitive_params(primitive_desc, params, instantiation_params):
3249 """
3250 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3251 The default-value is used. If it is between < > it look for a value at instantiation_params
3252 :param primitive_desc: portion of VNFD/NSD that describes primitive
3253 :param params: Params provided by user
3254 :param instantiation_params: Instantiation params provided by user
3255 :return: a dictionary with the calculated params
3256 """
3257 calculated_params = {}
3258 for parameter in primitive_desc.get("parameter", ()):
3259 param_name = parameter["name"]
3260 if param_name in params:
3261 calculated_params[param_name] = params[param_name]
3262 elif "default-value" in parameter or "value" in parameter:
3263 if "value" in parameter:
3264 calculated_params[param_name] = parameter["value"]
3265 else:
3266 calculated_params[param_name] = parameter["default-value"]
3267 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
3268 and calculated_params[param_name].endswith(">"):
3269 if calculated_params[param_name][1:-1] in instantiation_params:
3270 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
3271 else:
3272 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3273 format(calculated_params[param_name], primitive_desc["name"]))
3274 else:
3275 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3276 format(param_name, primitive_desc["name"]))
3277
3278 if isinstance(calculated_params[param_name], (dict, list, tuple)):
3279 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name],
3280 default_flow_style=True, width=256)
3281 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
3282 calculated_params[param_name] = calculated_params[param_name][7:]
3283 if parameter.get("data-type") == "INTEGER":
3284 try:
3285 calculated_params[param_name] = int(calculated_params[param_name])
3286 except ValueError: # error converting string to int
3287 raise LcmException(
3288 "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
3289 elif parameter.get("data-type") == "BOOLEAN":
3290 calculated_params[param_name] = not ((str(calculated_params[param_name])).lower() == 'false')
3291
3292 # add always ns_config_info if primitive name is config
3293 if primitive_desc["name"] == "config":
3294 if "ns_config_info" in instantiation_params:
3295 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
3296 return calculated_params
3297
3298 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
3299 ee_descriptor_id=None):
3300 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3301 for vca in deployed_vca:
3302 if not vca:
3303 continue
3304 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
3305 continue
3306 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3307 continue
3308 if kdu_name and kdu_name != vca["kdu_name"]:
3309 continue
3310 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
3311 continue
3312 break
3313 else:
3314 # vca_deployed not found
3315 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3316 " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
3317 ee_descriptor_id))
3318 # get ee_id
3319 ee_id = vca.get("ee_id")
3320 vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3321 if not ee_id:
3322 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3323 "execution environment"
3324 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3325 return ee_id, vca_type
3326
3327 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0, retries_interval=30,
3328 timeout=None, vca_type=None, db_dict=None) -> (str, str):
3329 try:
3330 if primitive == "config":
3331 primitive_params = {"params": primitive_params}
3332
3333 vca_type = vca_type or "lxc_proxy_charm"
3334
3335 while retries >= 0:
3336 try:
3337 output = await asyncio.wait_for(
3338 self.vca_map[vca_type].exec_primitive(
3339 ee_id=ee_id,
3340 primitive_name=primitive,
3341 params_dict=primitive_params,
3342 progress_timeout=self.timeout_progress_primitive,
3343 total_timeout=self.timeout_primitive,
3344 db_dict=db_dict),
3345 timeout=timeout or self.timeout_primitive)
3346 # execution was OK
3347 break
3348 except asyncio.CancelledError:
3349 raise
3350 except Exception as e: # asyncio.TimeoutError
3351 if isinstance(e, asyncio.TimeoutError):
3352 e = "Timeout"
3353 retries -= 1
3354 if retries >= 0:
3355 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3356 # wait and retry
3357 await asyncio.sleep(retries_interval, loop=self.loop)
3358 else:
3359 return 'FAILED', str(e)
3360
3361 return 'COMPLETED', output
3362
3363 except (LcmException, asyncio.CancelledError):
3364 raise
3365 except Exception as e:
3366 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3367
3368 async def action(self, nsr_id, nslcmop_id):
3369 # Try to lock HA task here
3370 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3371 if not task_is_locked_by_me:
3372 return
3373
3374 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3375 self.logger.debug(logging_text + "Enter")
3376 # get all needed from database
3377 db_nsr = None
3378 db_nslcmop = None
3379 db_nsr_update = {}
3380 db_nslcmop_update = {}
3381 nslcmop_operation_state = None
3382 error_description_nslcmop = None
3383 exc = None
3384 try:
3385 # wait for any previous tasks in process
3386 step = "Waiting for previous operations to terminate"
3387 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3388
3389 self._write_ns_status(
3390 nsr_id=nsr_id,
3391 ns_state=None,
3392 current_operation="RUNNING ACTION",
3393 current_operation_id=nslcmop_id
3394 )
3395
3396 step = "Getting information from database"
3397 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3398 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3399
3400 nsr_deployed = db_nsr["_admin"].get("deployed")
3401 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3402 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3403 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3404 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3405 primitive = db_nslcmop["operationParams"]["primitive"]
3406 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3407 timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
3408
3409 if vnf_index:
3410 step = "Getting vnfr from database"
3411 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3412 step = "Getting vnfd from database"
3413 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3414 else:
3415 step = "Getting nsd from database"
3416 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3417
3418 # for backward compatibility
3419 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3420 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3421 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3422 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3423
3424 # look for primitive
3425 config_primitive_desc = descriptor_configuration = None
3426 if vdu_id:
3427 descriptor_configuration = get_vdu_configuration(db_vnfd, vdu_id)
3428 elif kdu_name:
3429 descriptor_configuration = get_kdu_configuration(db_vnfd, kdu_name)
3430 elif vnf_index:
3431 descriptor_configuration = get_vnf_configuration(db_vnfd)
3432 else:
3433 descriptor_configuration = db_nsd.get("ns-configuration")
3434
3435 if descriptor_configuration and descriptor_configuration.get("config-primitive"):
3436 for config_primitive in descriptor_configuration["config-primitive"]:
3437 if config_primitive["name"] == primitive:
3438 config_primitive_desc = config_primitive
3439 break
3440
3441 if not config_primitive_desc:
3442 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
3443 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3444 format(primitive))
3445 primitive_name = primitive
3446 ee_descriptor_id = None
3447 else:
3448 primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
3449 ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")
3450
3451 if vnf_index:
3452 if vdu_id:
3453 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3454 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
3455 elif kdu_name:
3456 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3457 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3458 else:
3459 desc_params = parse_yaml_strings(db_vnfr.get("additionalParamsForVnf"))
3460 else:
3461 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
3462 if kdu_name and get_kdu_configuration(db_vnfd, kdu_name):
3463 kdu_configuration = get_kdu_configuration(db_vnfd, kdu_name)
3464 actions = set()
3465 for primitive in kdu_configuration.get("initial-config-primitive", []):
3466 actions.add(primitive["name"])
3467 for primitive in kdu_configuration.get("config-primitive", []):
3468 actions.add(primitive["name"])
3469 kdu_action = True if primitive_name in actions else False
3470
3471 # TODO check if ns is in a proper status
3472 if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
3473 # kdur and desc_params already set from before
3474 if primitive_params:
3475 desc_params.update(primitive_params)
3476 # TODO Check if we will need something at vnf level
3477 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
3478 if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
3479 break
3480 else:
3481 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
3482
3483 if kdu.get("k8scluster-type") not in self.k8scluster_map:
3484 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3485 raise LcmException(msg)
3486
3487 db_dict = {"collection": "nsrs",
3488 "filter": {"_id": nsr_id},
3489 "path": "_admin.deployed.K8s.{}".format(index)}
3490 self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
3491 step = "Executing kdu {}".format(primitive_name)
3492 if primitive_name == "upgrade":
3493 if desc_params.get("kdu_model"):
3494 kdu_model = desc_params.get("kdu_model")
3495 del desc_params["kdu_model"]
3496 else:
3497 kdu_model = kdu.get("kdu-model")
3498 parts = kdu_model.split(sep=":")
3499 if len(parts) == 2:
3500 kdu_model = parts[0]
3501
3502 detailed_status = await asyncio.wait_for(
3503 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3504 cluster_uuid=kdu.get("k8scluster-uuid"),
3505 kdu_instance=kdu.get("kdu-instance"),
3506 atomic=True, kdu_model=kdu_model,
3507 params=desc_params, db_dict=db_dict,
3508 timeout=timeout_ns_action),
3509 timeout=timeout_ns_action + 10)
3510 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
3511 elif primitive_name == "rollback":
3512 detailed_status = await asyncio.wait_for(
3513 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3514 cluster_uuid=kdu.get("k8scluster-uuid"),
3515 kdu_instance=kdu.get("kdu-instance"),
3516 db_dict=db_dict),
3517 timeout=timeout_ns_action)
3518 elif primitive_name == "status":
3519 detailed_status = await asyncio.wait_for(
3520 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3521 cluster_uuid=kdu.get("k8scluster-uuid"),
3522 kdu_instance=kdu.get("kdu-instance")),
3523 timeout=timeout_ns_action)
3524 else:
3525 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
3526 params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
3527
3528 detailed_status = await asyncio.wait_for(
3529 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
3530 cluster_uuid=kdu.get("k8scluster-uuid"),
3531 kdu_instance=kdu_instance,
3532 primitive_name=primitive_name,
3533 params=params, db_dict=db_dict,
3534 timeout=timeout_ns_action),
3535 timeout=timeout_ns_action)
3536
3537 if detailed_status:
3538 nslcmop_operation_state = 'COMPLETED'
3539 else:
3540 detailed_status = ''
3541 nslcmop_operation_state = 'FAILED'
3542 else:
3543 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"], member_vnf_index=vnf_index,
3544 vdu_id=vdu_id, vdu_count_index=vdu_count_index,
3545 ee_descriptor_id=ee_descriptor_id)
3546 db_nslcmop_notif = {"collection": "nslcmops",
3547 "filter": {"_id": nslcmop_id},
3548 "path": "admin.VCA"}
3549 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
3550 ee_id,
3551 primitive=primitive_name,
3552 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
3553 timeout=timeout_ns_action,
3554 vca_type=vca_type,
3555 db_dict=db_nslcmop_notif)
3556
3557 db_nslcmop_update["detailed-status"] = detailed_status
3558 error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
3559 self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
3560 detailed_status))
3561 return # database update is called inside finally
3562
3563 except (DbException, LcmException, N2VCException, K8sException) as e:
3564 self.logger.error(logging_text + "Exit Exception {}".format(e))
3565 exc = e
3566 except asyncio.CancelledError:
3567 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3568 exc = "Operation was cancelled"
3569 except asyncio.TimeoutError:
3570 self.logger.error(logging_text + "Timeout while '{}'".format(step))
3571 exc = "Timeout"
3572 except Exception as e:
3573 exc = traceback.format_exc()
3574 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3575 finally:
3576 if exc:
3577 db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
3578 "FAILED {}: {}".format(step, exc)
3579 nslcmop_operation_state = "FAILED"
3580 if db_nsr:
3581 self._write_ns_status(
3582 nsr_id=nsr_id,
3583 ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
3584 current_operation="IDLE",
3585 current_operation_id=None,
3586 # error_description=error_description_nsr,
3587 # error_detail=error_detail,
3588 other_update=db_nsr_update
3589 )
3590
3591 self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
3592 operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
3593
3594 if nslcmop_operation_state:
3595 try:
3596 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3597 "operationState": nslcmop_operation_state},
3598 loop=self.loop)
3599 except Exception as e:
3600 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3601 self.logger.debug(logging_text + "Exit")
3602 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3603 return nslcmop_operation_state, detailed_status
3604
3605 async def scale(self, nsr_id, nslcmop_id):
3606 # Try to lock HA task here
3607 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3608 if not task_is_locked_by_me:
3609 return
3610
3611 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3612 stage = ['', '', '']
3613 # ^ stage, step, VIM progress
3614 self.logger.debug(logging_text + "Enter")
3615 # get all needed from database
3616 db_nsr = None
3617 db_nslcmop_update = {}
3618 db_nsr_update = {}
3619 exc = None
3620 # in case of error, indicates what part of scale was failed to put nsr at error status
3621 scale_process = None
3622 old_operational_status = ""
3623 old_config_status = ""
3624 try:
3625 # wait for any previous tasks in process
3626 step = "Waiting for previous operations to terminate"
3627 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3628 self._write_ns_status(nsr_id=nsr_id, ns_state=None,
3629 current_operation="SCALING", current_operation_id=nslcmop_id)
3630
3631 step = "Getting nslcmop from database"
3632 self.logger.debug(step + " after having waited for previous tasks to be completed")
3633 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3634
3635 step = "Getting nsr from database"
3636 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3637 old_operational_status = db_nsr["operational-status"]
3638 old_config_status = db_nsr["config-status"]
3639
3640 step = "Parsing scaling parameters"
3641 db_nsr_update["operational-status"] = "scaling"
3642 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3643 nsr_deployed = db_nsr["_admin"].get("deployed")
3644
3645 #######
3646 nsr_deployed = db_nsr["_admin"].get("deployed")
3647 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3648 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3649 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3650 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3651 #######
3652
3653 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3654 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3655 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3656 # for backward compatibility
3657 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3658 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3659 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3660 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3661
3662 step = "Getting vnfr from database"
3663 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3664
3665 step = "Getting vnfd from database"
3666 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3667
3668 step = "Getting scaling-group-descriptor"
3669 scaling_descriptor = find_in_list(
3670 get_scaling_aspect(
3671 db_vnfd
3672 ),
3673 lambda scale_desc: scale_desc["name"] == scaling_group
3674 )
3675 if not scaling_descriptor:
3676 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3677 "at vnfd:scaling-group-descriptor".format(scaling_group))
3678
3679 step = "Sending scale order to VIM"
3680 # TODO check if ns is in a proper status
3681 nb_scale_op = 0
3682 if not db_nsr["_admin"].get("scaling-group"):
3683 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3684 admin_scale_index = 0
3685 else:
3686 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3687 if admin_scale_info["name"] == scaling_group:
3688 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3689 break
3690 else: # not found, set index one plus last element and add new entry with the name
3691 admin_scale_index += 1
3692 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3693 RO_scaling_info = []
3694 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3695 if scaling_type == "SCALE_OUT":
3696 if "aspect-delta-details" not in scaling_descriptor:
3697 raise LcmException(
3698 "Aspect delta details not fount in scaling descriptor {}".format(
3699 scaling_descriptor["name"]
3700 )
3701 )
3702 # count if max-instance-count is reached
3703 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3704
3705 vdu_scaling_info["scaling_direction"] = "OUT"
3706 vdu_scaling_info["vdu-create"] = {}
3707 for delta in deltas:
3708 for vdu_delta in delta["vdu-delta"]:
3709 vdud = get_vdu(db_vnfd, vdu_delta["id"])
3710 vdu_index = get_vdu_index(db_vnfr, vdu_delta["id"])
3711 cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
3712 if cloud_init_text:
3713 additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
3714 cloud_init_list = []
3715
3716 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3717 max_instance_count = 10
3718 if vdu_profile and "max-number-of-instances" in vdu_profile:
3719 max_instance_count = vdu_profile.get("max-number-of-instances", 10)
3720
3721 deafult_instance_num = get_number_of_instances(db_vnfd, vdud["id"])
3722
3723 nb_scale_op += vdu_delta.get("number-of-instances", 1)
3724
3725 if nb_scale_op + deafult_instance_num > max_instance_count:
3726 raise LcmException(
3727 "reached the limit of {} (max-instance-count) "
3728 "scaling-out operations for the "
3729 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3730 )
3731 for x in range(vdu_delta.get("number-of-instances", 1)):
3732 if cloud_init_text:
3733 # TODO Information of its own ip is not available because db_vnfr is not updated.
3734 additional_params["OSM"] = get_osm_params(
3735 db_vnfr,
3736 vdu_delta["id"],
3737 vdu_index + x
3738 )
3739 cloud_init_list.append(
3740 self._parse_cloud_init(
3741 cloud_init_text,
3742 additional_params,
3743 db_vnfd["id"],
3744 vdud["id"]
3745 )
3746 )
3747 RO_scaling_info.append(
3748 {
3749 "osm_vdu_id": vdu_delta["id"],
3750 "member-vnf-index": vnf_index,
3751 "type": "create",
3752 "count": vdu_delta.get("number-of-instances", 1)
3753 }
3754 )
3755 if cloud_init_list:
3756 RO_scaling_info[-1]["cloud_init"] = cloud_init_list
3757 vdu_scaling_info["vdu-create"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3758
3759 elif scaling_type == "SCALE_IN":
3760 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3761 min_instance_count = int(scaling_descriptor["min-instance-count"])
3762
3763 vdu_scaling_info["scaling_direction"] = "IN"
3764 vdu_scaling_info["vdu-delete"] = {}
3765 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3766 for delta in deltas:
3767 for vdu_delta in delta["vdu-delta"]:
3768 min_instance_count = 0
3769 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3770 if vdu_profile and "min-number-of-instances" in vdu_profile:
3771 min_instance_count = vdu_profile["min-number-of-instances"]
3772
3773 deafult_instance_num = get_number_of_instances(db_vnfd, vdu_delta["id"])
3774
3775 nb_scale_op -= vdu_delta.get("number-of-instances", 1)
3776 if nb_scale_op + deafult_instance_num < min_instance_count:
3777 raise LcmException(
3778 "reached the limit of {} (min-instance-count) scaling-in operations for the "
3779 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3780 )
3781 RO_scaling_info.append({"osm_vdu_id": vdu_delta["id"], "member-vnf-index": vnf_index,
3782 "type": "delete", "count": vdu_delta.get("number-of-instances", 1)})
3783 vdu_scaling_info["vdu-delete"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3784
3785 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3786 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3787 if vdu_scaling_info["scaling_direction"] == "IN":
3788 for vdur in reversed(db_vnfr["vdur"]):
3789 if vdu_delete.get(vdur["vdu-id-ref"]):
3790 vdu_delete[vdur["vdu-id-ref"]] -= 1
3791 vdu_scaling_info["vdu"].append({
3792 "name": vdur.get("name") or vdur.get("vdu-name"),
3793 "vdu_id": vdur["vdu-id-ref"],
3794 "interface": []
3795 })
3796 for interface in vdur["interfaces"]:
3797 vdu_scaling_info["vdu"][-1]["interface"].append({
3798 "name": interface["name"],
3799 "ip_address": interface["ip-address"],
3800 "mac_address": interface.get("mac-address"),
3801 })
3802 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
3803
3804 # PRE-SCALE BEGIN
3805 step = "Executing pre-scale vnf-config-primitive"
3806 if scaling_descriptor.get("scaling-config-action"):
3807 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3808 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3809 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3810 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3811 step = db_nslcmop_update["detailed-status"] = \
3812 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3813
3814 # look for primitive
3815 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3816 if config_primitive["name"] == vnf_config_primitive:
3817 break
3818 else:
3819 raise LcmException(
3820 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3821 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3822 "primitive".format(scaling_group, vnf_config_primitive))
3823
3824 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3825 if db_vnfr.get("additionalParamsForVnf"):
3826 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3827
3828 scale_process = "VCA"
3829 db_nsr_update["config-status"] = "configuring pre-scaling"
3830 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3831
3832 # Pre-scale retry check: Check if this sub-operation has been executed before
3833 op_index = self._check_or_add_scale_suboperation(
3834 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
3835 if op_index == self.SUBOPERATION_STATUS_SKIP:
3836 # Skip sub-operation
3837 result = 'COMPLETED'
3838 result_detail = 'Done'
3839 self.logger.debug(logging_text +
3840 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3841 vnf_config_primitive, result, result_detail))
3842 else:
3843 if op_index == self.SUBOPERATION_STATUS_NEW:
3844 # New sub-operation: Get index of this sub-operation
3845 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3846 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3847 format(vnf_config_primitive))
3848 else:
3849 # retry: Get registered params for this existing sub-operation
3850 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3851 vnf_index = op.get('member_vnf_index')
3852 vnf_config_primitive = op.get('primitive')
3853 primitive_params = op.get('primitive_params')
3854 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
3855 format(vnf_config_primitive))
3856 # Execute the primitive, either with new (first-time) or registered (reintent) args
3857 ee_descriptor_id = config_primitive.get("execution-environment-ref")
3858 primitive_name = config_primitive.get("execution-environment-primitive",
3859 vnf_config_primitive)
3860 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
3861 member_vnf_index=vnf_index,
3862 vdu_id=None,
3863 vdu_count_index=None,
3864 ee_descriptor_id=ee_descriptor_id)
3865 result, result_detail = await self._ns_execute_primitive(
3866 ee_id, primitive_name, primitive_params, vca_type)
3867 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3868 vnf_config_primitive, result, result_detail))
3869 # Update operationState = COMPLETED | FAILED
3870 self._update_suboperation_status(
3871 db_nslcmop, op_index, result, result_detail)
3872
3873 if result == "FAILED":
3874 raise LcmException(result_detail)
3875 db_nsr_update["config-status"] = old_config_status
3876 scale_process = None
3877 # PRE-SCALE END
3878
3879 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3880 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3881
3882 # SCALE RO - BEGIN
3883 if RO_scaling_info:
3884 scale_process = "RO"
3885 if self.ro_config.get("ng"):
3886 await self._scale_ng_ro(logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage)
3887 vdu_scaling_info.pop("vdu-create", None)
3888 vdu_scaling_info.pop("vdu-delete", None)
3889
3890 scale_process = None
3891 if db_nsr_update:
3892 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3893
3894 # POST-SCALE BEGIN
3895 # execute primitive service POST-SCALING
3896 step = "Executing post-scale vnf-config-primitive"
3897 if scaling_descriptor.get("scaling-config-action"):
3898 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3899 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
3900 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
3901 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3902 step = db_nslcmop_update["detailed-status"] = \
3903 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
3904
3905 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3906 if db_vnfr.get("additionalParamsForVnf"):
3907 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3908
3909 # look for primitive
3910 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3911 if config_primitive["name"] == vnf_config_primitive:
3912 break
3913 else:
3914 raise LcmException(
3915 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
3916 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
3917 "config-primitive".format(scaling_group, vnf_config_primitive))
3918 scale_process = "VCA"
3919 db_nsr_update["config-status"] = "configuring post-scaling"
3920 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3921
3922 # Post-scale retry check: Check if this sub-operation has been executed before
3923 op_index = self._check_or_add_scale_suboperation(
3924 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
3925 if op_index == self.SUBOPERATION_STATUS_SKIP:
3926 # Skip sub-operation
3927 result = 'COMPLETED'
3928 result_detail = 'Done'
3929 self.logger.debug(logging_text +
3930 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3931 format(vnf_config_primitive, result, result_detail))
3932 else:
3933 if op_index == self.SUBOPERATION_STATUS_NEW:
3934 # New sub-operation: Get index of this sub-operation
3935 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3936 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3937 format(vnf_config_primitive))
3938 else:
3939 # retry: Get registered params for this existing sub-operation
3940 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3941 vnf_index = op.get('member_vnf_index')
3942 vnf_config_primitive = op.get('primitive')
3943 primitive_params = op.get('primitive_params')
3944 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
3945 format(vnf_config_primitive))
3946 # Execute the primitive, either with new (first-time) or registered (reintent) args
3947 ee_descriptor_id = config_primitive.get("execution-environment-ref")
3948 primitive_name = config_primitive.get("execution-environment-primitive",
3949 vnf_config_primitive)
3950 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
3951 member_vnf_index=vnf_index,
3952 vdu_id=None,
3953 vdu_count_index=None,
3954 ee_descriptor_id=ee_descriptor_id)
3955 result, result_detail = await self._ns_execute_primitive(
3956 ee_id, primitive_name, primitive_params, vca_type)
3957 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3958 vnf_config_primitive, result, result_detail))
3959 # Update operationState = COMPLETED | FAILED
3960 self._update_suboperation_status(
3961 db_nslcmop, op_index, result, result_detail)
3962
3963 if result == "FAILED":
3964 raise LcmException(result_detail)
3965 db_nsr_update["config-status"] = old_config_status
3966 scale_process = None
3967 # POST-SCALE END
3968
3969 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3970 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
3971 else old_operational_status
3972 db_nsr_update["config-status"] = old_config_status
3973 return
3974 except (ROclient.ROClientException, DbException, LcmException, NgRoException) as e:
3975 self.logger.error(logging_text + "Exit Exception {}".format(e))
3976 exc = e
3977 except asyncio.CancelledError:
3978 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3979 exc = "Operation was cancelled"
3980 except Exception as e:
3981 exc = traceback.format_exc()
3982 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3983 finally:
3984 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE", current_operation_id=None)
3985 if exc:
3986 db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
3987 nslcmop_operation_state = "FAILED"
3988 if db_nsr:
3989 db_nsr_update["operational-status"] = old_operational_status
3990 db_nsr_update["config-status"] = old_config_status
3991 db_nsr_update["detailed-status"] = ""
3992 if scale_process:
3993 if "VCA" in scale_process:
3994 db_nsr_update["config-status"] = "failed"
3995 if "RO" in scale_process:
3996 db_nsr_update["operational-status"] = "failed"
3997 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
3998 exc)
3999 else:
4000 error_description_nslcmop = None
4001 nslcmop_operation_state = "COMPLETED"
4002 db_nslcmop_update["detailed-status"] = "Done"
4003
4004 self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
4005 operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
4006 if db_nsr:
4007 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE",
4008 current_operation_id=None, other_update=db_nsr_update)
4009
4010 if nslcmop_operation_state:
4011 try:
4012 msg = {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, "operationState": nslcmop_operation_state}
4013 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
4014 except Exception as e:
4015 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
4016 self.logger.debug(logging_text + "Exit")
4017 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
4018
4019 async def _scale_ng_ro(self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage):
4020 nsr_id = db_nslcmop["nsInstanceId"]
4021 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4022 db_vnfrs = {}
4023
4024 # read from db: vnfd's for every vnf
4025 db_vnfds = []
4026
4027 # for each vnf in ns, read vnfd
4028 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
4029 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
4030 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
4031 # if we haven't this vnfd, read it from db
4032 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
4033 # read from db
4034 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4035 db_vnfds.append(vnfd)
4036 n2vc_key = self.n2vc.get_public_key()
4037 n2vc_key_list = [n2vc_key]
4038 self.scale_vnfr(db_vnfr, vdu_scaling_info.get("vdu-create"), vdu_scaling_info.get("vdu-delete"),
4039 mark_delete=True)
4040 # db_vnfr has been updated, update db_vnfrs to use it
4041 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
4042 await self._instantiate_ng_ro(logging_text, nsr_id, db_nsd, db_nsr, db_nslcmop, db_vnfrs,
4043 db_vnfds, n2vc_key_list, stage=stage, start_deploy=time(),
4044 timeout_ns_deploy=self.timeout_ns_deploy)
4045 if vdu_scaling_info.get("vdu-delete"):
4046 self.scale_vnfr(db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False)
4047
4048 async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
4049 if not self.prometheus:
4050 return
4051 # look if exist a file called 'prometheus*.j2' and
4052 artifact_content = self.fs.dir_ls(artifact_path)
4053 job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
4054 if not job_file:
4055 return
4056 with self.fs.file_open((artifact_path, job_file), "r") as f:
4057 job_data = f.read()
4058
4059 # TODO get_service
4060 _, _, service = ee_id.partition(".") # remove prefix "namespace."
4061 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
4062 host_port = "80"
4063 vnfr_id = vnfr_id.replace("-", "")
4064 variables = {
4065 "JOB_NAME": vnfr_id,
4066 "TARGET_IP": target_ip,
4067 "EXPORTER_POD_IP": host_name,
4068 "EXPORTER_POD_PORT": host_port,
4069 }
4070 job_list = self.prometheus.parse_job(job_data, variables)
4071 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
4072 for job in job_list:
4073 if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
4074 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
4075 job["nsr_id"] = nsr_id
4076 job_dict = {jl["job_name"]: jl for jl in job_list}
4077 if await self.prometheus.update(job_dict):
4078 return list(job_dict.keys())
4079
4080 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4081 """
4082 Get VCA Cloud and VCA Cloud Credentials for the VIM account
4083
4084 :param: vim_account_id: VIM Account ID
4085
4086 :return: (cloud_name, cloud_credential)
4087 """
4088 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
4089 return config.get("vca_cloud"), config.get("vca_cloud_credential")
4090
4091 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4092 """
4093 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
4094
4095 :param: vim_account_id: VIM Account ID
4096
4097 :return: (cloud_name, cloud_credential)
4098 """
4099 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
4100 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")