fix(configurations): LCM adapted for new configuration container in IM
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from osm_lcm.data_utils.nsd import get_vnf_profiles
31 from osm_lcm.data_utils.vnfd import get_vdu_list, get_vdu_profile, \
32 get_ee_sorted_initial_config_primitive_list, get_ee_sorted_terminate_config_primitive_list, \
33 get_kdu_list, get_virtual_link_profiles, get_vdu, get_configuration, \
34 get_vdu_index, get_scaling_aspect, get_number_of_instances
35 from osm_lcm.data_utils.list_utils import find_in_list
36 from osm_lcm.data_utils.vnfr import get_osm_params
37 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
38 from osm_lcm.data_utils.database.vim_account import VimAccountDB
39 from n2vc.k8s_helm_conn import K8sHelmConnector
40 from n2vc.k8s_helm3_conn import K8sHelm3Connector
41 from n2vc.k8s_juju_conn import K8sJujuConnector
42
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45
46 from osm_lcm.data_utils.database.database import Database
47 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
48
49 from n2vc.n2vc_juju_conn import N2VCJujuConnector
50 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
51
52 from osm_lcm.lcm_helm_conn import LCMHelmConn
53
54 from copy import copy, deepcopy
55 from time import time
56 from uuid import uuid4
57
58 from random import randint
59
60 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
61
62
class NsLcm(LcmBase):
    """LcmBase subclass grouping the NS operations defined in this module.

    The class attributes below are shared defaults used by the methods:
    timeouts (expressions such as ``5 * 60`` suggest seconds -- TODO confirm)
    and negative codes that presumably flag the outcome of a sub-operation
    lookup (verify against the code that consumes them).
    """
    timeout_vca_on_error = 5 * 60   # Time for charm from first time at blocked,error status to mark as failed
    timeout_ns_deploy = 2 * 3600   # default global timeout for deployment a ns
    timeout_ns_terminate = 1800   # default global timeout for un deployment a ns
    timeout_charm_delete = 10 * 60
    timeout_primitive = 30 * 60  # timeout for primitive execution
    timeout_progress_primitive = 10 * 60  # timeout for some progress in a primitive execution

    # Negative codes, so they cannot be mistaken for a non-negative value
    # (presumably a list index -- TODO confirm against the callers)
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    task_name_deploy_vca = "Deploying VCA"
75
def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
    """
    Store the shared handles and build the VCA, K8s and RO connectors
    :param msg: passed through to the LcmBase constructor
    :param lcm_tasks: kept as self.lcm_tasks
    :param config: two level dictionary with configuration. The keys 'timeout', 'ro_config' and 'VCA' are read
    :param loop: loop object handed to the connectors and to the RO client
    :param prometheus: kept as self.prometheus
    :return: None
    """
    super().__init__(msg=msg, logger=logging.getLogger('lcm.ns'))

    database = Database().instance.db
    filesystem = Filesystem().instance.fs
    self.db = database
    self.fs = filesystem
    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.timeout = config["timeout"]
    ro_config = config["ro_config"]
    self.ro_config = ro_config
    self.ng_ro = ro_config.get("ng")
    vca_config = config["VCA"].copy()
    self.vca_config = vca_config

    kubectl_command = vca_config.get("kubectlpath")

    # N2VC (juju) connector
    juju_connector = N2VCJujuConnector(
        log=self.logger,
        loop=loop,
        url='{}:{}'.format(vca_config['host'], vca_config['port']),
        username=vca_config.get('user', None),
        vca_config=vca_config,
        on_update_db=self._on_update_n2vc_db,
        fs=filesystem,
        db=database,
    )
    self.n2vc = juju_connector

    # helm execution environment connector
    helm_ee_connector = LCMHelmConn(
        log=self.logger,
        loop=loop,
        url=None,
        username=None,
        vca_config=vca_config,
        on_update_db=self._on_update_n2vc_db,
    )
    self.conn_helm_ee = helm_ee_connector

    # K8s cluster connectors: helm v2, helm v3 and juju
    helm2_cluster = K8sHelmConnector(
        kubectl_command=kubectl_command,
        helm_command=vca_config.get("helmpath"),
        log=self.logger,
        on_update_db=None,
        fs=filesystem,
        db=database,
    )
    self.k8sclusterhelm2 = helm2_cluster

    helm3_cluster = K8sHelm3Connector(
        kubectl_command=kubectl_command,
        helm_command=vca_config.get("helm3path"),
        fs=filesystem,
        log=self.logger,
        db=database,
        on_update_db=None,
    )
    self.k8sclusterhelm3 = helm3_cluster

    juju_cluster = K8sJujuConnector(
        kubectl_command=kubectl_command,
        juju_command=vca_config.get("jujupath"),
        log=self.logger,
        loop=loop,
        on_update_db=None,
        vca_config=vca_config,
        fs=filesystem,
        db=database,
    )
    self.k8sclusterjuju = juju_cluster

    # lookup tables: deployment type name -> connector
    self.k8scluster_map = {
        "helm-chart": helm2_cluster,
        "helm-chart-v3": helm3_cluster,
        "chart": helm3_cluster,
        "juju-bundle": juju_cluster,
        "juju": juju_cluster,
    }

    self.vca_map = {
        "lxc_proxy_charm": juju_connector,
        "native_charm": juju_connector,
        "k8s_proxy_charm": juju_connector,
        "helm": helm_ee_connector,
        "helm-v3": helm_ee_connector,
    }

    self.prometheus = prometheus

    # RO client
    self.RO = NgRoClient(self.loop, **self.ro_config)
166
@staticmethod
def increment_ip_mac(ip_mac, vm_index=1):
    """
    Add vm_index to the last group of an IPv4, IPv6 or MAC address string
    :param ip_mac: address as text. Any non-str value is returned untouched
    :param vm_index: amount to add to the last group
    :return: the new address text, or None when the text cannot be parsed
    """
    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # ipv4: the last group follows the last dot and is decimal
        dot_position = ip_mac.rfind(".")
        if dot_position > 0:
            head = ip_mac[:dot_position + 1]
            last_group = ip_mac[dot_position + 1:]
            return head + str(int(last_group) + vm_index)
        # ipv6 or mac: the last group follows the last colon and is hexadecimal
        colon_position = ip_mac.rfind(":")
        if colon_position > 0:
            head = ip_mac[:colon_position + 1]
            last_group = ip_mac[colon_position + 1:]
            # keep the original width: 2 digits for mac, 4 for ipv6
            hex_spec = "0" + str(len(last_group)) + "x"
            return head + format(int(last_group, 16) + vm_index, hex_spec)
    except Exception:
        pass
    return None
186
187 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
188
189 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
190
191 try:
192 # TODO filter RO descriptor fields...
193
194 # write to database
195 db_dict = dict()
196 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
197 db_dict['deploymentStatus'] = ro_descriptor
198 self.update_db_2("nsrs", nsrs_id, db_dict)
199
200 except Exception as e:
201 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
202
203 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
204
205 # remove last dot from path (if exists)
206 if path.endswith('.'):
207 path = path[:-1]
208
209 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
210 # .format(table, filter, path, updated_data))
211
212 try:
213
214 nsr_id = filter.get('_id')
215
216 # read ns record from database
217 nsr = self.db.get_one(table='nsrs', q_filter=filter)
218 current_ns_status = nsr.get('nsState')
219
220 # get vca status for NS
221 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
222
223 # vcaStatus
224 db_dict = dict()
225 db_dict['vcaStatus'] = status_dict
226
227 # update configurationStatus for this VCA
228 try:
229 vca_index = int(path[path.rfind(".")+1:])
230
231 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
232 vca_status = vca_list[vca_index].get('status')
233
234 configuration_status_list = nsr.get('configurationStatus')
235 config_status = configuration_status_list[vca_index].get('status')
236
237 if config_status == 'BROKEN' and vca_status != 'failed':
238 db_dict['configurationStatus'][vca_index] = 'READY'
239 elif config_status != 'BROKEN' and vca_status == 'failed':
240 db_dict['configurationStatus'][vca_index] = 'BROKEN'
241 except Exception as e:
242 # not update configurationStatus
243 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
244
245 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
246 # if nsState = 'DEGRADED' check if all is OK
247 is_degraded = False
248 if current_ns_status in ('READY', 'DEGRADED'):
249 error_description = ''
250 # check machines
251 if status_dict.get('machines'):
252 for machine_id in status_dict.get('machines'):
253 machine = status_dict.get('machines').get(machine_id)
254 # check machine agent-status
255 if machine.get('agent-status'):
256 s = machine.get('agent-status').get('status')
257 if s != 'started':
258 is_degraded = True
259 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
260 # check machine instance status
261 if machine.get('instance-status'):
262 s = machine.get('instance-status').get('status')
263 if s != 'running':
264 is_degraded = True
265 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
266 # check applications
267 if status_dict.get('applications'):
268 for app_id in status_dict.get('applications'):
269 app = status_dict.get('applications').get(app_id)
270 # check application status
271 if app.get('status'):
272 s = app.get('status').get('status')
273 if s != 'active':
274 is_degraded = True
275 error_description += 'application {} status={} ; '.format(app_id, s)
276
277 if error_description:
278 db_dict['errorDescription'] = error_description
279 if current_ns_status == 'READY' and is_degraded:
280 db_dict['nsState'] = 'DEGRADED'
281 if current_ns_status == 'DEGRADED' and not is_degraded:
282 db_dict['nsState'] = 'READY'
283
284 # write to database
285 self.update_db_2("nsrs", nsr_id, db_dict)
286
287 except (asyncio.CancelledError, asyncio.TimeoutError):
288 raise
289 except Exception as e:
290 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
291
@staticmethod
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """
    Render a cloud-init text as a Jinja2 template in strict-undefined mode
    :param cloud_init_text: template text
    :param additional_params: variables for the template. None or empty means no variables
    :param vnfd_id: vnfd id, only used in error messages
    :param vdu_id: vdu id, only used in error messages
    :return: rendered text. LcmException is raised on undefined variables or template errors
    """
    variables = additional_params if additional_params else {}
    try:
        strict_env = Environment(undefined=StrictUndefined)
        return strict_env.from_string(cloud_init_text).render(variables)
    except UndefinedError as undefined_error:
        message = ("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                   "file, must be provided in the instantiation parameters inside the "
                   "'additionalParamsForVnf/Vdu' block").format(undefined_error, vnfd_id, vdu_id)
        raise LcmException(message)
    except (TemplateError, TemplateNotFound) as template_error:
        message = "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
            vnfd_id, vdu_id, template_error)
        raise LcmException(message)
305
306 def _get_vdu_cloud_init_content(self, vdu, vnfd):
307 cloud_init_content = cloud_init_file = None
308 try:
309 if vdu.get("cloud-init-file"):
310 base_folder = vnfd["_admin"]["storage"]
311 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
312 vdu["cloud-init-file"])
313 with self.fs.file_open(cloud_init_file, "r") as ci_file:
314 cloud_init_content = ci_file.read()
315 elif vdu.get("cloud-init"):
316 cloud_init_content = vdu["cloud-init"]
317
318 return cloud_init_content
319 except FsException as e:
320 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
321 format(vnfd["id"], vdu["id"], cloud_init_file, e))
322
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """
    Get the additionalParams of the first vdur of the vnfr that references vdu_id
    :param db_vnfr: vnfr content; its 'vdur' list is searched
    :param vdu_id: value compared with the 'vdu-id-ref' of every vdur
    :return: result of parse_yaml_strings over the additionalParams of that vdur.
        StopIteration is raised when no vdur matches
    """
    matching_vdurs = (candidate for candidate in db_vnfr.get("vdur") if vdu_id == candidate["vdu-id-ref"])
    first_match = next(matching_vdurs)
    return parse_yaml_strings(first_match.get("additionalParams"))
327
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Build a vnfd descriptor for RO from an OSM IM vnfd. The input is not modified
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :param additionalParams: not used by this method
    :param nsrId: not used by this method
    :return: deep copy of vnfd without internal, monitoring, scaling, kdu, k8s and cloud-init keys
    """
    vnfd_RO = deepcopy(vnfd)
    # keys that RO does not use
    for unused_key in ("_id", "_admin", "monitoring-param", "scaling-group-descriptor", "kdu", "k8s-cluster"):
        vnfd_RO.pop(unused_key, None)
    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init information is dropped from every vdu of the copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        for cloud_init_key in ("cloud-init-file", "cloud-init"):
            vdu.pop(cloud_init_key, None)
    return vnfd_RO
353
@staticmethod
def ip_profile_2_RO(ip_profile):
    """
    Translate an ip-profile to the key names and values used towards RO
    :param ip_profile: dictionary with the ip-profile. It is not modified
    :return: deep copy where 'dns-server' becomes 'dns-address', 'dhcp-params' becomes 'dhcp' and
        an 'ip-version' of 'ipv4'/'ipv6' becomes 'IPv4'/'IPv6'
    """
    RO_ip_profile = deepcopy(ip_profile)

    if "dns-server" in RO_ip_profile:
        dns_server = RO_ip_profile["dns-server"]
        if isinstance(dns_server, list):
            # list of {'address': ...} entries => plain list of addresses
            RO_ip_profile["dns-address"] = []
            del RO_ip_profile["dns-server"]
            for server in dns_server:
                RO_ip_profile["dns-address"].append(server['address'])
        else:
            RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")

    ip_version = RO_ip_profile.get("ip-version")
    if ip_version == "ipv4":
        RO_ip_profile["ip-version"] = "IPv4"
    elif ip_version == "ipv6":
        RO_ip_profile["ip-version"] = "IPv6"

    if "dhcp-params" in RO_ip_profile:
        RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
    return RO_ip_profile
371
372 def _get_ro_vim_id_for_vim_account(self, vim_account):
373 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
374 if db_vim["_admin"]["operationalState"] != "ENABLED":
375 raise LcmException("VIM={} is not available. operationalState={}".format(
376 vim_account, db_vim["_admin"]["operationalState"]))
377 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
378 return RO_vim_id
379
def get_ro_wim_id_for_wim_account(self, wim_account):
    """
    Get the RO identifier stored in a wim account record
    :param wim_account: _id of the wim_accounts record. Any non-str value is returned as it is
    :return: content of _admin.deployed.RO-account. LcmException if operationalState is not ENABLED
    """
    if not isinstance(wim_account, str):
        return wim_account

    wim_admin = self.db.get_one("wim_accounts", {"_id": wim_account})["_admin"]
    operational_state = wim_admin["operationalState"]
    if operational_state != "ENABLED":
        raise LcmException("WIM={} is not available. operationalState={}".format(
            wim_account, operational_state))
    return wim_admin["deployed"]["RO-account"]
390
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
    """
    Add and/or remove vdur entries of a vnfr in the database and refresh db_vnfr["vdur"]
    :param db_vnfr: vnfr content. Its 'vdur' list is replaced at the end by the one read back from database
    :param vdu_create: dictionary {vdu-id: number of instances to add}. Every new vdur is a copy of the
        last existing vdur with that vdu-id
    :param vdu_delete: dictionary {vdu-id: number of instances to remove}, counted from the last one
    :param mark_delete: if True the vdur are only set to status DELETING, otherwise they are pulled
        from the database
    :return: None. LcmException if an instance must be added and no vdur with that vdu-id exists
    """
    db_vdu_push_list = []
    db_update = {"_admin.modified": time()}
    if vdu_create:
        for vdu_id, vdu_count in vdu_create.items():
            vdur = next((vdur for vdur in reversed(db_vnfr["vdur"]) if vdur["vdu-id-ref"] == vdu_id), None)
            if not vdur:
                raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
                                   format(vdu_id))

            for count in range(vdu_count):
                vdur_copy = deepcopy(vdur)
                vdur_copy["status"] = "BUILD"
                vdur_copy["status-detailed"] = None
                # a real assignment: the new instance must not inherit the address of the copied one
                vdur_copy["ip-address"] = None
                vdur_copy["_id"] = str(uuid4())
                vdur_copy["count-index"] += count + 1
                vdur_copy["id"] = "{}-{}".format(vdur_copy["vdu-id-ref"], vdur_copy["count-index"])
                vdur_copy.pop("vim_info", None)
                for iface in vdur_copy["interfaces"]:
                    if iface.get("fixed-ip"):
                        iface["ip-address"] = self.increment_ip_mac(iface["ip-address"], count+1)
                    else:
                        iface.pop("ip-address", None)
                    if iface.get("fixed-mac"):
                        iface["mac-address"] = self.increment_ip_mac(iface["mac-address"], count+1)
                    else:
                        iface.pop("mac-address", None)
                    # only first vdu can be management of vnf
                    # NOTE(review): other code of this class reads the key as "mgmt-vnf" -- confirm the spelling
                    iface.pop("mgmt_vnf", None)
                db_vdu_push_list.append(vdur_copy)
    if vdu_delete:
        for vdu_id, vdu_count in vdu_delete.items():
            if mark_delete:
                indexes_to_delete = [index for index, vdur in enumerate(db_vnfr["vdur"])
                                     if vdur["vdu-id-ref"] == vdu_id]
                # a plain [-0:] slice would select every instance, so a count of 0 is handled apart
                last_indexes = indexes_to_delete[-vdu_count:] if vdu_count > 0 else []
                db_update.update({"vdur.{}.status".format(i): "DELETING" for i in last_indexes})
            else:
                # it must be deleted one by one because common.db does not allow otherwise
                vdus_to_delete = [v for v in reversed(db_vnfr["vdur"]) if v["vdu-id-ref"] == vdu_id]
                for vdu in vdus_to_delete[:vdu_count]:
                    self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, None, pull={"vdur": {"_id": vdu["_id"]}})
    db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
    self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
    # modify passed dictionary db_vnfr
    db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
    db_vnfr["vdur"] = db_vnfr_["vdur"]
438
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copy the RO information of every ns vld into db_nsr and into the pending database update
    :param ns_update_nsr: dictionary to be filled with one "vld.<index>" entry per vld
    :param db_nsr: content of db_nsr. Its vld entries are modified in place
    :param nsr_desc_RO: nsr descriptor from RO; its 'nets' are matched by 'ns_net_osm_id'
    :return: Nothing, LcmException is raised when a vld has no matching RO net
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net that belongs to this vld, if any
        matching_net = None
        for candidate_net in get_iterable(nsr_desc_RO, "nets"):
            if vld["id"] == candidate_net.get("ns_net_osm_id"):
                matching_net = candidate_net
                break
        if matching_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))

        vld["vim-id"] = matching_net.get("vim_net_id")
        vld["name"] = matching_net.get("vim_name")
        vld["status"] = matching_net.get("status")
        vld["status-detailed"] = matching_net.get("error_msg")
        ns_update_nsr["vld.{}".format(vld_index)] = vld
460
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """
    Mark every vnfr, and each of its vdur that has no status yet, as ERROR
    :param db_vnfrs: dictionary whose values are the vnfr contents. The vdur are modified in place
    :param error_text: if not empty, it is stored as 'status-detailed' of the vdur set at error
    :return: None. A DbException while writing is logged and not raised
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if "status" not in vdur:
                    vdur["status"] = "ERROR"
                    vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                    if error_text:
                        # store the same text in the database as in the in-memory record,
                        # not the literal "ERROR"
                        detail = str(error_text)
                        vdur["status-detailed"] = detail
                        vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = detail
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
475
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # look for the RO vnf with the same member index; the for/else below raises if there is none
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                # only the first address of the ';' separated text is kept
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
            elif not db_vnfr.get("ip-address"):
                if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                    raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # counts the RO vms of this vdu already skipped, to pair by count-index
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    # vdur with a pdu-type are not matched against the RO vms
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    if vdur_RO.get("ip_address"):
                        vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                    else:
                        vdur["ip-address"] = None
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    for ifacer in get_iterable(vdur, "interfaces"):
                        # every interface must be found at RO by its internal_name
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "from VIM info"
                                               .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    # no RO vm with this vdu id and count-index
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                       "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                # internal vlds are matched with the RO nets by vnf_net_osm_id
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                        vnf_index, vld["id"]))

            # a single database write per vnfr with everything collected above
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
548
549 def _get_ns_config_info(self, nsr_id):
550 """
551 Generates a mapping between vnf,vdu elements and the N2VC id
552 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
553 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
554 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
555 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
556 """
557 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
558 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
559 mapping = {}
560 ns_config_info = {"osm-config-mapping": mapping}
561 for vca in vca_deployed_list:
562 if not vca["member-vnf-index"]:
563 continue
564 if not vca["vdu_id"]:
565 mapping[vca["member-vnf-index"]] = vca["application"]
566 else:
567 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
568 vca["application"]
569 return ns_config_info
570
async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
                             n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
    """
    Build the deployment target (ns vlds, vnfs with their vlds and vdus, images, flavors,
    cloud-init contents), send it to RO and wait until RO finishes the action
    :param logging_text: prefix of the final debug log line
    :param nsr_id: ns record id, used for the RO calls and for the nsrs database update
    :param nsd: ns descriptor; vnf profiles, virtual link profiles and 'ip-profiles' are read
    :param db_nsr: ns record; 'name', 'image', 'flavor' and 'vld' are read
    :param db_nslcmop: operation record. If it is not an 'instantiate' operation, the parameters of the
        last 'instantiate' operation of the same ns are used instead
    :param db_vnfrs: dictionary whose values are the vnfr contents (deep copied into the target)
    :param db_vnfds: list of vnfd contents, searched by 'id'
    :param n2vc_key_list: ssh public keys added to the ones of the instantiation parameters. Can be None
    :param stage: list of texts, joined with spaces to form the 'detailed-status' of the ns record
    :param start_deploy: forwarded to _wait_ng_ro as start time
    :param timeout_ns_deploy: forwarded to _wait_ng_ro as timeout
    :return: None
    """

    db_vims = {}

    def get_vim_account(vim_account_id):
        # vim accounts are read from database only once per call of the outer method
        if vim_account_id in db_vims:
            return db_vims[vim_account_id]
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
        db_vims[vim_account_id] = db_vim
        return db_vim

    # modify target_vld info with instantiation parameters
    def parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn):
        if vld_params.get("ip-profile"):
            target_vld["vim_info"][target_vim]["ip_profile"] = vld_params["ip-profile"]
        if vld_params.get("provider-network"):
            target_vld["vim_info"][target_vim]["provider_network"] = vld_params["provider-network"]
            if "sdn-ports" in vld_params["provider-network"] and target_sdn:
                target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params["provider-network"]["sdn-ports"]
        if vld_params.get("wimAccountId"):
            target_wim = "wim:{}".format(vld_params["wimAccountId"])
            target_vld["vim_info"][target_wim] = {}
        for param in ("vim-network-name", "vim-network-id"):
            if vld_params.get(param):
                if isinstance(vld_params[param], dict):
                    # .items() is required: iterating the dictionary itself only yields its keys
                    for vim, vim_net in vld_params[param].items():
                        other_target_vim = "vim:" + vim
                        populate_dict(target_vld["vim_info"], (other_target_vim, param.replace("-", "_")), vim_net)
                else:  # isinstance str
                    target_vld["vim_info"][target_vim][param.replace("-", "_")] = vld_params[param]
        if vld_params.get("common_id"):
            target_vld["common_id"] = vld_params.get("common_id")

    nslcmop_id = db_nslcmop["_id"]
    target = {
        "name": db_nsr["name"],
        "ns": {"vld": []},
        "vnf": [],
        "image": deepcopy(db_nsr["image"]),
        "flavor": deepcopy(db_nsr["flavor"]),
        "action_id": nslcmop_id,
        "cloud_init_content": {},
    }
    for image in target["image"]:
        image["vim_info"] = {}
    for flavor in target["flavor"]:
        flavor["vim_info"] = {}

    if db_nslcmop.get("lcmOperationType") != "instantiate":
        # get parameters of instantiation:
        db_nslcmop_instantiate = self.db.get_list("nslcmops", {"nsInstanceId": db_nslcmop["nsInstanceId"],
                                                               "lcmOperationType": "instantiate"})[-1]
        ns_params = db_nslcmop_instantiate.get("operationParams")
    else:
        ns_params = db_nslcmop.get("operationParams")
    ssh_keys_instantiation = ns_params.get("ssh_keys") or []
    ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])

    cp2target = {}
    for vld_index, vld in enumerate(db_nsr.get("vld")):
        target_vim = "vim:{}".format(ns_params["vimAccountId"])
        target_vld = {
            "id": vld["id"],
            "name": vld["name"],
            "mgmt-network": vld.get("mgmt-network", False),
            "type": vld.get("type"),
            "vim_info": {
                target_vim: {
                    "vim_network_name": vld.get("vim-network-name"),
                    "vim_account_id": ns_params["vimAccountId"]
                }
            }
        }
        # check if this network needs SDN assist
        # vim_info is a dictionary indexed by target, so the SDN entry is added under its own
        # "sdn:<id>" key, with the same content as the one built for the vnf vlds below
        target_sdn = None
        if vld.get("pci-interfaces"):
            db_vim = VimAccountDB.get_vim_account_with_id(ns_params["vimAccountId"])
            sdnc_id = db_vim["config"].get("sdn-controller")
            if sdnc_id:
                # NOTE(review): identifier built by analogy with the vnf one ("vnfrs:<id>:vld.<vld id>");
                # confirm the format expected by RO
                sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
                target_sdn = "sdn:{}".format(sdnc_id)
                target_vld["vim_info"][target_sdn] = {
                    "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}

        nsd_vnf_profiles = get_vnf_profiles(nsd)
        for nsd_vnf_profile in nsd_vnf_profiles:
            for cp in nsd_vnf_profile["virtual-link-connectivity"]:
                if cp["virtual-link-profile-id"] == vld["id"]:
                    cp2target["member_vnf:{}.{}".format(
                        cp["constituent-cpd-id"][0]["constituent-base-element-id"],
                        cp["constituent-cpd-id"][0]["constituent-cpd-id"]
                    )] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)

        # check at nsd descriptor, if there is an ip-profile
        # NOTE(review): every virtual link profile is applied to every vld (the last one wins); confirm
        # whether the profile should be selected by the vld id
        vld_params = {}
        virtual_link_profiles = get_virtual_link_profiles(nsd)

        for vlp in virtual_link_profiles:
            ip_profile = find_in_list(nsd["ip-profiles"],
                                      lambda profile: profile["name"] == vlp["ip-profile-ref"])
            vld_params["ip-profile"] = ip_profile["ip-profile-params"]
        # update vld_params with instantiation params
        vld_instantiation_params = find_in_list(get_iterable(ns_params, "vld"),
                                                lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]))
        if vld_instantiation_params:
            vld_params.update(vld_instantiation_params)
        parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn)
        target["ns"]["vld"].append(target_vld)

    for vnfr in db_vnfrs.values():
        vnfd = find_in_list(db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"])
        vnf_params = find_in_list(get_iterable(ns_params, "vnf"),
                                  lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"])
        target_vnf = deepcopy(vnfr)
        target_vim = "vim:{}".format(vnfr["vim-account-id"])
        for vld in target_vnf.get("vld", ()):
            # check if connected to a ns.vld, to fill target'
            vnf_cp = find_in_list(vnfd.get("int-virtual-link-desc", ()),
                                  lambda cpd: cpd.get("id") == vld["id"])
            if vnf_cp:
                ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
                if cp2target.get(ns_cp):
                    vld["target"] = cp2target[ns_cp]

            vld["vim_info"] = {target_vim: {"vim_network_name": vld.get("vim-network-name")}}
            # check if this network needs SDN assist
            target_sdn = None
            if vld.get("pci-interfaces"):
                db_vim = get_vim_account(vnfr["vim-account-id"])
                sdnc_id = db_vim["config"].get("sdn-controller")
                if sdnc_id:
                    sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
                    target_sdn = "sdn:{}".format(sdnc_id)
                    vld["vim_info"][target_sdn] = {
                        "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}

            # check at vnfd descriptor, if there is an ip-profile
            vld_params = {}
            vnfd_vlp = find_in_list(
                get_virtual_link_profiles(vnfd),
                lambda a_link_profile: a_link_profile["id"] == vld["id"]
            )
            if vnfd_vlp and vnfd_vlp.get("virtual-link-protocol-data") and \
                    vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data"):
                ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"]["l3-protocol-data"]
                ip_profile_dest_data = {}
                if "ip-version" in ip_profile_source_data:
                    ip_profile_dest_data["ip-version"] = ip_profile_source_data["ip-version"]
                if "cidr" in ip_profile_source_data:
                    ip_profile_dest_data["subnet-address"] = ip_profile_source_data["cidr"]
                if "gateway-ip" in ip_profile_source_data:
                    ip_profile_dest_data["gateway-address"] = ip_profile_source_data["gateway-ip"]
                if "dhcp-enabled" in ip_profile_source_data:
                    ip_profile_dest_data["dhcp-params"] = {
                        "enabled": ip_profile_source_data["dhcp-enabled"]
                    }

                vld_params["ip-profile"] = ip_profile_dest_data
            # update vld_params with instantiation params
            if vnf_params:
                vld_instantiation_params = find_in_list(get_iterable(vnf_params, "internal-vld"),
                                                        lambda i_vld: i_vld["name"] == vld["id"])
                if vld_instantiation_params:
                    vld_params.update(vld_instantiation_params)
            parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)

        vdur_list = []
        for vdur in target_vnf.get("vdur", ()):
            if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
                continue  # This vdu must not be created
            # provisional content (it appears in the debug log below); replaced at the end of the iteration
            vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}

            self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))

            if ssh_keys_all:
                vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
                vnf_configuration = get_configuration(vnfd, vnfd["id"])
                if vdu_configuration and vdu_configuration.get("config-access") and \
                        vdu_configuration.get("config-access").get("ssh-access"):
                    vdur["ssh-keys"] = ssh_keys_all
                    vdur["ssh-access-required"] = vdu_configuration["config-access"]["ssh-access"]["required"]
                elif vnf_configuration and vnf_configuration.get("config-access") and \
                        vnf_configuration.get("config-access").get("ssh-access") and \
                        any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
                    vdur["ssh-keys"] = ssh_keys_all
                    vdur["ssh-access-required"] = vnf_configuration["config-access"]["ssh-access"]["required"]
                elif ssh_keys_instantiation and \
                        find_in_list(vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")):
                    vdur["ssh-keys"] = ssh_keys_instantiation

            self.logger.debug("NS > vdur > {}".format(vdur))

            vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
            # cloud-init
            if vdud.get("cloud-init-file"):
                vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
                # read file and put content at target.cloud_init_content. Avoid ng_ro to use shared package system
                if vdur["cloud-init"] not in target["cloud_init_content"]:
                    base_folder = vnfd["_admin"]["storage"]
                    cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
                                                                   vdud.get("cloud-init-file"))
                    with self.fs.file_open(cloud_init_file, "r") as ci_file:
                        target["cloud_init_content"][vdur["cloud-init"]] = ci_file.read()
            elif vdud.get("cloud-init"):
                vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"]))
                # put content at target.cloud_init_content. Avoid ng_ro read vnfd descriptor
                target["cloud_init_content"][vdur["cloud-init"]] = vdud["cloud-init"]
            vdur["additionalParams"] = vdur.get("additionalParams") or {}
            deploy_params_vdu = self._format_additional_params(vdur.get("additionalParams") or {})
            deploy_params_vdu["OSM"] = get_osm_params(vnfr, vdur["vdu-id-ref"], vdur["count-index"])
            vdur["additionalParams"] = deploy_params_vdu

            # flavor
            ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
            if target_vim not in ns_flavor["vim_info"]:
                ns_flavor["vim_info"][target_vim] = {}

            # deal with images
            # in case alternative images are provided we must check if they should be applied
            # for the vim_type, modify the vim_type taking into account
            ns_image_id = int(vdur["ns-image-id"])
            if vdur.get("alt-image-ids"):
                db_vim = get_vim_account(vnfr["vim-account-id"])
                vim_type = db_vim["vim_type"]
                for alt_image_id in vdur.get("alt-image-ids"):
                    ns_alt_image = target["image"][int(alt_image_id)]
                    if vim_type == ns_alt_image.get("vim-type"):
                        # must use alternative image
                        self.logger.debug("use alternative image id: {}".format(alt_image_id))
                        ns_image_id = alt_image_id
                        vdur["ns-image-id"] = ns_image_id
                        break
            ns_image = target["image"][int(ns_image_id)]
            if target_vim not in ns_image["vim_info"]:
                ns_image["vim_info"][target_vim] = {}

            vdur["vim_info"] = {target_vim: {}}
            # instantiation parameters
            # if vnf_params:
            #     vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
            #     vdud["id"]), None)
            vdur_list.append(vdur)
        target_vnf["vdur"] = vdur_list
        target["vnf"].append(target_vnf)

    desc = await self.RO.deploy(nsr_id, target)
    self.logger.debug("RO return > {}".format(desc))
    action_id = desc["action_id"]
    await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)

    # Updating NSR
    db_nsr_update = {
        "_admin.deployed.RO.operational-status": "running",
        "detailed-status": " ".join(stage)
    }
    # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)
    self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
    return
829
async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id=None, start_time=None, timeout=600, stage=None):
    """
    Poll the status of an action at NG-RO until it is done, it fails or the timeout expires
    :param nsr_id: ns record identity, passed to RO.status
    :param action_id: identity of the RO action to poll
    :param nslcmop_id: operation identity. If provided together with stage, progress is written at database
    :param start_time: epoch seconds used as origin of the timeout. Current time is used if not provided
    :param timeout: maximum number of seconds to wait, counted from start_time
    :param stage: optional list with 3 items: [general stage, tasks, vim_specific]. This task writes over
        vim_specific
    :return: None. Raises NgRoException if RO reports "FAILED", an unknown status, or the timeout expires
    """
    detailed_status_old = None
    db_nsr_update = {}
    start_time = start_time or time()
    while time() <= start_time + timeout:
        desc_status = await self.RO.status(nsr_id, action_id)
        self.logger.debug("Wait NG RO > {}".format(desc_status))
        if desc_status["status"] == "FAILED":
            raise NgRoException(desc_status["details"])
        elif desc_status["status"] == "BUILD":
            if stage:
                stage[2] = "VIM: ({})".format(desc_status["details"])
        elif desc_status["status"] == "DONE":
            if stage:
                stage[2] = "Deployed at VIM"
            break
        else:
            # an explicit exception is used instead of 'assert', because asserts are stripped when python runs
            # with -O and the unknown status would then be silently polled until timeout
            raise NgRoException("ROclient.check_ns_status returns unknown {}".format(desc_status["status"]))
        # write at database only when the vim specific text has changed
        if stage and nslcmop_id and stage[2] != detailed_status_old:
            detailed_status_old = stage[2]
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
        # 'loop' argument is not passed: it was removed from asyncio.sleep in python 3.10
        await asyncio.sleep(15)
    else:  # timeout_ns_deploy
        raise NgRoException("Timeout waiting ns to deploy")
856
async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Remove the ns from NG-RO: deploys an empty target, waits for the action to finish and deletes the nsr at RO
    :param logging_text: prefix text to use at logging
    :param nsr_deployed: not used by this method
    :param nsr_id: ns record identity
    :param nslcmop_id: operation identity, also sent to RO as "action_id" of the target
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task writes over vim_specific
    :return: None. Raises LcmException with the collected error details if the deletion has failed
    """
    key_action = "_admin.deployed.RO.nsr_delete_action_id"
    key_status = "_admin.deployed.RO.nsr_status"
    nsr_changes = {}
    problems = []
    ro_action = None
    began_at = time()
    try:
        # a target without any item makes RO remove everything deployed for this ns
        empty_target = {
            "ns": {"vld": []},
            "vnf": [],
            "image": [],
            "flavor": [],
            "action_id": nslcmop_id
        }
        ro_reply = await self.RO.deploy(nsr_id, empty_target)
        ro_action = ro_reply["action_id"]
        nsr_changes[key_action] = ro_action
        nsr_changes[key_status] = "DELETING"
        self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(ro_action))

        # wait until done, 20 minutes at most
        await self._wait_ng_ro(nsr_id, ro_action, nslcmop_id, began_at, 20 * 60, stage)

        nsr_changes[key_action] = None
        nsr_changes[key_status] = "DELETED"
        # delete all nsr
        await self.RO.delete(nsr_id)
    except Exception as exc:
        # only RO exceptions carry a http code to look at
        ro_code = exc.http_code if isinstance(exc, NgRoException) else None
        if ro_code == 404:  # not found
            nsr_changes["_admin.deployed.RO.nsr_id"] = None
            nsr_changes[key_status] = "DELETED"
            nsr_changes[key_action] = None
            self.logger.debug(logging_text + "RO_action_id={} already deleted".format(ro_action))
        elif ro_code == 409:  # conflict
            problems.append("delete conflict: {}".format(exc))
            self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(ro_action, exc))
        else:
            problems.append("delete error: {}".format(exc))
            self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(ro_action, exc))

    stage[2] = "Error deleting from VIM" if problems else "Deleted from VIM"
    nsr_changes["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, nsr_changes)
    self._write_op_status(nslcmop_id, stage)

    if problems:
        raise LcmException("; ".join(problems))
    return
908
async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
                         n2vc_key_list, stage):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs. Its values are read for 'vim-account-id'
    :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None or exception. On any failure the vnfrs are set at error and the exception is raised again
    """
    try:
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        # timeout provided at the operation takes precedence over the configured one
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement change ns_params[vimAccountId] if not present at any vnfrs
            last_vnfr = None
            for last_vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == last_vnfr["vim-account-id"]:
                    break
            else:
                # no vnfr uses the ns vim account: take the one of the last vnfr. This must be an assignment;
                # it used to be a '==' comparison with no effect. Nothing to do if there are no vnfrs
                if last_vnfr is not None:
                    ns_params["vimAccountId"] = last_vnfr["vim-account-id"]

        return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
                                             db_vnfds, n2vc_key_list, stage, start_deploy, timeout_ns_deploy)
    except Exception as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        # traceback is logged only for exceptions that are not one of the known ones
        self.logger.error("Error deploying at VIM {}".format(e),
                          exc_info=not isinstance(e, (ROclient.ROClientException, LcmException, DbException,
                                                      NgRoException)))
        raise
951
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging. Not used by this method at present
    :param nsr_id: not used by this method
    :param vnfr_id: identity of the vnf record that contains the kdu
    :param kdu_name: name of the kdu to look for at the "kdur" list of the vnf record
    :return: IP address stored at the kdur. Raises LcmException if the kdu is not found, if its status is
        other than "READY" or "ENABLED", or if no status is set after 360 tries (10 seconds between tries)
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
        if not kdur:
            raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address")
            else:
                raise LcmException("target KDU={} is in error state".format(kdu_name))

        # no status yet: poll again later
        # 'loop' argument is not passed: it was removed from asyncio.sleep in python 3.10
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
979
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine
    :param logging_text: prefix use for logging
    :param nsr_id: ns record identity
    :param vnfr_id: identity of the vnf record that contains the target vdu
    :param vdu_id: vdu identity. If empty, the target is the vdur holding the management ip address of the VNF
    :param vdu_index: count-index of the target vdu. Only used when vdu_id is provided
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address. Raises LcmException if the target is in error state, is not found, or after
        exhausting the retries
    """

    self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))

        # 'loop' argument is not passed: it was removed from asyncio.sleep in python 3.10
        await asyncio.sleep(10)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException("Cannot inject ssh-key because target VNF is in error state")
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur and len(db_vnfr.get("vdur", ())) == 1:  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
                                                                                          vdu_index))
            # New generation RO stores information at "vim_info"
            ng_ro_status = None
            target_vim = None
            if vdur.get("vim_info"):
                target_vim = next(t for t in vdur["vim_info"])  # there should be only one key
                ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
            if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE" or ng_ro_status == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        # target not ready yet (neither active nor in error): poll again
        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            self.logger.debug(logging_text + "Inserting RO key")
            self.logger.debug("SSH > PubKey > {}".format(pub_key))
            if vdur.get("pdu-type"):
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                if self.ng_ro:
                    target = {"action": {"action": "inject_ssh_key", "key": pub_key, "user": user},
                              "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                              }
                    desc = await self.RO.deploy(nsr_id, target)
                    action_id = desc["action_id"]
                    await self._wait_ng_ro(nsr_id, action_id, timeout=600)
                    break
                else:
                    # wait until NS is deployed at RO
                    if not ro_nsr_id:
                        db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                        ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
                    if not ro_nsr_id:
                        continue
                    result_dict = await self.RO.create_action(
                        item="ns",
                        item_id_name=ro_nsr_id,
                        descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
                    )
                    # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                    if not result_dict or not isinstance(result_dict, dict):
                        raise LcmException("Unknown response from RO when injecting key")
                    # the first result decides: success leaves the loop, anything else is retried below
                    for result in result_dict.values():
                        if result.get("vim_result") == 200:
                            break
                        else:
                            raise ROclient.ROClientException("error injecting key: {}".format(
                                result.get("description")))
                    break
            except NgRoException as e:
                raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
            except ROclient.ROClientException as e:
                # old RO: retry up to 20 times, one try per iteration of the main loop
                if not nb_tries:
                    self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                      format(e, 20*10))
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
        else:
            # nothing to inject, the ip address is enough
            break

    return ip_address
1097
async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
    """
    Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
    :param nsr_id: ns record identity, read from database at every poll
    :param vca_deployed_list: list of deployed VCA; only the entry at vca_index is read
    :param vca_index: position of this VCA, skipped when looking at the status of the others
    :return: None when nothing is pending. Raises LcmException if a dependent status is 'BROKEN' or when the
        polling is exhausted
    """
    own_vca = vca_deployed_list[vca_index]
    if own_vca.get("vdu_id") or own_vca.get("kdu_name"):
        # vdu or kdu: no dependencies
        return

    # NOTE(review): the counter decreases by 1 per 10 seconds sleep, so up to 301 polls are done rather than
    # 300 seconds - confirm which one is intended
    remaining = 300
    while remaining >= 0:
        nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        vca_deployed_list = nsr["_admin"]["deployed"]["VCA"]
        status_entries = nsr["configurationStatus"]
        own_member = own_vca.get("member-vnf-index")

        # NOTE(review): "member-vnf-index" is looked up at the configurationStatus entries, not at the refreshed
        # VCA list read above - verify that those entries carry that key
        pending = False
        for position, entry in enumerate(status_entries):
            if position == vca_index:
                # myself
                continue
            if own_member and entry.get("member-vnf-index") != own_member:
                # belongs to a different vnf: not a dependency
                continue
            state = status_entries[position].get("status")
            if state == 'BROKEN':
                raise LcmException("Configuration aborted because dependent charm/s has failed")
            if state != 'READY':
                pending = True
                break

        if not pending:
            # no dependencies, return
            return
        await asyncio.sleep(10)
        remaining -= 1

    raise LcmException("Configuration aborted because dependent charm/s timeout")
1131
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
                           config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
                           ee_config_descriptor):
    """
    Create or register the execution environment of one VCA entry, install its configuration software, wait for
    the target element to be up and execute its initial config primitives. Progress is written at database
    "nsrs", under configurationStatus.<vca_index> and _admin.deployed.VCA.<vca_index>
    :param logging_text: prefix text to use at logging
    :param vca_index: index of this VCA at db_nsr["_admin"]["deployed"]["VCA"]
    :param nsi_id: network slice instance identity or None. It is the first part of the namespace
    :param db_nsr: database content of ns record
    :param db_vnfr: database content of vnf record, or None for a NS configuration
    :param vdu_id: vdu identity, or None if the configuration is not for a VDU
    :param kdu_name: kdu name, or None if the configuration is not for a KDU
    :param vdu_index: vdu count index. 0 is used at the namespace if not provided
    :param config_descriptor: configuration descriptor. Read for 'initial-config-primitive',
        'terminate-config-primitive' and 'config-access'
    :param deploy_params: parameters used to map the primitive params. It is modified: "rw_mgmt_ip" is added,
        and "ns_config_info" when vca_deployed["member-vnf-index"] is empty
    :param base_folder: dictionary with "folder" and "pkg-dir", used to compose the artifact path
    :param nslcmop_id: operation identity, used to write the operation status
    :param stage: list whose first item (general stage) is overwritten by this task
    :param vca_type: values handled here: "lxc_proxy_charm", "k8s_proxy_charm", "native_charm", "helm", "helm-v3"
    :param vca_name: last part of the artifact path
    :param ee_config_descriptor: execution environment descriptor. Its "id" is used to select the primitives
    :return: None. On any exception the configuration status is set to 'BROKEN' and LcmException is raised,
        with the current step as prefix of the message
    """
    nsr_id = db_nsr["_id"]
    db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
    osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
    db_dict = {
        'collection': 'nsrs',
        'filter': {'_id': nsr_id},
        'path': db_update_entry
    }
    # 'step' always holds the description of what is being done; it is used at the exception message
    step = ""
    try:

        element_type = 'NS'
        element_under_configuration = nsr_id

        vnfr_id = None
        if db_vnfr:
            vnfr_id = db_vnfr["_id"]
            osm_config["osm"]["vnf_id"] = vnfr_id

        # namespace is composed as <nsi>.<ns>[.<vnfr>[.<vdu>-<index> | .<kdu>]]
        namespace = "{nsi}.{ns}".format(
            nsi=nsi_id if nsi_id else "",
            ns=nsr_id)

        if vnfr_id:
            element_type = 'VNF'
            element_under_configuration = vnfr_id
            namespace += ".{}".format(vnfr_id)
            if vdu_id:
                namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                element_type = 'VDU'
                element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
                osm_config["osm"]["vdu_id"] = vdu_id
            elif kdu_name:
                namespace += ".{}".format(kdu_name)
                element_type = 'KDU'
                element_under_configuration = kdu_name
                osm_config["osm"]["kdu_name"] = kdu_name

        # Get artifact path
        artifact_path = "{}/{}/{}/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
            vca_name
        )

        self.logger.debug("Artifact path > {}".format(artifact_path))

        # get initial_config_primitive_list that applies to this element
        initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

        self.logger.debug("Initial config primitive list > {}".format(initial_config_primitive_list))

        # add config if not present for NS charm
        ee_descriptor_id = ee_config_descriptor.get("id")
        self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
        initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list,
                                                                                    vca_deployed, ee_descriptor_id)

        self.logger.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list))
        # n2vc_redesign STEP 3.1
        # find old ee_id if exists
        ee_id = vca_deployed.get("ee_id")

        # vim account of the vnf record takes precedence over the one at deploy_params
        vim_account_id = (
            deep_get(db_vnfr, ("vim-account-id",)) or
            deep_get(deploy_params, ("OSM", "vim_account_id"))
        )
        vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id)
        vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id)
        # create or register execution environment in VCA
        if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='CREATING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "create execution environment"
            self.logger.debug(logging_text + step)

            ee_id = None
            credentials = None
            if vca_type == "k8s_proxy_charm":
                ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
                    charm_name=artifact_path[artifact_path.rfind("/") + 1:],
                    namespace=namespace,
                    artifact_path=artifact_path,
                    db_dict=db_dict,
                    cloud_name=vca_k8s_cloud,
                    credential_name=vca_k8s_cloud_credential,
                )
            elif vca_type == "helm" or vca_type == "helm-v3":
                ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
                    namespace=namespace,
                    reuse_ee_id=ee_id,
                    db_dict=db_dict,
                    config=osm_config,
                    artifact_path=artifact_path,
                    vca_type=vca_type
                )
            else:
                ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
                    namespace=namespace,
                    reuse_ee_id=ee_id,
                    db_dict=db_dict,
                    cloud_name=vca_cloud,
                    credential_name=vca_cloud_credential,
                )

        elif vca_type == "native_charm":
            # native charm is registered on the VM itself, so the VM ip address is needed first
            step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)
            rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                             user=None, pub_key=None)
            credentials = {"hostname": rw_mgmt_ip}
            # get username
            username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
            # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
            #  merged. Meanwhile let's get username from initial-config-primitive
            if not username and initial_config_primitive_list:
                for config_primitive in initial_config_primitive_list:
                    for param in config_primitive.get("parameter", ()):
                        if param["name"] == "ssh-username":
                            username = param["value"]
                            break
            if not username:
                raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
                                   "'config-access.ssh-access.default-user'")
            credentials["username"] = username
            # n2vc_redesign STEP 3.2

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='REGISTERING',
                element_under_configuration=element_under_configuration,
                element_type=element_type
            )

            step = "register execution environment {}".format(credentials)
            self.logger.debug(logging_text + step)
            ee_id = await self.vca_map[vca_type].register_execution_environment(
                credentials=credentials,
                namespace=namespace,
                db_dict=db_dict,
                cloud_name=vca_cloud,
                credential_name=vca_cloud_credential,
            )

        # for compatibility with MON/POL modules, the need model and application name at database
        # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
        ee_id_parts = ee_id.split('.')
        db_nsr_update = {db_update_entry + "ee_id": ee_id}
        if len(ee_id_parts) >= 2:
            model_name = ee_id_parts[0]
            application_name = ee_id_parts[1]
            db_nsr_update[db_update_entry + "model"] = model_name
            db_nsr_update[db_update_entry + "application"] = application_name

        # n2vc_redesign STEP 3.3
        step = "Install configuration Software"

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='INSTALLING SW',
            element_under_configuration=element_under_configuration,
            element_type=element_type,
            other_update=db_nsr_update
        )

        # TODO check if already done
        self.logger.debug(logging_text + step)
        config = None
        if vca_type == "native_charm":
            # the primitive named "config", if any, provides the configuration passed at installation
            config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
            if config_primitive:
                config = self._map_primitive_params(
                    config_primitive,
                    {},
                    deploy_params
                )
        # number of units is read from "config-units" of the ns, vnf or vdu record; 1 by default
        num_units = 1
        if vca_type == "lxc_proxy_charm":
            if element_type == "NS":
                num_units = db_nsr.get("config-units") or 1
            elif element_type == "VNF":
                num_units = db_vnfr.get("config-units") or 1
            elif element_type == "VDU":
                for v in db_vnfr["vdur"]:
                    if vdu_id == v["vdu-id-ref"]:
                        num_units = v.get("config-units") or 1
                        break
        if vca_type != "k8s_proxy_charm":
            await self.vca_map[vca_type].install_configuration_sw(
                ee_id=ee_id,
                artifact_path=artifact_path,
                db_dict=db_dict,
                config=config,
                num_units=num_units,
            )

        # write in db flag of configuration_sw already installed
        self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})

        # add relations for this VCA (wait for other peers related with this VCA)
        await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
                                      vca_index=vca_index, vca_type=vca_type)

        # if SSH access is required, then get execution environment SSH public
        # if native charm we have waited already to VM be UP
        if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
            pub_key = None
            user = None
            # self.logger.debug("get ssh key block")
            if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                # self.logger.debug("ssh key needed")
                # Needed to inject a ssh key
                user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                step = "Install configuration Software, getting public ssh key"
                pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
            else:
                # self.logger.debug("no need to get ssh key")
                step = "Waiting to VM being up and getting IP address"
            self.logger.debug(logging_text + step)

            # n2vc_redesign STEP 5.1
            # wait for RO (ip-address) Insert pub_key into VM
            if vnfr_id:
                if kdu_name:
                    rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
                else:
                    rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
                                                                     vdu_index, user=user, pub_key=pub_key)
            else:
                rw_mgmt_ip = None   # This is for a NS configuration

            self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

        # store rw_mgmt_ip in deploy params for later replacement
        deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

        # n2vc_redesign STEP 6  Execute initial config primitive
        step = 'execute initial config primitive'

        # wait for dependent primitives execution (NS -> VNF -> VDU)
        if initial_config_primitive_list:
            await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

        # stage, in function of element type: vdu, kdu, vnf or ns
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # VDU or KDU
            stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
        elif my_vca.get("member-vnf-index"):
            # VNF
            stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
        else:
            # NS
            stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='EXECUTING PRIMITIVE'
        )

        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        check_if_terminated_needed = True
        for initial_config_primitive in initial_config_primitive_list:
            # adding information on the vca_deployed if it is a NS execution environment
            if not vca_deployed["member-vnf-index"]:
                deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
            # TODO check if already done
            primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

            step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
            self.logger.debug(logging_text + step)
            await self.vca_map[vca_type].exec_primitive(
                ee_id=ee_id,
                primitive_name=initial_config_primitive["name"],
                params_dict=primitive_params_,
                db_dict=db_dict
            )
            # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
            if check_if_terminated_needed:
                if config_descriptor.get('terminate-config-primitive'):
                    self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
                check_if_terminated_needed = False

            # TODO register in database that primitive is done

        # STEP 7 Configure metrics
        if vca_type == "helm" or vca_type == "helm-v3":
            prometheus_jobs = await self.add_prometheus_metrics(
                ee_id=ee_id,
                artifact_path=artifact_path,
                ee_config_descriptor=ee_config_descriptor,
                vnfr_id=vnfr_id,
                nsr_id=nsr_id,
                target_ip=rw_mgmt_ip,
            )
            if prometheus_jobs:
                self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})

        step = "instantiated at VCA"
        self.logger.debug(logging_text + step)

        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='READY'
        )

    except Exception as e:  # TODO not use Exception but N2VC exception
        # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
        # traceback is logged only for exceptions that are not one of the known ones
        if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
            self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
        self._write_configuration_status(
            nsr_id=nsr_id,
            vca_index=vca_index,
            status='BROKEN'
        )
        raise LcmException("{} {}".format(step, e)) from e
1471
def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
                     error_description: str = None, error_detail: str = None, other_update: dict = None):
    """
    Update db_nsr fields.
    :param nsr_id: ns record identity to update at "nsrs"
    :param ns_state: value for "nsState". It is not written if empty
    :param current_operation: value for "currentOperation". Also stored at "_admin.operation-type", where "IDLE"
        is stored as None
    :param current_operation_id: operation identity, stored at "currentOperationID", "_admin.current-operation"
        and "_admin.nslcmop"
    :param error_description: value for "errorDescription"
    :param error_detail: value for "errorDetail"
    :param other_update: Other required changes at database if provided, will be cleared
    :return: None. A DbException is logged as a warning, not raised
    """
    try:
        # the status keys are added to the provided dictionary itself, it is not copied
        db_dict = other_update or {}
        db_dict["_admin.nslcmop"] = current_operation_id  # for backward compatibility
        db_dict["_admin.current-operation"] = current_operation_id
        db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
        db_dict["currentOperation"] = current_operation
        db_dict["currentOperationID"] = current_operation_id
        db_dict["errorDescription"] = error_description
        db_dict["errorDetail"] = error_detail

        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1500
def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
                     operation_state: str = None, other_update: dict = None):
    """
    Update the status fields of an operation at database "nslcmops".
    :param op_id: operation identity to update
    :param stage: if it is a list, its first item is stored at "stage" and all the items joined by spaces at
        "detailed-status". Any other value that is not None is stored at "stage" as a string
    :param error_message: value for "errorMessage". It is not written if None
    :param queuePosition: value for "queuePosition"
    :param operation_state: value for "operationState". If not None, "statusEnteredTime" is set to current time
    :param other_update: other changes to write together. The status keys are added to this same dictionary
    :return: None. A DbException is logged as a warning, not raised
    """
    try:
        db_dict = other_update or {}
        db_dict['queuePosition'] = queuePosition
        if isinstance(stage, list):
            db_dict['stage'] = stage[0]
            db_dict['detailed-status'] = " ".join(stage)
        elif stage is not None:
            db_dict['stage'] = str(stage)

        if error_message is not None:
            db_dict['errorMessage'] = error_message
        if operation_state is not None:
            db_dict['operationState'] = operation_state
            db_dict["statusEnteredTime"] = time()
        self.update_db_2("nslcmops", op_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1520
def _write_all_config_status(self, db_nsr: dict, status: str):
    """
    Set the same status at every non empty entry of configurationStatus of a ns record.
    :param db_nsr: database content of ns record. Read for "_id" and "configurationStatus"
    :param status: value to write at configurationStatus.<index>.status
    :return: None. Nothing is written if there is no configurationStatus. A DbException is logged as a
        warning, not raised
    """
    try:
        nsr_id = db_nsr["_id"]
        # configurationStatus
        config_status = db_nsr.get('configurationStatus')
        if config_status:
            # empty entries are skipped, but they still count for the index
            db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
                             enumerate(config_status) if v}
            # update status
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1534
def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
                                element_under_configuration: str = None, element_type: str = None,
                                other_update: dict = None):
    """
    Update the entry configurationStatus.<vca_index> of a ns record at database "nsrs".
    :param nsr_id: ns record identity to update
    :param vca_index: index of the entry at configurationStatus
    :param status: value for "status". It is not written if empty
    :param element_under_configuration: value for "elementUnderConfiguration". It is not written if empty
    :param element_type: value for "elementType". It is not written if empty
    :param other_update: other changes to write together. The status keys are added to this same dictionary
    :return: None. A DbException is logged as a warning, not raised
    """

    # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
    #                   .format(vca_index, status))

    try:
        db_path = 'configurationStatus.{}.'.format(vca_index)
        db_dict = other_update or {}
        if status:
            db_dict[db_path + 'status'] = status
        if element_under_configuration:
            db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
        if element_type:
            db_dict[db_path + 'elementType'] = element_type
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Error writing configuration status={}, ns={}, vca_index={}: {}'
                            .format(status, nsr_id, vca_index, e))
1555
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Check and compute the placement (vim account where to deploy). When the operation asks for the "PLA"
    placement engine, the request is sent via the message bus and the result is polled from database
    (nslcmops _admin.pla). Database is used because the result can be obtained from a different LCM worker in
    case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
        computed 'vim-account-id'. Raises LcmException if no result is available after polling
    """
    op_id = db_nslcmop['_id']
    if deep_get(db_nslcmop, ('operationParams', 'placement-engine')) != "PLA":
        # no external placement requested: nothing changes
        return False

    self.logger.debug(logging_text + "Invoke and wait for placement optimization")
    await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': op_id}, loop=self.loop)

    # poll the operation record every 5 seconds until the result appears or the budget is consumed
    poll_seconds = 5
    remaining = poll_seconds * 10
    placement = None
    while remaining >= 0 and not placement:
        await asyncio.sleep(poll_seconds)
        remaining -= poll_seconds
        op_record = self.db.get_one("nslcmops", {"_id": op_id})
        placement = deep_get(op_record, ('_admin', 'pla'))

    if not placement:
        raise LcmException("Placement timeout for nslcmopId={}".format(op_id))

    changed = False
    for vnf_placement in placement['vnf']:
        target_vnfr = db_vnfrs.get(vnf_placement['member-vnf-index'])
        if target_vnfr and vnf_placement.get('vimAccountId'):
            changed = True
            self.db.set_one("vnfrs", {"_id": target_vnfr["_id"]},
                            {"vim-account-id": vnf_placement['vimAccountId']})
            # keep the provided db_vnfrs in line with database
            target_vnfr["vim-account-id"] = vnf_placement['vimAccountId']
    return changed
1594
def update_nsrs_with_pla_result(self, params):
    """
    Store the placement result carried by params at nslcmops "_admin.pla" (the field polled by
    _do_placement).

    Best effort: any failure is logged as a warning and it is not propagated.
    :param params: message content. params["placement"] is the result to store and
        params["placement"]["nslcmopId"] identifies the operation to update
    :return: None
    """
    # bound before the try, so that the except clause can always report it
    nslcmop_id = None
    try:
        placement = params.get('placement')
        nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
        self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": placement})
    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1601
async def instantiate(self, nsr_id, nslcmop_id):
    """
    Run the instantiate operation of a network service.

    After taking the HA lock of the operation, it reads the needed records from database (nslcmop, nsr, nsd,
    vnfrs and vnfds), deploys the KDUs, launches the deployment at VIM (instantiate_RO) and the execution
    environments declared at the vnfds/nsd (_deploy_n2vc). At the end it waits for the launched tasks, writes
    the final status at nsrs and nslcmops and sends the "instantiated" message.
    The exceptions raised while deploying are caught and recorded at database ("detailed-status", ns state
    "BROKEN", operation state "FAILED") instead of being propagated.

    :param nsr_id: ns instance to deploy
    :param nslcmop_id: operation to run
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
        return

    logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")

    # get all needed from database

    # database nsrs record
    db_nsr = None

    # database nslcmops record
    db_nslcmop = None

    # update operation on nsrs
    db_nsr_update = {}
    # update operation on nslcmops
    db_nslcmop_update = {}

    nslcmop_operation_state = None
    db_vnfrs = {}  # vnf's info indexed by member-index
    # n2vc_info = {}
    tasks_dict_info = {}  # from task to info text
    exc = None
    error_list = []
    stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
    # ^ stage, step, VIM progress
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        stage[1] = "Sync filesystem from database."
        self.fs.sync()  # TODO, make use of partial sync, only for the needed packages

        # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
        stage[1] = "Reading from database."
        # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
        db_nsr_update["detailed-status"] = "creating"
        db_nsr_update["operational-status"] = "init"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="BUILDING",
            current_operation="INSTANTIATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage,
            queuePosition=0
        )

        # read from db: operation
        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # read from db: ns
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
        db_nsr["nsd"] = nsd
        # nsr_name = db_nsr["name"]   # TODO short-name??

        # read from db: vnf's of this ns
        stage[1] = "Getting vnfrs from db."
        self.logger.debug(logging_text + stage[1])
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

        # read from db: vnfd's for every vnf
        db_vnfds = []  # every vnfd data
        # "_id" of the vnfds already stored at db_vnfds. db_vnfds contains the vnfd contents (dicts), so the
        # membership of an identifier cannot be tested against it
        loaded_vnfd_ids = set()

        # for each vnf in ns, read vnfd
        for vnfr in db_vnfrs_list:
            db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
            vnfd_id = vnfr["vnfd-id"]
            vnfd_ref = vnfr["vnfd-ref"]

            # if we haven't this vnfd, read it from db
            if vnfd_id not in loaded_vnfd_ids:
                # read from db
                stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
                self.logger.debug(logging_text + stage[1])
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                # store vnfd
                db_vnfds.append(vnfd)
                loaded_vnfd_ids.add(vnfd_id)

        # Get or generates the _admin.deployed.VCA list
        vca_deployed_list = None
        if db_nsr["_admin"].get("deployed"):
            vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
        if vca_deployed_list is None:
            vca_deployed_list = []
            configuration_status_list = []
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            db_nsr_update["configurationStatus"] = configuration_status_list
            # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
        elif isinstance(vca_deployed_list, dict):
            # maintain backward compatibility. Change a dict to list at database
            vca_deployed_list = list(vca_deployed_list.values())
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

        if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
            populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
            db_nsr_update["_admin.deployed.RO.vnfd"] = []

        # set state to INSTANTIATED. When instantiated NBI will not delete directly
        db_nsr_update["_admin.nsState"] = "INSTANTIATED"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})

        # n2vc_redesign STEP 2 Deploy Network Scenario
        stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        stage[1] = "Deploying KDUs."
        # self.logger.debug(logging_text + "Before deploy_kdus")
        # Call to deploy_kdus in case exists the "vdu:kdu" param
        await self.deploy_kdus(
            logging_text=logging_text,
            nsr_id=nsr_id,
            nslcmop_id=nslcmop_id,
            db_vnfrs=db_vnfrs,
            db_vnfds=db_vnfds,
            task_instantiation_info=tasks_dict_info,
        )

        stage[1] = "Getting VCA public key."
        # n2vc_redesign STEP 1 Get VCA public ssh-key
        # feature 1429. Add n2vc public key to needed VMs
        n2vc_key = self.n2vc.get_public_key()
        n2vc_key_list = [n2vc_key]
        if self.vca_config.get("public_key"):
            n2vc_key_list.append(self.vca_config["public_key"])

        stage[1] = "Deploying NS at VIM."
        task_ro = asyncio.ensure_future(
            self.instantiate_RO(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nsd=nsd,
                db_nsr=db_nsr,
                db_nslcmop=db_nslcmop,
                db_vnfrs=db_vnfrs,
                db_vnfds=db_vnfds,
                n2vc_key_list=n2vc_key_list,
                stage=stage
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
        tasks_dict_info[task_ro] = "Deploying at VIM"

        # n2vc_redesign STEP 3 to 6 Deploy N2VC
        stage[1] = "Deploying Execution Environments."
        self.logger.debug(logging_text + stage[1])

        nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
        for vnf_profile in get_vnf_profiles(nsd):
            vnfd_id = vnf_profile["vnfd-id"]
            vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
            member_vnf_index = str(vnf_profile["id"])
            db_vnfr = db_vnfrs[member_vnf_index]
            base_folder = vnfd["_admin"]["storage"]
            vdu_id = None
            vdu_index = 0
            vdu_name = None
            kdu_name = None

            # Get additional parameters
            deploy_params = {"OSM": get_osm_params(db_vnfr)}
            if db_vnfr.get("additionalParamsForVnf"):
                deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))

            descriptor_config = get_configuration(vnfd, vnfd["id"])
            if descriptor_config:
                self._deploy_n2vc(
                    logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage
                )

            # Deploy charms for each VDU that supports one.
            for vdud in get_vdu_list(vnfd):
                vdu_id = vdud["id"]
                descriptor_config = get_configuration(vnfd, vdu_id)
                vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)

                if vdur.get("additionalParams"):
                    deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"].copy())
                else:
                    # work on a copy: the vnf-level dict has already been handed to _deploy_n2vc and it is
                    # the base for every vdu without own params, so the "OSM" entry set below must not
                    # leak into them
                    deploy_params_vdu = deploy_params.copy()
                deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
                vdud_count = get_vdu_profile(vnfd, vdu_id).get("max-number-of-instances", 1)

                self.logger.debug("VDUD > {}".format(vdud))
                self.logger.debug("Descriptor config > {}".format(descriptor_config))
                if descriptor_config:
                    vdu_name = None
                    kdu_name = None
                    for vdu_index in range(vdud_count):
                        # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                        self._deploy_n2vc(
                            logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                member_vnf_index, vdu_id, vdu_index),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage
                        )
            for kdud in get_kdu_list(vnfd):
                kdu_name = kdud["name"]
                descriptor_config = get_configuration(vnfd, kdu_name)
                if descriptor_config:
                    vdu_id = None
                    vdu_index = 0
                    vdu_name = None
                    kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
                    deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
                    if kdur.get("additionalParams"):
                        # merge (as done for vnf and ns params) so that the "OSM" entry is kept
                        deploy_params_kdu.update(parse_yaml_strings(kdur["additionalParams"].copy()))

                    self._deploy_n2vc(
                        logging_text=logging_text,
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params_kdu,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage
                    )

        # Check if this NS has a charm configuration
        descriptor_config = nsd.get("ns-configuration")
        if descriptor_config and descriptor_config.get("juju"):
            vnfd_id = None
            db_vnfr = None
            member_vnf_index = None
            vdu_id = None
            kdu_name = None
            vdu_index = 0
            vdu_name = None

            # Get additional parameters
            deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
            if db_nsr.get("additionalParamsForNs"):
                deploy_params.update(parse_yaml_strings(db_nsr["additionalParamsForNs"].copy()))
            base_folder = nsd["_admin"]["storage"]
            self._deploy_n2vc(
                logging_text=logging_text,
                db_nsr=db_nsr,
                db_vnfr=db_vnfr,
                nslcmop_id=nslcmop_id,
                nsr_id=nsr_id,
                nsi_id=nsi_id,
                vnfd_id=vnfd_id,
                vdu_id=vdu_id,
                kdu_name=kdu_name,
                member_vnf_index=member_vnf_index,
                vdu_index=vdu_index,
                vdu_name=vdu_name,
                deploy_params=deploy_params,
                descriptor_config=descriptor_config,
                base_folder=base_folder,
                task_instantiation_info=tasks_dict_info,
                stage=stage
            )

        # rest of staff will be done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for instantiate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
                                                         stage, nslcmop_id, nsr_id=nsr_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancel all tasks
        except Exception as e:
            # bound to a name different from "exc" to not shadow (and unbind) the error recorded above
            error_list.append(str(e))

        # update operation-status
        db_nsr_update["operational-status"] = "running"
        # let's begin with VCA 'configured' status (later we can change it)
        db_nsr_update["config-status"] = "configured"
        for task, task_name in tasks_dict_info.items():
            if not task.done() or task.cancelled() or task.exception():
                if task_name.startswith(self.task_name_deploy_vca):
                    # A N2VC task is pending
                    db_nsr_update["config-status"] = "failed"
                else:
                    # RO or KDU task is pending
                    db_nsr_update["operational-status"] = "failed"

        # update status at database
        if error_list:
            error_detail = ". ".join(error_list)
            self.logger.error(logging_text + error_detail)
            error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
            error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])

            db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "READY"
            db_nsr_update["detailed-status"] = "Done"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                               "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2015
async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
                             timeout: int = 3600, vca_type: str = None) -> bool:
    """
    Add the relations declared at the nsd/vnfds in which the VCA stored at position vca_index takes part.

    steps:
      1. find all relations for this VCA
      2. wait for other peers related
      3. add relations

    :param logging_text: prefix for logging
    :param nsr_id: ns instance; its "nsrs" record provides _admin.deployed.VCA and configurationStatus
    :param vca_index: position of this VCA at nsrs _admin.deployed.VCA
    :param timeout: maximum time, in seconds, waiting for the peers
    :param vca_type: key of self.vca_map that selects the connector. "lxc_proxy_charm" if not provided
    :return: True when there are no relations, or when every relation has been added or discarded because
        one of its peers is BROKEN. False on timeout or on any error (it is logged, never raised)
    """

    def _has_broken_peer(relation, vca_list, vca_status_list, vca_key):
        # True if a VCA taking part in the relation is reported as BROKEN. Entry i of vca_status_list is
        # taken as the status of entry i of vca_list; the search stops at the first broken peer
        if not vca_status_list:
            return False
        peer_ids = (relation.get('entities')[0].get('id'), relation.get('entities')[1].get('id'))
        for vca, vca_status in zip(vca_list, vca_status_list):
            if vca.get(vca_key) in peer_ids and vca_status.get('status') == 'BROKEN':
                return True
        return False

    try:
        vca_type = vca_type or "lxc_proxy_charm"

        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # this VCA data
        my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

        # read all ns-configuration relations
        ns_relations = list()
        db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
        if db_ns_relations:
            for r in db_ns_relations:
                # check if this VCA is in the relation
                if my_vca.get('member-vnf-index') in\
                        (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
                    ns_relations.append(r)

        # read all vnf-configuration relations
        vnf_relations = list()
        db_vnfd_list = db_nsr.get('vnfd-id')
        if db_vnfd_list:
            for vnfd in db_vnfd_list:
                db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
                # a vnfd without vnf-level configuration declares no relations
                db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"])
                db_vnf_relations = db_vnf_configuration.get("relation", []) if db_vnf_configuration else None
                if db_vnf_relations:
                    for r in db_vnf_relations:
                        # check if this VCA is in the relation
                        if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
                            vnf_relations.append(r)

        # if no relations, terminate
        if not ns_relations and not vnf_relations:
            self.logger.debug(logging_text + ' No relations')
            return True

        self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

        # add all relations
        start = time()
        while True:
            # check timeout
            now = time()
            if now - start >= timeout:
                self.logger.error(logging_text + ' : timeout adding relations')
                return False

            # reload nsr from database (we need to update record: _admin.deloyed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))

            # for each defined NS relation, find the VCA's related
            for r in ns_relations.copy():
                from_vca_ee_id = None
                to_vca_ee_id = None
                from_vca_endpoint = None
                to_vca_endpoint = None
                for vca in vca_list:
                    if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
                            and vca.get('config_sw_installed'):
                        from_vca_ee_id = vca.get('ee_id')
                        from_vca_endpoint = r.get('entities')[0].get('endpoint')
                    if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
                            and vca.get('config_sw_installed'):
                        to_vca_ee_id = vca.get('ee_id')
                        to_vca_endpoint = r.get('entities')[1].get('endpoint')
                if from_vca_ee_id and to_vca_ee_id:
                    # add relation
                    await self.vca_map[vca_type].add_relation(
                        ee_id_1=from_vca_ee_id,
                        ee_id_2=to_vca_ee_id,
                        endpoint_1=from_vca_endpoint,
                        endpoint_2=to_vca_endpoint)
                    # remove entry from relations list
                    ns_relations.remove(r)
                else:
                    # check failed peers
                    try:
                        if _has_broken_peer(r, vca_list, db_nsr.get('configurationStatus'), 'member-vnf-index'):
                            # peer broken: remove relation from list
                            ns_relations.remove(r)
                    except Exception as e:
                        # best effort: the relation is kept and evaluated again at next try
                        self.logger.debug(logging_text + ' cannot check failed peers: {}'.format(e))

            # for each defined VNF relation, find the VCA's related
            for r in vnf_relations.copy():
                from_vca_ee_id = None
                to_vca_ee_id = None
                from_vca_endpoint = None
                to_vca_endpoint = None
                for vca in vca_list:
                    # a VCA without vdu_id is matched by its vnfd_id
                    key_to_check = "vdu_id"
                    if vca.get("vdu_id") is None:
                        key_to_check = "vnfd_id"
                    if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
                        from_vca_ee_id = vca.get('ee_id')
                        from_vca_endpoint = r.get('entities')[0].get('endpoint')
                    if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
                        to_vca_ee_id = vca.get('ee_id')
                        to_vca_endpoint = r.get('entities')[1].get('endpoint')
                if from_vca_ee_id and to_vca_ee_id:
                    # add relation
                    await self.vca_map[vca_type].add_relation(
                        ee_id_1=from_vca_ee_id,
                        ee_id_2=to_vca_ee_id,
                        endpoint_1=from_vca_endpoint,
                        endpoint_2=to_vca_endpoint)
                    # remove entry from relations list
                    vnf_relations.remove(r)
                else:
                    # check failed peers
                    try:
                        if _has_broken_peer(r, vca_list, db_nsr.get('configurationStatus'), 'vdu_id'):
                            # peer broken: remove relation from list
                            vnf_relations.remove(r)
                    except Exception as e:
                        # best effort: the relation is kept and evaluated again at next try
                        self.logger.debug(logging_text + ' cannot check failed peers: {}'.format(e))

            # checked before sleeping, to return as soon as there is nothing left to add
            if not ns_relations and not vnf_relations:
                self.logger.debug('Relations added')
                break

            # wait for next try
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning(logging_text + ' ERROR adding relations: {}'.format(e))
        return False
2181
async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
                       vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
    """
    Install a KDU at its k8s cluster, record the result at database and run its initial config primitives.

    :param nsr_id: ns instance; its "nsrs" record is updated under nsr_db_path
    :param nsr_db_path: path inside the nsrs record where the info of this KDU is stored
    :param vnfr_data: vnfr content; only its "_id" is used, to update "vnfrs"
    :param kdu_index: position of this KDU at the "kdur" list of the vnfr
    :param kdud: kdu descriptor. Its "service" entries flagged as "mgmt-service" are used to get the
        management ip, and "kdu-configuration" provides the initial config primitives
    :param vnfd: vnf descriptor, used to check if the management ip of the KDU is also the one of the vnf
    :param k8s_instance_info: dict with "k8scluster-type", "k8scluster-uuid", "kdu-model", "kdu-name" and
        "namespace"
    :param k8params: params passed to the install
    :param timeout: timeout, in seconds, used for the install and for every initial config primitive
    :return: the kdu instance returned by the install
    :raises: any exception of the process is re-raised, after trying to store the error at database
    """

    try:
        k8sclustertype = k8s_instance_info["k8scluster-type"]
        # Instantiate kdu
        db_dict_install = {"collection": "nsrs",
                           "filter": {"_id": nsr_id},
                           "path": nsr_db_path}

        kdu_instance = await self.k8scluster_map[k8sclustertype].install(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_model=k8s_instance_info["kdu-model"],
            atomic=True,
            params=k8params,
            db_dict=db_dict_install,
            timeout=timeout,
            kdu_name=k8s_instance_info["kdu-name"],
            namespace=k8s_instance_info["namespace"])
        self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})

        # Obtain services to obtain management service ip
        services = await self.k8scluster_map[k8sclustertype].get_services(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_instance=kdu_instance,
            namespace=k8s_instance_info["namespace"])

        # Obtain management service info (if exists)
        vnfr_update_dict = {}
        if services:
            vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
            mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
            for mgmt_service in mgmt_services:
                for service in services:
                    if service["name"].startswith(mgmt_service["name"]):
                        # Mgmt service found, Obtain service ip
                        ip = service.get("external_ip", service.get("cluster_ip"))
                        if isinstance(ip, list) and len(ip) == 1:
                            ip = ip[0]

                        vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip

                        # Check if must update also mgmt ip at the vnf
                        service_external_cp = mgmt_service.get("external-connection-point-ref")
                        if service_external_cp:
                            if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
                                vnfr_update_dict["ip-address"] = ip

                        break
                else:
                    # the loop ended without break: no deployed service matches this mgmt service
                    self.logger.warning("Mgmt service name: {} not found".format(mgmt_service["name"]))

        vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
        self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

        kdu_config = kdud.get("kdu-configuration")
        if kdu_config and kdu_config.get("initial-config-primitive") and kdu_config.get("juju") is None:
            # sorted() is used to run the primitives by "seq" without reordering the caller's descriptor
            initial_config_primitive_list = sorted(kdu_config.get("initial-config-primitive"),
                                                   key=lambda val: int(val["seq"]))

            for initial_config_primitive in initial_config_primitive_list:
                primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})

                await asyncio.wait_for(
                    self.k8scluster_map[k8sclustertype].exec_primitive(
                        cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                        kdu_instance=kdu_instance,
                        primitive_name=initial_config_primitive["name"],
                        params=primitive_params_, db_dict={}),
                    timeout=timeout)

    except Exception as e:
        # Prepare update db with error and raise exception
        try:
            self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
            self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
        except Exception:
            # ignore to keep original exception
            pass
        # reraise original error
        raise

    return kdu_instance
2265
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
    """
    Launch the installation of the KDUs present at the vnfrs ("kdur" entries).

    For every KDU it obtains the identifier of its k8s cluster, synchronizes the helm repos of the cluster
    (once per cluster and helm version), stores the instance info at nsrs "_admin.deployed.K8s.<index>" and
    launches _install_kdu as a task. The tasks are not awaited here.

    :param logging_text: prefix for logging
    :param nsr_id: ns instance
    :param nslcmop_id: operation, used to register the launched tasks
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index
    :param db_vnfds: list with the database content of the vnfds
    :param task_instantiation_info: dict where every launched task is added as key, with an info text as value
    :return: None
    :raises LcmException: on any error; asyncio.CancelledError is propagated untouched
    """
    # Launch kdus if present in the descriptor

    # cache of the cluster identifiers already obtained: {cluster_type: {cluster_id: k8s_id}}
    k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}

    async def _get_cluster_id(cluster_id, cluster_type):
        # Return the identifier stored at k8sclusters "_admin.<cluster_type>.id", initializing the cluster
        # for helm v3 when it is missing. Raises LcmException if the cluster cannot be used
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        # check if K8scluster is creating and wait look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
        if task_dependency:
            text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
            self.logger.debug(logging_text + text)
            await asyncio.wait(task_dependency, timeout=3600)

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            if cluster_type == "helm-chart-v3":
                try:
                    # backward compatibility for existing clusters that have not been initialized for helm v3
                    k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
                    k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
                                                                               reuse_cluster_uuid=cluster_id)
                    db_k8scluster_update = {}
                    db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
                    db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
                    db_k8scluster_update["_admin.helm-chart-v3.created"] = uninstall_sw
                    db_k8scluster_update["_admin.helm-chart-v3.operationalState"] = "ENABLED"
                    self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
                except Exception as e:
                    self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
                    raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(
                        cluster_id, cluster_type)) from e
            else:
                raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
                                   format(cluster_id, cluster_type))
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    step = ""
    try:
        db_nsr_update = {"_admin.deployed.K8s": []}
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        index = 0
        # clusters whose repos have been synchronized by this call, for helm v2 and helm v3
        updated_cluster_list = []
        updated_v3_cluster_list = []

        for vnfr_data in db_vnfrs.values():
            for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                # Step 0: Prepare and set parameters
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                vnfd_id = vnfr_data.get('vnfd-id')
                vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
                kdud = next((kdud for kdud in vnfd_with_id["kdu"] if kdud["name"] == kdur["kdu-name"]), None)
                if kdud is None:
                    raise LcmException("kdu '{}' of vnf '{}' is not found at its vnfd".format(
                        kdur["kdu-name"], vnfr_data["member-vnf-index-ref"]))
                namespace = kdur.get("k8s-namespace")
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    # Default version: helm3, if helm-version is v2 assign v2
                    k8sclustertype = "helm-chart-v3"
                    self.logger.debug("kdur: {}".format(kdur))
                    if kdur.get("helm-version") and kdur.get("helm-version") == "v2":
                        k8sclustertype = "helm-chart"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju-bundle"
                else:
                    raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
                                       "juju-bundle. Maybe an old NBI version is running".
                                       format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
                # check if kdumodel is a file and exists
                try:
                    storage = deep_get(vnfd_with_id, ('_admin', 'storage'))
                    if storage and storage.get('pkg-dir'):  # may be not present if vnfd has not artifacts
                        # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                        filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
                                                         kdumodel)
                        if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
                            kdumodel = self.fs.path + filename
                except (asyncio.TimeoutError, asyncio.CancelledError):
                    raise
                except Exception:  # it is not a file
                    pass

                k8s_cluster_id = kdur["k8s-cluster"]["id"]
                step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
                cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                # Synchronize repos
                if (k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list)\
                        or (k8sclustertype == "helm-chart-v3" and cluster_uuid not in updated_v3_cluster_list):
                    del_repo_list, added_repo_dict = await asyncio.ensure_future(
                        self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
                    # the cluster is remembered even if nothing has changed, to not synchronize it again for
                    # the next KDUs deployed on it
                    if k8sclustertype == "helm-chart":
                        repos_db_key = '_admin.helm_charts_added.'
                        updated_cluster_list.append(cluster_uuid)
                    else:
                        repos_db_key = '_admin.helm_charts_v3_added.'
                        updated_v3_cluster_list.append(cluster_uuid)
                    if del_repo_list or added_repo_dict:
                        unset = {repos_db_key + item: None for item in del_repo_list}
                        updated = {repos_db_key + item: name for item, name in added_repo_dict.items()}
                        self.logger.debug(logging_text + "repos synchronized on k8s cluster "
                                                         "'{}' to_delete: {}, to_add: {}".
                                          format(k8s_cluster_id, del_repo_list, added_repo_dict))
                        self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)

                # Instantiate kdu
                step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
                                                                          kdur["kdu-name"], k8s_cluster_id)
                k8s_instance_info = {"kdu-instance": None,
                                     "k8scluster-uuid": cluster_uuid,
                                     "k8scluster-type": k8sclustertype,
                                     "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                                     "kdu-name": kdur["kdu-name"],
                                     "kdu-model": kdumodel,
                                     "namespace": namespace}
                db_path = "_admin.deployed.K8s.{}".format(index)
                db_nsr_update[db_path] = k8s_instance_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                task = asyncio.ensure_future(
                    self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, vnfd_with_id,
                                      k8s_instance_info, k8params=desc_params, timeout=600))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])

                index += 1

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
        if isinstance(e, (N2VCException, DbException)):
            self.logger.error(logging_text + msg)
        else:
            self.logger.critical(logging_text + msg, exc_info=True)
        raise LcmException(msg) from e
    finally:
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
2417
def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
                 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
                 base_folder, task_instantiation_info, stage):
    """
    Launch one instantiate_N2VC asyncio task per execution environment of a configuration descriptor and
    register every task object.
    For each execution environment its entry is looked up at db_nsr._admin.deployed.VCA. When it is not found,
    a new entry is created, written to database at <nsrs>._admin.deployed.VCA.<index> (together with an empty
    configurationStatus.<index>) and appended to the provided db_nsr.
    :param logging_text: prefix for the log messages
    :param db_nsr: nsr content. Its _admin.deployed.VCA list is read and extended when a new entry is created
    :param db_vnfr: handed over to instantiate_N2VC
    :param nslcmop_id: operation id, used to register the task and handed over to instantiate_N2VC
    :param nsr_id: nsr identifier, used to update the database and to register the task
    :param nsi_id: handed over to instantiate_N2VC
    :param vnfd_id: stored at a newly created VCA entry
    :param vdu_id: together with kdu_name, member_vnf_index and vdu_index selects the VCA entry
    :param kdu_name: see vdu_id
    :param member_vnf_index: see vdu_id. A false value means that the target element is the ns
    :param vdu_index: see vdu_id
    :param vdu_name: stored at a newly created VCA entry
    :param deploy_params: handed over to instantiate_N2VC
    :param descriptor_config: configuration descriptor. It contains either a "juju" section (one single
        execution environment) or an "execution-environment-list"
    :param base_folder: handed over to instantiate_N2VC
    :param task_instantiation_info: dictionary where every launched task is annotated with a description
    :param stage: handed over to instantiate_N2VC
    :return: None
    """
    self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))

    if descriptor_config.get("juju"):
        # the descriptor itself is the single execution environment, of type juju
        ee_list = [descriptor_config]
    else:
        # other types, as script, are not supported and produce an empty list
        ee_list = descriptor_config.get("execution-environment-list") or []

    for ee_item in ee_list:
        juju_section = ee_item.get("juju")
        helm_chart = ee_item.get("helm-chart")
        self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(juju_section, helm_chart))
        ee_descriptor_id = ee_item.get("id")

        if juju_section:
            vca_name = juju_section.get("charm")
            if juju_section.get("cloud") == "k8s":
                vca_type = "k8s_proxy_charm"
            elif juju_section.get("proxy") is False or vca_name is None:
                vca_type = "native_charm"
            else:
                vca_type = "lxc_proxy_charm"
        elif helm_chart:
            vca_name = helm_chart
            vca_type = "helm" if ee_item.get("helm-version") == "v2" else "helm-v3"
        else:
            self.logger.debug(logging_text + "skipping non juju neither charm configuration")
            continue

        # look for the entry of this execution environment among the already deployed VCA
        deployed_vca_list = db_nsr["_admin"]["deployed"]["VCA"]
        vca_index = None
        for position, candidate in enumerate(deployed_vca_list):
            if not candidate:
                continue
            if candidate.get("member-vnf-index") == member_vnf_index and \
                    candidate.get("vdu_id") == vdu_id and \
                    candidate.get("kdu_name") == kdu_name and \
                    candidate.get("vdu_count_index", 0) == vdu_index and \
                    candidate.get("ee_descriptor_id") == ee_descriptor_id:
                vca_index = position
                vca_deployed = candidate
                break

        if vca_index is None:
            # not found: create a new entry at the end of the list
            vca_index = len(deployed_vca_list)
            if member_vnf_index:
                target = "vnf/{}".format(member_vnf_index)
            else:
                target = "ns"
            if vdu_id:
                target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
            elif kdu_name:
                target += "/kdu/{}".format(kdu_name)
            vca_deployed = {
                "target_element": target,
                # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                "member-vnf-index": member_vnf_index,
                "vdu_id": vdu_id,
                "kdu_name": kdu_name,
                "vdu_count_index": vdu_index,
                "operational-status": "init",  # TODO revise
                "detailed-status": "",  # TODO revise
                "step": "initial-deploy",  # TODO revise
                "vnfd_id": vnfd_id,
                "vdu_name": vdu_name,
                "type": vca_type,
                "ee_descriptor_id": ee_descriptor_id
            }

            # create VCA and configurationStatus in db
            self.update_db_2("nsrs", nsr_id, {
                "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                "configurationStatus.{}".format(vca_index): dict()
            })

            deployed_vca_list.append(vca_deployed)

        self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
        self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
        self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

        # Launch task
        n2vc_coroutine = self.instantiate_N2VC(
            logging_text=logging_text,
            vca_index=vca_index,
            nsi_id=nsi_id,
            db_nsr=db_nsr,
            db_vnfr=db_vnfr,
            vdu_id=vdu_id,
            kdu_name=kdu_name,
            vdu_index=vdu_index,
            deploy_params=deploy_params,
            config_descriptor=descriptor_config,
            base_folder=base_folder,
            nslcmop_id=nslcmop_id,
            stage=stage,
            vca_type=vca_type,
            vca_name=vca_name,
            ee_config_descriptor=ee_item
        )
        task_n2vc = asyncio.ensure_future(n2vc_coroutine)
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
        task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
            member_vnf_index or "", vdu_id or "")
2526
@staticmethod
def _create_nslcmop(nsr_id, operation, params):
    """
    Builds the content of a ns-lcm-op occurrence, ready to be stored at database.
    The operation is created in "PROCESSING" state, with the same timestamp as start and status entered time.
    :param nsr_id: internal id of the instance
    :param operation: instantiate, terminate, scale, action, ...
    :param params: user parameters for the operation
    :return: dictionary following SOL005 format
    :raises LcmException: if any of the three arguments is empty
    """
    if not nsr_id or not operation or not params:
        raise LcmException(
            "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
    timestamp = time()
    op_id = str(uuid4())
    return {
        "id": op_id,
        "_id": op_id,
        # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
        "operationState": "PROCESSING",
        "statusEnteredTime": timestamp,
        "nsInstanceId": nsr_id,
        "lcmOperationType": operation,
        "startTime": timestamp,
        "isAutomaticInvocation": False,
        "operationParams": params,
        "isCancelPending": False,
        "links": {
            "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + op_id,
            "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
        }
    }
2560
def _format_additional_params(self, params):
    """
    Decodes the values of params whose text representation starts with "!!yaml ".
    The remainder of such a value is parsed with yaml.safe_load and replaces the original value. A non-empty
    params dictionary is modified in place.
    :param params: dictionary of additional parameters. It can be None or empty
    :return: the same dictionary provided, or a new empty dictionary if params is None or empty
    """
    if not params:
        params = {}
    yaml_mark = "!!yaml "
    for name in params:
        content = params[name]
        if str(content).startswith(yaml_mark):
            params[name] = yaml.safe_load(content[len(yaml_mark):])
    return params
2567
def _get_terminate_primitive_params(self, seq, vnf_index):
    """
    Obtains the parameters of a terminate primitive by means of _map_primitive_params.
    :param seq: terminate primitive descriptor. Its 'name' is used as the primitive name
    :param vnf_index: member vnf index to include at the action parameters
    :return: what _map_primitive_params returns for this primitive, called with empty primitive_params and
        empty descriptor parameters
    """
    action_params = {
        "member_vnf_index": vnf_index,
        "primitive": seq.get('name'),
        "primitive_params": {},
    }
    return self._map_primitive_params(seq, action_params, {})
2578
2579 # sub-operations
2580
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decides if an already existing sub-operation must be skipped or executed again.
    :param db_nslcmop: operation content, with the sub-operations at _admin.operations
    :param op_index: index of the sub-operation inside _admin.operations
    :return: SUBOPERATION_STATUS_SKIP if the sub-operation is 'COMPLETED'; otherwise op_index, after setting the
        sub-operation to 'PROCESSING' at database to indicate a retry
    """
    suboperation = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
    if suboperation.get('operationState') != 'COMPLETED':
        # c. retry executing sub-operation
        # _ns_execute_primitive() or RO.create_action() will be called from scale()
        # with arguments extracted from the sub-operation
        self._update_suboperation_status(db_nslcmop, op_index, 'PROCESSING', 'In progress')
        return op_index
    # b. Skip sub-operation
    # _ns_execute_primitive() or RO.create_action() will NOT be executed
    return self.SUBOPERATION_STATUS_SKIP
2599
def _find_suboperation(self, db_nslcmop, match):
    """
    Finds the first sub-operation where all the keys of a matching dictionary have the same value.
    :param db_nslcmop: operation content, with the sub-operations at _admin.operations
    :param match: dictionary with the keys and values that the sub-operation must contain
    :return: index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if there is no match or if
        db_nslcmop or match is empty
    """
    if not db_nslcmop or not match:
        return self.SUBOPERATION_STATUS_NOT_FOUND
    suboperations = db_nslcmop.get('_admin', {}).get('operations', [])
    for position, suboperation in enumerate(suboperations):
        for key, expected in match.items():
            if suboperation.get(key) != expected:
                break
        else:
            # no key differs: this is the sub-operation looked for
            return position
    return self.SUBOPERATION_STATUS_NOT_FOUND
2609
def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
    """
    Updates at database the status of a sub-operation given its index.
    :param db_nslcmop: operation content. Only its '_id' is used, to select the database entry
    :param op_index: index of the sub-operation inside _admin.operations
    :param operationState: value to write at operationState of the sub-operation
    :param detailed_status: value to write at detailed-status of the sub-operation
    :return: None
    """
    # Update DB for HA tasks
    suboperation_path = '_admin.operations.{}.'.format(op_index)
    self.db.set_one("nslcmops",
                    q_filter={'_id': db_nslcmop['_id']},
                    update_dict={suboperation_path + 'operationState': operationState,
                                 suboperation_path + 'detailed-status': detailed_status},
                    fail_on_empty=False)
2620
def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
                      mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
                      RO_nsr_id=None, RO_scaling_info=None):
    """
    Adds a sub-operation to _admin.operations, both at the provided db_nslcmop and at database.
    Optionally, sets operationState, detailed-status, and operationType.
    Status and type are currently set for 'scale' sub-operations:
    'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
    'detailed-status' : status message
    'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
    Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
    :param db_nslcmop: operation content. It is modified: the sub-operation is appended to _admin.operations
    :param vnf_index: stored as 'member_vnf_index'
    :param vdu_id: stored as 'vdu_id'
    :param vdu_count_index: stored as 'vdu_count_index'
    :param vdu_name: accepted for interface compatibility; it is not stored at the sub-operation
    :param primitive: stored as 'primitive'
    :param mapped_primitive_params: stored as 'primitive_params'
    :param operationState: stored as 'operationState' only if not empty
    :param detailed_status: stored as 'detailed-status' only if not empty
    :param operationType: stored as 'lcmOperationType' only if not empty
    :param RO_nsr_id: stored as 'RO_nsr_id' only if not empty
    :param RO_scaling_info: stored as 'RO_scaling_info' only if not empty
    :return: index of the added sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if db_nslcmop is empty
    """
    if not db_nslcmop:
        return self.SUBOPERATION_STATUS_NOT_FOUND

    new_op = {'member_vnf_index': vnf_index,
              'vdu_id': vdu_id,
              'vdu_count_index': vdu_count_index,
              'primitive': primitive,
              'primitive_params': mapped_primitive_params}
    optional_entries = (
        ('operationState', operationState),
        ('detailed-status', detailed_status),
        ('lcmOperationType', operationType),
        ('RO_nsr_id', RO_nsr_id),
        ('RO_scaling_info', RO_scaling_info),
    )
    new_op.update({key: value for key, value in optional_entries if value})

    # Get the "_admin" content, attaching a new one to db_nslcmop when it is missing. Otherwise the
    # sub-operation would be written at database but lost at the in-memory copy used by later lookups
    db_nslcmop_admin = db_nslcmop.get('_admin')
    if db_nslcmop_admin is None:
        db_nslcmop_admin = db_nslcmop['_admin'] = {}

    op_list = db_nslcmop_admin.get('operations')
    if not op_list:
        # No existing operations, create key 'operations' with current operation as first list element
        op_list = [new_op]
        db_nslcmop_admin['operations'] = op_list
    else:
        # Existing operations, append operation to list
        op_list.append(new_op)

    self.update_db_2("nslcmops", db_nslcmop['_id'], {'_admin.operations': op_list})
    return len(op_list) - 1
2664
2665 # Helper methods for scale() sub-operations
2666
def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
                                     operationType, RO_nsr_id=None, RO_scaling_info=None):
    """
    pre-scale/post-scale: looks for a scale sub-operation and adds it if it does not exist.
    Check for 3 different cases:
    a. New: First time execution, return SUBOPERATION_STATUS_NEW
    b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
    c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
    When both RO_nsr_id and RO_scaling_info are provided, the sub-operation is of type 'SCALE-RO' and it is
    identified by them; otherwise it is identified by the primitive, its parameters and operationType.
    :param db_nslcmop: operation content, with the sub-operations at _admin.operations
    :param vnf_index: member vnf index of the sub-operation
    :param vnf_config_primitive: primitive name. Not stored for a 'SCALE-RO' sub-operation
    :param primitive_params: primitive parameters. Not stored for a 'SCALE-RO' sub-operation
    :param operationType: type of the sub-operation. Ignored for a 'SCALE-RO' sub-operation
    :param RO_nsr_id: see above
    :param RO_scaling_info: see above
    :return: SUBOPERATION_STATUS_NEW, SUBOPERATION_STATUS_SKIP or the index of the sub-operation to retry
    """
    is_ro_scaling = bool(RO_nsr_id and RO_scaling_info)
    if is_ro_scaling:
        operationType = 'SCALE-RO'
        match = {
            'member_vnf_index': vnf_index,
            'RO_nsr_id': RO_nsr_id,
            'RO_scaling_info': RO_scaling_info,
        }
    else:
        match = {
            'member_vnf_index': vnf_index,
            'primitive': vnf_config_primitive,
            'primitive_params': primitive_params,
            'lcmOperationType': operationType
        }

    found_index = self._find_suboperation(db_nslcmop, match)
    if found_index != self.SUBOPERATION_STATUS_NOT_FOUND:
        # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
        # or op_index (operationState != 'COMPLETED')
        return self._retry_or_skip_suboperation(db_nslcmop, found_index)

    # a. New sub-operation
    # The sub-operation does not exist, add it.
    # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
    if is_ro_scaling:
        stored_primitive, stored_params = None, None
        stored_ro_nsr_id, stored_ro_scaling_info = RO_nsr_id, RO_scaling_info
    else:
        stored_primitive, stored_params = vnf_config_primitive, primitive_params
        stored_ro_nsr_id, stored_ro_scaling_info = None, None
    # Add sub-operation for pre/post-scaling (zero or more operations)
    self._add_suboperation(db_nslcmop,
                           vnf_index,
                           None,  # vdu_id: set to None for all kind of scaling
                           None,  # vdu_count_index: set to None for all kind of scaling
                           None,  # vdu_name: set to None for all kind of scaling
                           stored_primitive,
                           stored_params,
                           'PROCESSING',  # initial operationState
                           'In progress',  # initial detailed-status
                           operationType,
                           stored_ro_nsr_id,
                           stored_ro_scaling_info)
    return self.SUBOPERATION_STATUS_NEW
2725
def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
    """
    Returns the execution environment id of the first deployed VCA of a given vnf and vdu.
    Empty entries of the list are ignored, in the same way as the other methods that iterate over
    _admin.deployed.VCA do.
    :param vnf_index: value to match against "member-vnf-index"
    :param vdu_id: value to match against "vdu_id"
    :param vca_deployed_list: list of deployed VCA entries. It can be None or contain empty entries
    :return: the "ee_id" of the first matching entry, or None if there is no match
    :raises KeyError: if the matching entry has no "ee_id"
    """
    # TODO vdu_index_count
    for vca in vca_deployed_list or ():
        if not vca:
            continue
        if vca.get("member-vnf-index") == vnf_index and vca.get("vdu_id") == vdu_id:
            return vca["ee_id"]
    return None
2733
async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
                       vca_index, destroy_ee=True, exec_primitives=True):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True).
    The prometheus jobs annotated at vca_deployed, if any, are removed too.
    :param logging_text: prefix for the log messages
    :param db_nslcmop: operation content. Every executed primitive is added to it as a sub-operation
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once
    :param exec_primitives: False to not execute terminate primitives, because the config is not completed or has
        not executed properly
    :return: None
    :raises LcmException: if a terminate primitive ends with a result other than COMPLETED or
        PARTIALLY_COMPLETED
    """

    self.logger.debug(
        logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            config_descriptor.get("terminate-config-primitive"), vca_deployed.get("ee_descriptor_id"))
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            vnf_index = vca_deployed.get("member-vnf-index")
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            accepted_results = ['COMPLETED', 'PARTIALLY_COMPLETED']
            for seq in terminate_primitives:
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get('name')
                self.logger.debug(
                    logging_text + "Calling terminate action for vnf_member_index={} primitive={}".format(
                        vnf_index, seq.get("name")))
                mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)

                # Add sub-operation
                self._add_suboperation(db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
                                       mapped_primitive_params)
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(
                        vca_deployed["ee_id"], primitive, mapped_primitive_params, vca_type=vca_type)
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                if result not in accepted_results:
                    raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
                                       "error {}".format(seq.get("name"), vnf_index, result_detail))
            # all the primitives are done: annotate that this VCA does not need to be terminated any more
            self.update_db_2("nsrs", db_nslcmop["nsInstanceId"],
                             {"_admin.deployed.VCA.{}.needed_terminate".format(vca_index): False})

    prometheus_jobs = vca_deployed.get("prometheus_jobs")
    if prometheus_jobs and self.prometheus:
        await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])

    if destroy_ee:
        await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
2804
async def _delete_all_N2VC(self, db_nsr: dict):
    """
    Deletes at once the N2VC namespace of a ns, named as a dot followed by the nsr _id.
    The configuration status of all the elements is written as 'TERMINATING' before the deletion and as
    'DELETED' after it. A namespace that is not found is considered as already deleted.
    :param db_nsr: nsr content. Its "_id" is used to compose the namespace
    :return: None
    """
    self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
    ns_namespace = "." + db_nsr["_id"]
    try:
        await self.n2vc.delete_namespace(
            namespace=ns_namespace,
            total_timeout=self.timeout_charm_delete,
        )
    except N2VCNotFound:
        # already deleted. Skip
        pass
    self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
2813
async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Terminates a deployment from RO: deletes the ns, waits until the delete action finishes at the VIM and,
    if nothing failed, deletes the nsd and the vnfds from RO.
    A "not found" (404) answer from RO is considered as already deleted. Any other error is annotated and
    reported at the end, after writing the progress at database.
    :param logging_text: prefix for the log messages
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: nsr identifier, whose database entry is updated
    :param nslcmop_id: operation identifier, whose status is updated
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None
    :raises LcmException: with the detail of all the failed deletions, joined by "; "
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
                                                                                               ro_delete_action))
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action)

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # explicit exception instead of assert, which is removed when running optimized (-O).
                    # It is annotated below as a delete error, like any other unexpected exception
                    raise LcmException("ROclient.check_action_status returns unknown {}".format(ns_status))
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # the 'loop' argument of asyncio.sleep is not provided: it was removed at python 3.10 and
                # the running loop is used anyway
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

    except Exception as e:
        # store what was done up to the failure before classifying the error
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
        elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))

    # Delete nsd
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
            elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id)
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                    self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
                elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                    failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
2956
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminates a ns instance. It is done in three stages:
    1/3 reads the operation, the nsr and the vnf descriptors from database;
    2/3 launches destroy_N2VC for every deployed VCA with an execution environment and waits for them;
    3/3 launches the deletion of all the execution environments, the KDUs and the deployment at RO/VIM.
    The pending tasks are awaited, and the final status is written at database and sent to the message bus,
    at the 'finally' section. The errors raised while terminating are annotated at the status written instead
    of being propagated.
    :param nsr_id: identifier of the ns instance to terminate
    :param nslcmop_id: identifier of the terminate operation
    :return: None
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    operation_params = None
    exc = None
    error_list = []   # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}
    db_nsr_update = {}
    stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            queuePosition=0,
            stage=stage
        )
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing to terminate; the final status is written at the 'finally' section
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        # TODO - check before calling _destroy_N2VC
        # if not operation_params.get("skip_terminate_primitives"):#
        # or not vca.get("needed_terminate"):
        stage[0] = "Stage 2/3 execute terminating primitives."
        self.logger.debug(logging_text + stage[0])
        stage[1] = "Looking execution environment that needs terminate."
        self.logger.debug(logging_text + stage[1])

        for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
            config_descriptor = None
            if not vca or not vca.get("ee_id"):
                continue
            if not vca.get("member-vnf-index"):
                # ns
                config_descriptor = db_nsr.get("ns-configuration")
            elif vca.get("vdu_id"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
            elif vca.get("kdu_name"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
            else:
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
            vca_type = vca.get("type")
            exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
                                         vca.get("needed_terminate"))
            # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
            # pending native charms
            destroy_ee = vca_type in ("helm", "helm-v3", "native_charm")
            # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
            #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
            task = asyncio.ensure_future(
                self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
                                  destroy_ee, exec_terminate_primitives))
            tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

        # wait for pending tasks of terminate primitives
        if tasks_dict_info:
            self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
            error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
                                                    min(self.timeout_charm_delete, timeout_ns_terminate),
                                                    stage, nslcmop_id)
            tasks_dict_info.clear()
            if error_list:
                return   # raise LcmException("; ".join(error_list))

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."

        if nsr_deployed.get("VCA"):
            stage[1] = "Deleting all execution environments."
            self.logger.debug(logging_text + stage[1])
            task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
                                                                    timeout=self.timeout_charm_delete))
            # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
            tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        # print(nsr_deployed)
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance))
            else:
                self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                  format(kdu.get("k8scluster-type")))
                continue
            tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        if self.ng_ro:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        else:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of staff will be done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
                                                         stage, nslcmop_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancell all tasks
        except Exception as e:
            # a different name than 'exc' is used, to not rebind and unbind the variable of the outer scope
            error_list.append(str(e))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            # self.logger.error(logging_text + error_detail)
            error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
            error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if ns_state == "NOT_INSTANTIATED":
            try:
                self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
            except DbException as e:
                # Logger.warn is a deprecated alias, removed at python 3.13
                self.logger.warning(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
                                    format(nsr_id, e))
        if operation_params:
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3193
async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
    """
    Waits until all the tasks of created_tasks_info are finished or the timeout expires, reporting progress.
    Each time some task finishes, stage[1] is rewritten as "<done>/<total>." (followed by the accumulated
    errors when one of the just finished tasks failed) and stored by means of self._write_op_status.
    :param logging_text: prefix added to every log line
    :param created_tasks_info: dictionary {task: description}. The description is used at logs and errors
    :param timeout: maximum time in seconds, counted from this call, for all the tasks together
    :param stage: list whose position 1 is overwritten with the progress text
    :param nslcmop_id: id of the operation where the progress is written
    :param nsr_id: if provided, errorDescription and errorDetail are also written at this nsrs record
    :return: list of "<description>: <error>" texts, one per failed, cancelled or timed out task
    """
    started_at = time()
    detailed_errors = []
    failed_descriptions = []
    remaining = list(created_tasks_info.keys())
    total = len(remaining)
    finished = 0
    stage[1] = "{}/{}.".format(finished, total)
    self._write_op_status(nslcmop_id, stage)
    while remaining:
        batch_failed = False
        # time left of the global timeout; it shrinks on every iteration
        time_left = timeout + started_at - time()
        done, remaining = await asyncio.wait(remaining, timeout=time_left,
                                             return_when=asyncio.FIRST_COMPLETED)
        if not done:
            # nothing finished in time: every task still pending is reported as timed out and the wait ends
            for task in remaining:
                timeout_error = created_tasks_info[task] + ": Timeout"
                detailed_errors.append(timeout_error)
                failed_descriptions.append(timeout_error)
            break
        finished += len(done)
        for task in done:
            description = created_tasks_info[task]
            failure = "Cancelled" if task.cancelled() else task.exception()
            if not failure:
                self.logger.debug(logging_text + description + ": Done")
                continue
            if isinstance(failure, asyncio.TimeoutError):
                failure = "Timeout"
            error_text = description + ": {}".format(failure)
            batch_failed = True
            failed_descriptions.append(description)
            detailed_errors.append(error_text)
            if isinstance(failure, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
                                    K8sException, NgRoException)):
                # known error types: the message is enough
                self.logger.error(logging_text + error_text)
            else:
                # any other exception is logged with its traceback
                trace = "".join(traceback.format_exception(None, failure, failure.__traceback__))
                self.logger.error(logging_text + description + " " + trace)
        stage[1] = "{}/{}.".format(finished, total)
        if batch_failed:
            stage[1] += " Errors: " + ". ".join(detailed_errors) + "."
            if nsr_id:  # update also nsr
                self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(failed_descriptions),
                                                  "errorDetail": ". ".join(detailed_errors)})
        self._write_op_status(nslcmop_id, stage)
    return detailed_errors
3242
@staticmethod
def _map_primitive_params(primitive_desc, params, instantiation_params):
    """
    Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
    the "value" (or, if absent, the "default-value") of the descriptor is used. If it is between < > it looks for
    a value at instantiation_params
    :param primitive_desc: portion of VNFD/NSD that describes primitive
    :param params: Params provided by user. None is treated as no params
    :param instantiation_params: Instantiation params provided by user. None is treated as no params
    :return: a dictionary with the calculated params
    :raises LcmException: if a parameter has neither a user value nor a default, if a <reference> is not present
        at instantiation_params, or if an INTEGER parameter cannot be converted to int
    """
    # the user may not supply any param; a None here must not break the membership checks below
    params = params or {}
    instantiation_params = instantiation_params or {}

    calculated_params = {}
    for parameter in primitive_desc.get("parameter", ()):
        param_name = parameter["name"]
        if param_name in params:
            value = params[param_name]
        elif "value" in parameter or "default-value" in parameter:
            # "value" takes precedence over "default-value"
            value = parameter["value"] if "value" in parameter else parameter["default-value"]
            if isinstance(value, str) and value.startswith("<") and value.endswith(">"):
                # <name> is a reference to an instantiation param
                reference = value[1:-1]
                if reference not in instantiation_params:
                    raise LcmException("Parameter {} needed to execute primitive {} not provided".
                                       format(value, primitive_desc["name"]))
                value = instantiation_params[reference]
        else:
            raise LcmException("Parameter {} needed to execute primitive {} not provided".
                               format(param_name, primitive_desc["name"]))

        if isinstance(value, (dict, list, tuple)):
            # structured values are sent as a one-line yaml text
            value = yaml.safe_dump(value, default_flow_style=True, width=256)
        elif isinstance(value, str) and value.startswith("!!yaml "):
            # text already in yaml: just remove the "!!yaml " tag
            value = value[7:]

        data_type = parameter.get("data-type")
        if data_type == "INTEGER":
            try:
                value = int(value)
            except (ValueError, TypeError):  # error converting to int, e.g. a non numeric text or None
                raise LcmException(
                    "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
        elif data_type == "BOOLEAN":
            # everything but the text "false" (case insensitive) is considered True
            value = str(value).lower() != 'false'
        calculated_params[param_name] = value

    # add always ns_config_info if primitive name is config
    if primitive_desc["name"] == "config" and "ns_config_info" in instantiation_params:
        calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
    return calculated_params
3293
def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
                           ee_descriptor_id=None):
    """
    Finds the vca_deployed record to be used for an action.
    :param deployed_vca: list of deployed VCA records. Empty/None items are ignored
    :param member_vnf_index: must be equal to the "member-vnf-index" of the record
    :param vdu_id: must be equal to the "vdu_id" of the record
    :param vdu_count_index: if not None, must be equal to the "vdu_count_index" of the record
    :param kdu_name: if provided, must be equal to the "kdu_name" of the record
    :param ee_descriptor_id: if provided, must be equal to the "ee_descriptor_id" of the record
    :return: tuple (ee_id, vca_type) of the first matching record
    :raises LcmException: if not found or the record found has not any ee_id
    """
    def _is_wanted(candidate):
        # the checks are done in this order; optional filters are skipped when not provided
        if not candidate:
            return False
        if member_vnf_index != candidate["member-vnf-index"] or vdu_id != candidate["vdu_id"]:
            return False
        if vdu_count_index is not None and vdu_count_index != candidate["vdu_count_index"]:
            return False
        if kdu_name and kdu_name != candidate["kdu_name"]:
            return False
        if ee_descriptor_id and ee_descriptor_id != candidate["ee_descriptor_id"]:
            return False
        return True

    found = next((item for item in deployed_vca if _is_wanted(item)), None)
    if found is None:
        # vca_deployed not found
        raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
                           " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
                                                     ee_descriptor_id))
    # get ee_id
    ee_id = found.get("ee_id")
    if not ee_id:
        raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
                           "execution environment"
                           .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
    # default value for backward compatibility - proxy charm
    return ee_id, found.get("type", "lxc_proxy_charm")
3322
async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0, retries_interval=30,
                                timeout=None, vca_type=None, db_dict=None) -> (str, str):
    """
    Executes a primitive over an execution environment, retrying on failure.
    :param ee_id: execution environment id
    :param primitive: name of the primitive. For "config" the params are sent wrapped as {"params": ...}
    :param primitive_params: dictionary with the params of the primitive
    :param retries: number of additional attempts to do after a failed one
    :param retries_interval: seconds to wait between attempts
    :param timeout: seconds allowed for each attempt. By default self.timeout_primitive
    :param vca_type: key of self.vca_map with the connector to use. By default "lxc_proxy_charm"
    :param db_dict: passed as it is to the connector exec_primitive
    :return: tuple ('COMPLETED', output of the primitive) or ('FAILED', text with the error)
    :raises LcmException, asyncio.CancelledError: these are not converted into a 'FAILED' result
    """
    try:
        if primitive == "config":
            primitive_params = {"params": primitive_params}

        vca_type = vca_type or "lxc_proxy_charm"

        while retries >= 0:
            try:
                output = await asyncio.wait_for(
                    self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name=primitive,
                        params_dict=primitive_params,
                        progress_timeout=self.timeout_progress_primitive,
                        total_timeout=self.timeout_primitive,
                        db_dict=db_dict),
                    timeout=timeout or self.timeout_primitive)
                # execution was OK
                break
            except asyncio.CancelledError:
                raise
            except Exception as e:  # asyncio.TimeoutError
                if isinstance(e, asyncio.TimeoutError):
                    e = "Timeout"
                retries -= 1
                if retries >= 0:
                    self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
                    # wait and retry. The "loop" argument is not passed: it was removed from asyncio.sleep at
                    # python 3.10 and the running loop is used anyway
                    await asyncio.sleep(retries_interval)
                else:
                    return 'FAILED', str(e)

        return 'COMPLETED', output

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        # same state as the regular failure above: callers compare the result against "FAILED"
        return 'FAILED', 'Error executing action {}: {}'.format(primitive, e)
3363
async def action(self, nsr_id, nslcmop_id):
    """
    Executes the primitive requested by an action operation over a NS, or over one of its VNFs, VDUs or KDUs.
    The parameters are read from the "operationParams" of the nslcmops record. For a KDU, the primitives
    "upgrade", "rollback", "status" and the ones declared at the KDU configuration are sent to the connector
    found at self.k8scluster_map. Any other case is executed by self._ns_execute_primitive over the deployed VCA.
    The result is written at database and notified by kafka (topic "ns", key "actioned").
    :param nsr_id: id of the nsrs record
    :param nslcmop_id: id of the nslcmops record with the action to execute
    :return: None if the HA lock of the operation is not obtained. Otherwise the tuple
        (operation state, detailed status); the errors caught here are reported at this tuple
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        op_params = db_nslcmop["operationParams"]
        vnf_index = op_params.get("member_vnf_index")
        vdu_id = op_params.get("vdu_id")
        kdu_name = op_params.get("kdu_name")
        vdu_count_index = op_params.get("vdu_count_index")
        primitive = op_params["primitive"]
        primitive_params = op_params["primitive_params"]
        timeout_ns_action = op_params.get("timeout_ns_action", self.timeout_primitive)

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        config_primitive_desc = descriptor_configuration = None
        if vdu_id:
            descriptor_configuration = get_configuration(db_vnfd, vdu_id)
        elif kdu_name:
            descriptor_configuration = get_configuration(db_vnfd, kdu_name)
        elif vnf_index:
            descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
        else:
            descriptor_configuration = db_nsd.get("ns-configuration")

        if descriptor_configuration and descriptor_configuration.get("config-primitive"):
            for config_primitive in descriptor_configuration["config-primitive"]:
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            # only the built-in KDU primitives are allowed without being declared at the descriptor
            if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                                   format(primitive))
            primitive_name = primitive
            ee_descriptor_id = None
        else:
            primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
            ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")

        if vnf_index:
            if vdu_id:
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                if vdur is None:
                    # fail with a clear message instead of an AttributeError over None
                    raise LcmException("vdu_id={} not found at vnfr with member_vnf_index={}".format(
                        vdu_id, vnf_index))
                desc_params = parse_yaml_strings(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
                if kdur is None:
                    raise LcmException("kdu_name={} not found at vnfr with member_vnf_index={}".format(
                        kdu_name, vnf_index))
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
            else:
                desc_params = parse_yaml_strings(db_vnfr.get("additionalParamsForVnf"))
        else:
            desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))

        # kdu_action: the primitive is declared at the KDU configuration. It is always initialized because it is
        # evaluated below also when the KDU has not configuration
        kdu_action = False
        kdu_configuration = get_configuration(db_vnfd, kdu_name) if kdu_name else None
        if kdu_configuration:
            actions = set()
            # a different name is used at the loops to not overwrite the requested "primitive"
            for kdu_primitive in kdu_configuration.get("initial-config-primitive", []):
                actions.add(kdu_primitive["name"])
            for kdu_primitive in kdu_configuration.get("config-primitive", []):
                actions.add(kdu_primitive["name"])
            kdu_action = primitive_name in actions

        # TODO check if ns is in a proper status
        if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
            # kdur and desc_params already set from before
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
                    break
            else:
                raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))

            if kdu.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
                raise LcmException(msg)

            db_dict = {"collection": "nsrs",
                       "filter": {"_id": nsr_id},
                       "path": "_admin.deployed.K8s.{}".format(index)}
            self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
            step = "Executing kdu {}".format(primitive_name)
            if primitive_name == "upgrade":
                if desc_params.get("kdu_model"):
                    kdu_model = desc_params.get("kdu_model")
                    del desc_params["kdu_model"]
                else:
                    # use the deployed model; when it has the form "<name>:<something>" only the name is kept
                    kdu_model = kdu.get("kdu-model")
                    parts = kdu_model.split(sep=":")
                    if len(parts) == 2:
                        kdu_model = parts[0]

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        atomic=True, kdu_model=kdu_model,
                        params=desc_params, db_dict=db_dict,
                        timeout=timeout_ns_action),
                    timeout=timeout_ns_action + 10)
                self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
            elif primitive_name == "rollback":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        db_dict=db_dict),
                    timeout=timeout_ns_action)
            elif primitive_name == "status":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance")),
                    timeout=timeout_ns_action)
            else:
                kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
                params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive_name,
                        params=params, db_dict=db_dict,
                        timeout=timeout_ns_action),
                    timeout=timeout_ns_action)

            # an empty answer of the connector is taken as a failure
            if detailed_status:
                nslcmop_operation_state = 'COMPLETED'
            else:
                detailed_status = ''
                nslcmop_operation_state = 'FAILED'
        else:
            ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"], member_vnf_index=vnf_index,
                                                          vdu_id=vdu_id, vdu_count_index=vdu_count_index,
                                                          ee_descriptor_id=ee_descriptor_id)
            db_nslcmop_notif = {"collection": "nslcmops",
                                "filter": {"_id": nslcmop_id},
                                "path": "admin.VCA"}
            nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive_name,
                primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_nslcmop_notif)

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
        self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
                                                                                detailed_status))
        return  # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc:
            db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
                "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=db_nsr["nsState"],  # TODO check if degraded. For the moment use previous status
                current_operation="IDLE",
                current_operation_id=None,
                # error_description=error_description_nsr,
                # error_detail=error_detail,
                other_update=db_nsr_update
            )

        self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
                              operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # this return replaces the bare "return" of the try block
        return nslcmop_operation_state, detailed_status
3600
3601 async def scale(self, nsr_id, nslcmop_id):
3602 # Try to lock HA task here
3603 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3604 if not task_is_locked_by_me:
3605 return
3606
3607 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3608 stage = ['', '', '']
3609 # ^ stage, step, VIM progress
3610 self.logger.debug(logging_text + "Enter")
3611 # get all needed from database
3612 db_nsr = None
3613 db_nslcmop_update = {}
3614 db_nsr_update = {}
3615 exc = None
3616 # in case of error, indicates what part of scale was failed to put nsr at error status
3617 scale_process = None
3618 old_operational_status = ""
3619 old_config_status = ""
3620 try:
3621 # wait for any previous tasks in process
3622 step = "Waiting for previous operations to terminate"
3623 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3624 self._write_ns_status(nsr_id=nsr_id, ns_state=None,
3625 current_operation="SCALING", current_operation_id=nslcmop_id)
3626
3627 step = "Getting nslcmop from database"
3628 self.logger.debug(step + " after having waited for previous tasks to be completed")
3629 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3630
3631 step = "Getting nsr from database"
3632 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3633 old_operational_status = db_nsr["operational-status"]
3634 old_config_status = db_nsr["config-status"]
3635
3636 step = "Parsing scaling parameters"
3637 db_nsr_update["operational-status"] = "scaling"
3638 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3639 nsr_deployed = db_nsr["_admin"].get("deployed")
3640
3641 #######
3642 nsr_deployed = db_nsr["_admin"].get("deployed")
3643 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3644 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3645 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3646 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3647 #######
3648
3649 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3650 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3651 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3652 # for backward compatibility
3653 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3654 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3655 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3656 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3657
3658 step = "Getting vnfr from database"
3659 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3660
3661 step = "Getting vnfd from database"
3662 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3663
3664 step = "Getting scaling-group-descriptor"
3665 scaling_descriptor = find_in_list(
3666 get_scaling_aspect(
3667 db_vnfd
3668 ),
3669 lambda scale_desc: scale_desc["name"] == scaling_group
3670 )
3671 if not scaling_descriptor:
3672 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3673 "at vnfd:scaling-group-descriptor".format(scaling_group))
3674
3675 step = "Sending scale order to VIM"
3676 # TODO check if ns is in a proper status
3677 nb_scale_op = 0
3678 if not db_nsr["_admin"].get("scaling-group"):
3679 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3680 admin_scale_index = 0
3681 else:
3682 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3683 if admin_scale_info["name"] == scaling_group:
3684 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3685 break
3686 else: # not found, set index one plus last element and add new entry with the name
3687 admin_scale_index += 1
3688 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3689 RO_scaling_info = []
3690 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3691 if scaling_type == "SCALE_OUT":
3692 if "aspect-delta-details" not in scaling_descriptor:
3693 raise LcmException(
3694 "Aspect delta details not fount in scaling descriptor {}".format(
3695 scaling_descriptor["name"]
3696 )
3697 )
3698 # count if max-instance-count is reached
3699 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3700
3701 vdu_scaling_info["scaling_direction"] = "OUT"
3702 vdu_scaling_info["vdu-create"] = {}
3703 for delta in deltas:
3704 for vdu_delta in delta["vdu-delta"]:
3705 vdud = get_vdu(db_vnfd, vdu_delta["id"])
3706 vdu_index = get_vdu_index(db_vnfr, vdu_delta["id"])
3707 cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
3708 if cloud_init_text:
3709 additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
3710 cloud_init_list = []
3711
3712 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3713 max_instance_count = 10
3714 if vdu_profile and "max-number-of-instances" in vdu_profile:
3715 max_instance_count = vdu_profile.get("max-number-of-instances", 10)
3716
3717 deafult_instance_num = get_number_of_instances(db_vnfd, vdud["id"])
3718
3719 nb_scale_op += vdu_delta.get("number-of-instances", 1)
3720
3721 if nb_scale_op + deafult_instance_num > max_instance_count:
3722 raise LcmException(
3723 "reached the limit of {} (max-instance-count) "
3724 "scaling-out operations for the "
3725 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3726 )
3727 for x in range(vdu_delta.get("number-of-instances", 1)):
3728 if cloud_init_text:
3729 # TODO Information of its own ip is not available because db_vnfr is not updated.
3730 additional_params["OSM"] = get_osm_params(
3731 db_vnfr,
3732 vdu_delta["id"],
3733 vdu_index + x
3734 )
3735 cloud_init_list.append(
3736 self._parse_cloud_init(
3737 cloud_init_text,
3738 additional_params,
3739 db_vnfd["id"],
3740 vdud["id"]
3741 )
3742 )
3743 RO_scaling_info.append(
3744 {
3745 "osm_vdu_id": vdu_delta["id"],
3746 "member-vnf-index": vnf_index,
3747 "type": "create",
3748 "count": vdu_delta.get("number-of-instances", 1)
3749 }
3750 )
3751 if cloud_init_list:
3752 RO_scaling_info[-1]["cloud_init"] = cloud_init_list
3753 vdu_scaling_info["vdu-create"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3754
3755 elif scaling_type == "SCALE_IN":
3756 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3757 min_instance_count = int(scaling_descriptor["min-instance-count"])
3758
3759 vdu_scaling_info["scaling_direction"] = "IN"
3760 vdu_scaling_info["vdu-delete"] = {}
3761 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3762 for delta in deltas:
3763 for vdu_delta in delta["vdu-delta"]:
3764 min_instance_count = 0
3765 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3766 if vdu_profile and "min-number-of-instances" in vdu_profile:
3767 min_instance_count = vdu_profile["min-number-of-instances"]
3768
3769 deafult_instance_num = get_number_of_instances(db_vnfd, vdu_delta["id"])
3770
3771 nb_scale_op -= vdu_delta.get("number-of-instances", 1)
3772 if nb_scale_op + deafult_instance_num < min_instance_count:
3773 raise LcmException(
3774 "reached the limit of {} (min-instance-count) scaling-in operations for the "
3775 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3776 )
3777 RO_scaling_info.append({"osm_vdu_id": vdu_delta["id"], "member-vnf-index": vnf_index,
3778 "type": "delete", "count": vdu_delta.get("number-of-instances", 1)})
3779 vdu_scaling_info["vdu-delete"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3780
3781 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3782 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3783 if vdu_scaling_info["scaling_direction"] == "IN":
3784 for vdur in reversed(db_vnfr["vdur"]):
3785 if vdu_delete.get(vdur["vdu-id-ref"]):
3786 vdu_delete[vdur["vdu-id-ref"]] -= 1
3787 vdu_scaling_info["vdu"].append({
3788 "name": vdur.get("name") or vdur.get("vdu-name"),
3789 "vdu_id": vdur["vdu-id-ref"],
3790 "interface": []
3791 })
3792 for interface in vdur["interfaces"]:
3793 vdu_scaling_info["vdu"][-1]["interface"].append({
3794 "name": interface["name"],
3795 "ip_address": interface["ip-address"],
3796 "mac_address": interface.get("mac-address"),
3797 })
3798 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
3799
3800 # PRE-SCALE BEGIN
3801 step = "Executing pre-scale vnf-config-primitive"
3802 if scaling_descriptor.get("scaling-config-action"):
3803 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3804 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3805 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3806 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3807 step = db_nslcmop_update["detailed-status"] = \
3808 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3809
3810 # look for primitive
3811 for config_primitive in (get_configuration(
3812 db_vnfd, db_vnfd["id"]
3813 ) or {}).get("config-primitive", ()):
3814 if config_primitive["name"] == vnf_config_primitive:
3815 break
3816 else:
3817 raise LcmException(
3818 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3819 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3820 "primitive".format(scaling_group, vnf_config_primitive))
3821
3822 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3823 if db_vnfr.get("additionalParamsForVnf"):
3824 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3825
3826 scale_process = "VCA"
3827 db_nsr_update["config-status"] = "configuring pre-scaling"
3828 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3829
3830 # Pre-scale retry check: Check if this sub-operation has been executed before
3831 op_index = self._check_or_add_scale_suboperation(
3832 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
3833 if op_index == self.SUBOPERATION_STATUS_SKIP:
3834 # Skip sub-operation
3835 result = 'COMPLETED'
3836 result_detail = 'Done'
3837 self.logger.debug(logging_text +
3838 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3839 vnf_config_primitive, result, result_detail))
3840 else:
3841 if op_index == self.SUBOPERATION_STATUS_NEW:
3842 # New sub-operation: Get index of this sub-operation
3843 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3844 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3845 format(vnf_config_primitive))
3846 else:
3847 # retry: Get registered params for this existing sub-operation
3848 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3849 vnf_index = op.get('member_vnf_index')
3850 vnf_config_primitive = op.get('primitive')
3851 primitive_params = op.get('primitive_params')
3852 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
3853 format(vnf_config_primitive))
3854 # Execute the primitive, either with new (first-time) or registered (reintent) args
3855 ee_descriptor_id = config_primitive.get("execution-environment-ref")
3856 primitive_name = config_primitive.get("execution-environment-primitive",
3857 vnf_config_primitive)
3858 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
3859 member_vnf_index=vnf_index,
3860 vdu_id=None,
3861 vdu_count_index=None,
3862 ee_descriptor_id=ee_descriptor_id)
3863 result, result_detail = await self._ns_execute_primitive(
3864 ee_id, primitive_name, primitive_params, vca_type)
3865 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3866 vnf_config_primitive, result, result_detail))
3867 # Update operationState = COMPLETED | FAILED
3868 self._update_suboperation_status(
3869 db_nslcmop, op_index, result, result_detail)
3870
3871 if result == "FAILED":
3872 raise LcmException(result_detail)
3873 db_nsr_update["config-status"] = old_config_status
3874 scale_process = None
3875 # PRE-SCALE END
3876
3877 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3878 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3879
3880 # SCALE RO - BEGIN
3881 if RO_scaling_info:
3882 scale_process = "RO"
3883 if self.ro_config.get("ng"):
3884 await self._scale_ng_ro(logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage)
3885 vdu_scaling_info.pop("vdu-create", None)
3886 vdu_scaling_info.pop("vdu-delete", None)
3887
3888 scale_process = None
3889 if db_nsr_update:
3890 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3891
3892 # POST-SCALE BEGIN
3893 # execute primitive service POST-SCALING
3894 step = "Executing post-scale vnf-config-primitive"
3895 if scaling_descriptor.get("scaling-config-action"):
3896 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3897 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
3898 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
3899 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3900 step = db_nslcmop_update["detailed-status"] = \
3901 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
3902
3903 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3904 if db_vnfr.get("additionalParamsForVnf"):
3905 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3906
3907 # look for primitive
3908 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3909 if config_primitive["name"] == vnf_config_primitive:
3910 break
3911 else:
3912 raise LcmException(
3913 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
3914 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
3915 "config-primitive".format(scaling_group, vnf_config_primitive))
3916 scale_process = "VCA"
3917 db_nsr_update["config-status"] = "configuring post-scaling"
3918 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3919
3920 # Post-scale retry check: Check if this sub-operation has been executed before
3921 op_index = self._check_or_add_scale_suboperation(
3922 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
3923 if op_index == self.SUBOPERATION_STATUS_SKIP:
3924 # Skip sub-operation
3925 result = 'COMPLETED'
3926 result_detail = 'Done'
3927 self.logger.debug(logging_text +
3928 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3929 format(vnf_config_primitive, result, result_detail))
3930 else:
3931 if op_index == self.SUBOPERATION_STATUS_NEW:
3932 # New sub-operation: Get index of this sub-operation
3933 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3934 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3935 format(vnf_config_primitive))
3936 else:
3937 # retry: Get registered params for this existing sub-operation
3938 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3939 vnf_index = op.get('member_vnf_index')
3940 vnf_config_primitive = op.get('primitive')
3941 primitive_params = op.get('primitive_params')
3942 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
3943 format(vnf_config_primitive))
3944 # Execute the primitive, either with new (first-time) or registered (reintent) args
3945 ee_descriptor_id = config_primitive.get("execution-environment-ref")
3946 primitive_name = config_primitive.get("execution-environment-primitive",
3947 vnf_config_primitive)
3948 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
3949 member_vnf_index=vnf_index,
3950 vdu_id=None,
3951 vdu_count_index=None,
3952 ee_descriptor_id=ee_descriptor_id)
3953 result, result_detail = await self._ns_execute_primitive(
3954 ee_id, primitive_name, primitive_params, vca_type)
3955 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3956 vnf_config_primitive, result, result_detail))
3957 # Update operationState = COMPLETED | FAILED
3958 self._update_suboperation_status(
3959 db_nslcmop, op_index, result, result_detail)
3960
3961 if result == "FAILED":
3962 raise LcmException(result_detail)
3963 db_nsr_update["config-status"] = old_config_status
3964 scale_process = None
3965 # POST-SCALE END
3966
3967 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3968 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
3969 else old_operational_status
3970 db_nsr_update["config-status"] = old_config_status
3971 return
3972 except (ROclient.ROClientException, DbException, LcmException, NgRoException) as e:
3973 self.logger.error(logging_text + "Exit Exception {}".format(e))
3974 exc = e
3975 except asyncio.CancelledError:
3976 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3977 exc = "Operation was cancelled"
3978 except Exception as e:
3979 exc = traceback.format_exc()
3980 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3981 finally:
3982 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE", current_operation_id=None)
3983 if exc:
3984 db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
3985 nslcmop_operation_state = "FAILED"
3986 if db_nsr:
3987 db_nsr_update["operational-status"] = old_operational_status
3988 db_nsr_update["config-status"] = old_config_status
3989 db_nsr_update["detailed-status"] = ""
3990 if scale_process:
3991 if "VCA" in scale_process:
3992 db_nsr_update["config-status"] = "failed"
3993 if "RO" in scale_process:
3994 db_nsr_update["operational-status"] = "failed"
3995 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
3996 exc)
3997 else:
3998 error_description_nslcmop = None
3999 nslcmop_operation_state = "COMPLETED"
4000 db_nslcmop_update["detailed-status"] = "Done"
4001
4002 self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
4003 operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
4004 if db_nsr:
4005 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE",
4006 current_operation_id=None, other_update=db_nsr_update)
4007
4008 if nslcmop_operation_state:
4009 try:
4010 msg = {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, "operationState": nslcmop_operation_state}
4011 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
4012 except Exception as e:
4013 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
4014 self.logger.debug(logging_text + "Exit")
4015 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
4016
async def _scale_ng_ro(self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage):
    """
    Apply a vdu scaling change through NG-RO by re-running the NS deployment with the updated vnfr.

    :param logging_text: prefix for log messages, passed through to _instantiate_ng_ro
    :param db_nsr: NS record; here only its "nsd-id" is read, to load the nsd from database
    :param db_nslcmop: operation record; its "nsInstanceId" identifies the NS being scaled
    :param db_vnfr: record of the vnf being scaled; it is handed to scale_vnfr and then used instead of the
        copy of the same vnfr read from database
    :param vdu_scaling_info: dict that may contain "vdu-create" and/or "vdu-delete" entries
    :param stage: passed through to _instantiate_ng_ro
    :return: None
    """
    nsr_id = db_nslcmop["nsInstanceId"]
    db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

    # vnfrs of this ns indexed by member-vnf-index, plus one vnfd per distinct vnfd uuid
    db_vnfrs = {}
    db_vnfds = []
    loaded_vnfd_ids = set()
    for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
        db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
        vnfd_id = vnfr["vnfd-id"]  # vnfd uuid for this vnf
        # "vnfd-id" is the database "_id" (it is the key used in the query below), so already loaded
        # descriptors are tracked by that uuid and not by the descriptor "id" field; comparing against "id"
        # never matches, which causes one extra read and one repeated db_vnfds entry per vnfr
        if vnfd_id not in loaded_vnfd_ids:
            db_vnfds.append(self.db.get_one("vnfds", {"_id": vnfd_id}))
            loaded_vnfd_ids.add(vnfd_id)

    n2vc_key_list = [self.n2vc.get_public_key()]

    # before deploying, scale_vnfr is called with mark_delete=True
    # NOTE(review): presumably this only flags the vdus to remove instead of dropping them; confirm in scale_vnfr
    self.scale_vnfr(db_vnfr, vdu_scaling_info.get("vdu-create"), vdu_scaling_info.get("vdu-delete"),
                    mark_delete=True)
    # db_vnfr has been updated, update db_vnfrs to use it
    db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
    await self._instantiate_ng_ro(logging_text, nsr_id, db_nsd, db_nsr, db_nslcmop, db_vnfrs,
                                  db_vnfds, n2vc_key_list, stage=stage, start_deploy=time(),
                                  timeout_ns_deploy=self.timeout_ns_deploy)
    # once the deployment call has returned, repeat the deletion with mark_delete=False
    if vdu_scaling_info.get("vdu-delete"):
        self.scale_vnfr(db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False)
4045
async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
    """
    Register in prometheus the jobs described by a 'prometheus*.j2' file found in the artifact folder.

    :param ee_id: execution environment id; the text after its first "." is used as service name
    :param artifact_path: folder, inside self.fs, where the 'prometheus*.j2' file is looked for
    :param ee_config_descriptor: dict that must contain the "metric-service" key
    :param vnfr_id: vnfr id; with its "-" characters removed it is used as job name
    :param nsr_id: value stored under "nsr_id" in every job
    :param target_ip: value for the "TARGET_IP" variable handed to the job parser
    :return: list with the job names when self.prometheus.update returns a true value; None in any other
        case, including when there is no prometheus or no 'prometheus*.j2' file
    """
    if not self.prometheus:
        return None

    # take the first artifact named 'prometheus*.j2'; without it there is nothing to register
    template_name = None
    for entry in self.fs.dir_ls(artifact_path):
        if entry.startswith("prometheus") and entry.endswith(".j2"):
            template_name = entry
            break
    if not template_name:
        return None

    with self.fs.file_open((artifact_path, template_name), "r") as template_file:
        template_text = template_file.read()

    # TODO get_service
    service_name = ee_id.partition(".")[2]  # drop the "namespace." prefix
    exporter_host = "{}-{}".format(service_name, ee_config_descriptor["metric-service"])
    vnfr_id = vnfr_id.replace("-", "")
    template_vars = {
        "JOB_NAME": vnfr_id,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": exporter_host,
        "EXPORTER_POD_PORT": "80",
    }
    parsed_jobs = self.prometheus.parse_job(template_text, template_vars)

    # every job name must contain the vnfr_id, otherwise a new one is generated; nsr_id is added as metadata
    for job in parsed_jobs:
        current_name = job.get("job_name")
        if not (isinstance(current_name, str) and vnfr_id in current_name):
            job["job_name"] = "{}_{}".format(vnfr_id, randint(1, 10000))
        job["nsr_id"] = nsr_id

    jobs_by_name = {}
    for job in parsed_jobs:
        jobs_by_name[job["job_name"]] = job

    updated = await self.prometheus.update(jobs_by_name)
    if updated:
        return list(jobs_by_name)
    return None
4077
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA cloud name and the VCA cloud credential stored in the configuration of a VIM account

    :param: vim_account_id: id of the VIM account, as accepted by VimAccountDB.get_vim_account_with_id

    :return: (cloud_name, cloud_credential), taken from the "vca_cloud" and "vca_cloud_credential" keys of
        the account "config"; an element is None when its key, or the whole "config", is missing
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
4088
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Read the VCA K8s cloud name and the VCA K8s cloud credential stored in the configuration of a VIM account

    :param: vim_account_id: id of the VIM account, as accepted by VimAccountDB.get_vim_account_with_id

    :return: (cloud_name, cloud_credential), taken from the "vca_k8s_cloud" and "vca_k8s_cloud_credential"
        keys of the account "config"; an element is None when its key, or the whole "config", is missing
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_k8s_cloud")
    cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return cloud_name, cloud_credential