Feature 10906: Support for Anti-Affinity groups
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import (
26 Environment,
27 TemplateError,
28 TemplateNotFound,
29 StrictUndefined,
30 UndefinedError,
31 )
32
33 from osm_lcm import ROclient
34 from osm_lcm.data_utils.nsr import get_deployed_kdu
35 from osm_lcm.ng_ro import NgRoClient, NgRoException
36 from osm_lcm.lcm_utils import (
37 LcmException,
38 LcmExceptionNoMgmtIP,
39 LcmBase,
40 deep_get,
41 get_iterable,
42 populate_dict,
43 )
44 from osm_lcm.data_utils.nsd import get_vnf_profiles
45 from osm_lcm.data_utils.vnfd import (
46 get_vdu_list,
47 get_vdu_profile,
48 get_ee_sorted_initial_config_primitive_list,
49 get_ee_sorted_terminate_config_primitive_list,
50 get_kdu_list,
51 get_virtual_link_profiles,
52 get_vdu,
53 get_configuration,
54 get_vdu_index,
55 get_scaling_aspect,
56 get_number_of_instances,
57 get_juju_ee_ref,
58 get_kdu_profile,
59 )
60 from osm_lcm.data_utils.list_utils import find_in_list
61 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
62 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
63 from osm_lcm.data_utils.database.vim_account import VimAccountDB
64 from n2vc.k8s_helm_conn import K8sHelmConnector
65 from n2vc.k8s_helm3_conn import K8sHelm3Connector
66 from n2vc.k8s_juju_conn import K8sJujuConnector
67
68 from osm_common.dbbase import DbException
69 from osm_common.fsbase import FsException
70
71 from osm_lcm.data_utils.database.database import Database
72 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
73
74 from n2vc.n2vc_juju_conn import N2VCJujuConnector
75 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
76
77 from osm_lcm.lcm_helm_conn import LCMHelmConn
78
79 from copy import copy, deepcopy
80 from time import time
81 from uuid import uuid4
82
83 from random import randint
84
85 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
86
87
88 class NsLcm(LcmBase):
89 timeout_vca_on_error = (
90 5 * 60
91 ) # Time for charm from first time at blocked,error status to mark as failed
92 timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
93 timeout_ns_terminate = 1800 # default global timeout for un deployment a ns
94 timeout_charm_delete = 10 * 60
95 timeout_primitive = 30 * 60 # timeout for primitive execution
96 timeout_progress_primitive = (
97 10 * 60
98 ) # timeout for some progress in a primitive execution
99
100 SUBOPERATION_STATUS_NOT_FOUND = -1
101 SUBOPERATION_STATUS_NEW = -2
102 SUBOPERATION_STATUS_SKIP = -3
103 task_name_deploy_vca = "Deploying VCA"
104
105 def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
106 """
107 Init, Connect to database, filesystem storage, and messaging
108 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
109 :return: None
110 """
111 super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))
112
113 self.db = Database().instance.db
114 self.fs = Filesystem().instance.fs
115 self.loop = loop
116 self.lcm_tasks = lcm_tasks
117 self.timeout = config["timeout"]
118 self.ro_config = config["ro_config"]
119 self.ng_ro = config["ro_config"].get("ng")
120 self.vca_config = config["VCA"].copy()
121
122 # create N2VC connector
123 self.n2vc = N2VCJujuConnector(
124 log=self.logger,
125 loop=self.loop,
126 on_update_db=self._on_update_n2vc_db,
127 fs=self.fs,
128 db=self.db,
129 )
130
131 self.conn_helm_ee = LCMHelmConn(
132 log=self.logger,
133 loop=self.loop,
134 vca_config=self.vca_config,
135 on_update_db=self._on_update_n2vc_db,
136 )
137
138 self.k8sclusterhelm2 = K8sHelmConnector(
139 kubectl_command=self.vca_config.get("kubectlpath"),
140 helm_command=self.vca_config.get("helmpath"),
141 log=self.logger,
142 on_update_db=None,
143 fs=self.fs,
144 db=self.db,
145 )
146
147 self.k8sclusterhelm3 = K8sHelm3Connector(
148 kubectl_command=self.vca_config.get("kubectlpath"),
149 helm_command=self.vca_config.get("helm3path"),
150 fs=self.fs,
151 log=self.logger,
152 db=self.db,
153 on_update_db=None,
154 )
155
156 self.k8sclusterjuju = K8sJujuConnector(
157 kubectl_command=self.vca_config.get("kubectlpath"),
158 juju_command=self.vca_config.get("jujupath"),
159 log=self.logger,
160 loop=self.loop,
161 on_update_db=self._on_update_k8s_db,
162 fs=self.fs,
163 db=self.db,
164 )
165
166 self.k8scluster_map = {
167 "helm-chart": self.k8sclusterhelm2,
168 "helm-chart-v3": self.k8sclusterhelm3,
169 "chart": self.k8sclusterhelm3,
170 "juju-bundle": self.k8sclusterjuju,
171 "juju": self.k8sclusterjuju,
172 }
173
174 self.vca_map = {
175 "lxc_proxy_charm": self.n2vc,
176 "native_charm": self.n2vc,
177 "k8s_proxy_charm": self.n2vc,
178 "helm": self.conn_helm_ee,
179 "helm-v3": self.conn_helm_ee,
180 }
181
182 self.prometheus = prometheus
183
184 # create RO client
185 self.RO = NgRoClient(self.loop, **self.ro_config)
186
187 @staticmethod
188 def increment_ip_mac(ip_mac, vm_index=1):
189 if not isinstance(ip_mac, str):
190 return ip_mac
191 try:
192 # try with ipv4 look for last dot
193 i = ip_mac.rfind(".")
194 if i > 0:
195 i += 1
196 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
197 # try with ipv6 or mac look for last colon. Operate in hex
198 i = ip_mac.rfind(":")
199 if i > 0:
200 i += 1
201 # format in hex, len can be 2 for mac or 4 for ipv6
202 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
203 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
204 )
205 except Exception:
206 pass
207 return None
208
209 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
210
211 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
212
213 try:
214 # TODO filter RO descriptor fields...
215
216 # write to database
217 db_dict = dict()
218 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
219 db_dict["deploymentStatus"] = ro_descriptor
220 self.update_db_2("nsrs", nsrs_id, db_dict)
221
222 except Exception as e:
223 self.logger.warn(
224 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
225 )
226
227 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
228
229 # remove last dot from path (if exists)
230 if path.endswith("."):
231 path = path[:-1]
232
233 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
234 # .format(table, filter, path, updated_data))
235 try:
236
237 nsr_id = filter.get("_id")
238
239 # read ns record from database
240 nsr = self.db.get_one(table="nsrs", q_filter=filter)
241 current_ns_status = nsr.get("nsState")
242
243 # get vca status for NS
244 status_dict = await self.n2vc.get_status(
245 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
246 )
247
248 # vcaStatus
249 db_dict = dict()
250 db_dict["vcaStatus"] = status_dict
251 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
252
253 # update configurationStatus for this VCA
254 try:
255 vca_index = int(path[path.rfind(".") + 1 :])
256
257 vca_list = deep_get(
258 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
259 )
260 vca_status = vca_list[vca_index].get("status")
261
262 configuration_status_list = nsr.get("configurationStatus")
263 config_status = configuration_status_list[vca_index].get("status")
264
265 if config_status == "BROKEN" and vca_status != "failed":
266 db_dict["configurationStatus"][vca_index] = "READY"
267 elif config_status != "BROKEN" and vca_status == "failed":
268 db_dict["configurationStatus"][vca_index] = "BROKEN"
269 except Exception as e:
270 # not update configurationStatus
271 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
272
273 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
274 # if nsState = 'DEGRADED' check if all is OK
275 is_degraded = False
276 if current_ns_status in ("READY", "DEGRADED"):
277 error_description = ""
278 # check machines
279 if status_dict.get("machines"):
280 for machine_id in status_dict.get("machines"):
281 machine = status_dict.get("machines").get(machine_id)
282 # check machine agent-status
283 if machine.get("agent-status"):
284 s = machine.get("agent-status").get("status")
285 if s != "started":
286 is_degraded = True
287 error_description += (
288 "machine {} agent-status={} ; ".format(
289 machine_id, s
290 )
291 )
292 # check machine instance status
293 if machine.get("instance-status"):
294 s = machine.get("instance-status").get("status")
295 if s != "running":
296 is_degraded = True
297 error_description += (
298 "machine {} instance-status={} ; ".format(
299 machine_id, s
300 )
301 )
302 # check applications
303 if status_dict.get("applications"):
304 for app_id in status_dict.get("applications"):
305 app = status_dict.get("applications").get(app_id)
306 # check application status
307 if app.get("status"):
308 s = app.get("status").get("status")
309 if s != "active":
310 is_degraded = True
311 error_description += (
312 "application {} status={} ; ".format(app_id, s)
313 )
314
315 if error_description:
316 db_dict["errorDescription"] = error_description
317 if current_ns_status == "READY" and is_degraded:
318 db_dict["nsState"] = "DEGRADED"
319 if current_ns_status == "DEGRADED" and not is_degraded:
320 db_dict["nsState"] = "READY"
321
322 # write to database
323 self.update_db_2("nsrs", nsr_id, db_dict)
324
325 except (asyncio.CancelledError, asyncio.TimeoutError):
326 raise
327 except Exception as e:
328 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
329
330 async def _on_update_k8s_db(
331 self, cluster_uuid, kdu_instance, filter=None, vca_id=None
332 ):
333 """
334 Updating vca status in NSR record
335 :param cluster_uuid: UUID of a k8s cluster
336 :param kdu_instance: The unique name of the KDU instance
337 :param filter: To get nsr_id
338 :return: none
339 """
340
341 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
342 # .format(cluster_uuid, kdu_instance, filter))
343
344 try:
345 nsr_id = filter.get("_id")
346
347 # get vca status for NS
348 vca_status = await self.k8sclusterjuju.status_kdu(
349 cluster_uuid,
350 kdu_instance,
351 complete_status=True,
352 yaml_format=False,
353 vca_id=vca_id,
354 )
355 # vcaStatus
356 db_dict = dict()
357 db_dict["vcaStatus"] = {nsr_id: vca_status}
358
359 await self.k8sclusterjuju.update_vca_status(
360 db_dict["vcaStatus"],
361 kdu_instance,
362 vca_id=vca_id,
363 )
364
365 # write to database
366 self.update_db_2("nsrs", nsr_id, db_dict)
367
368 except (asyncio.CancelledError, asyncio.TimeoutError):
369 raise
370 except Exception as e:
371 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
372
373 @staticmethod
374 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
375 try:
376 env = Environment(undefined=StrictUndefined)
377 template = env.from_string(cloud_init_text)
378 return template.render(additional_params or {})
379 except UndefinedError as e:
380 raise LcmException(
381 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
382 "file, must be provided in the instantiation parameters inside the "
383 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
384 )
385 except (TemplateError, TemplateNotFound) as e:
386 raise LcmException(
387 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
388 vnfd_id, vdu_id, e
389 )
390 )
391
392 def _get_vdu_cloud_init_content(self, vdu, vnfd):
393 cloud_init_content = cloud_init_file = None
394 try:
395 if vdu.get("cloud-init-file"):
396 base_folder = vnfd["_admin"]["storage"]
397 cloud_init_file = "{}/{}/cloud_init/{}".format(
398 base_folder["folder"],
399 base_folder["pkg-dir"],
400 vdu["cloud-init-file"],
401 )
402 with self.fs.file_open(cloud_init_file, "r") as ci_file:
403 cloud_init_content = ci_file.read()
404 elif vdu.get("cloud-init"):
405 cloud_init_content = vdu["cloud-init"]
406
407 return cloud_init_content
408 except FsException as e:
409 raise LcmException(
410 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
411 vnfd["id"], vdu["id"], cloud_init_file, e
412 )
413 )
414
415 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
416 vdur = next(
417 (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
418 {}
419 )
420 additional_params = vdur.get("additionalParams")
421 return parse_yaml_strings(additional_params)
422
423 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
424 """
425 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
426 :param vnfd: input vnfd
427 :param new_id: overrides vnf id if provided
428 :param additionalParams: Instantiation params for VNFs provided
429 :param nsrId: Id of the NSR
430 :return: copy of vnfd
431 """
432 vnfd_RO = deepcopy(vnfd)
433 # remove unused by RO configuration, monitoring, scaling and internal keys
434 vnfd_RO.pop("_id", None)
435 vnfd_RO.pop("_admin", None)
436 vnfd_RO.pop("monitoring-param", None)
437 vnfd_RO.pop("scaling-group-descriptor", None)
438 vnfd_RO.pop("kdu", None)
439 vnfd_RO.pop("k8s-cluster", None)
440 if new_id:
441 vnfd_RO["id"] = new_id
442
443 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
444 for vdu in get_iterable(vnfd_RO, "vdu"):
445 vdu.pop("cloud-init-file", None)
446 vdu.pop("cloud-init", None)
447 return vnfd_RO
448
449 @staticmethod
450 def ip_profile_2_RO(ip_profile):
451 RO_ip_profile = deepcopy(ip_profile)
452 if "dns-server" in RO_ip_profile:
453 if isinstance(RO_ip_profile["dns-server"], list):
454 RO_ip_profile["dns-address"] = []
455 for ds in RO_ip_profile.pop("dns-server"):
456 RO_ip_profile["dns-address"].append(ds["address"])
457 else:
458 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
459 if RO_ip_profile.get("ip-version") == "ipv4":
460 RO_ip_profile["ip-version"] = "IPv4"
461 if RO_ip_profile.get("ip-version") == "ipv6":
462 RO_ip_profile["ip-version"] = "IPv6"
463 if "dhcp-params" in RO_ip_profile:
464 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
465 return RO_ip_profile
466
467 def _get_ro_vim_id_for_vim_account(self, vim_account):
468 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
469 if db_vim["_admin"]["operationalState"] != "ENABLED":
470 raise LcmException(
471 "VIM={} is not available. operationalState={}".format(
472 vim_account, db_vim["_admin"]["operationalState"]
473 )
474 )
475 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
476 return RO_vim_id
477
478 def get_ro_wim_id_for_wim_account(self, wim_account):
479 if isinstance(wim_account, str):
480 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
481 if db_wim["_admin"]["operationalState"] != "ENABLED":
482 raise LcmException(
483 "WIM={} is not available. operationalState={}".format(
484 wim_account, db_wim["_admin"]["operationalState"]
485 )
486 )
487 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
488 return RO_wim_id
489 else:
490 return wim_account
491
492 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
493
494 db_vdu_push_list = []
495 template_vdur = []
496 db_update = {"_admin.modified": time()}
497 if vdu_create:
498 for vdu_id, vdu_count in vdu_create.items():
499 vdur = next(
500 (
501 vdur
502 for vdur in reversed(db_vnfr["vdur"])
503 if vdur["vdu-id-ref"] == vdu_id
504 ),
505 None,
506 )
507 if not vdur:
508 # Read the template saved in the db:
509 self.logger.debug(f"No vdur in the database. Using the vdur-template to scale")
510 vdur_template = db_vnfr.get("vdur-template")
511 if not vdur_template:
512 raise LcmException(
513 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
514 vdu_id
515 )
516 )
517 vdur = vdur_template[0]
518 #Delete a template from the database after using it
519 self.db.set_one("vnfrs",
520 {"_id": db_vnfr["_id"]},
521 None,
522 pull={"vdur-template": {"_id": vdur['_id']}}
523 )
524 for count in range(vdu_count):
525 vdur_copy = deepcopy(vdur)
526 vdur_copy["status"] = "BUILD"
527 vdur_copy["status-detailed"] = None
528 vdur_copy["ip-address"] = None
529 vdur_copy["_id"] = str(uuid4())
530 vdur_copy["count-index"] += count + 1
531 vdur_copy["id"] = "{}-{}".format(
532 vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
533 )
534 vdur_copy.pop("vim_info", None)
535 for iface in vdur_copy["interfaces"]:
536 if iface.get("fixed-ip"):
537 iface["ip-address"] = self.increment_ip_mac(
538 iface["ip-address"], count + 1
539 )
540 else:
541 iface.pop("ip-address", None)
542 if iface.get("fixed-mac"):
543 iface["mac-address"] = self.increment_ip_mac(
544 iface["mac-address"], count + 1
545 )
546 else:
547 iface.pop("mac-address", None)
548 if db_vnfr["vdur"]:
549 iface.pop(
550 "mgmt_vnf", None
551 ) # only first vdu can be managment of vnf
552 db_vdu_push_list.append(vdur_copy)
553 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
554 if vdu_delete:
555 if len(db_vnfr["vdur"]) == 1:
556 # The scale will move to 0 instances
557 self.logger.debug(f"Scaling to 0 !, creating the template with the last vdur")
558 template_vdur = [db_vnfr["vdur"][0]]
559 for vdu_id, vdu_count in vdu_delete.items():
560 if mark_delete:
561 indexes_to_delete = [
562 iv[0]
563 for iv in enumerate(db_vnfr["vdur"])
564 if iv[1]["vdu-id-ref"] == vdu_id
565 ]
566 db_update.update(
567 {
568 "vdur.{}.status".format(i): "DELETING"
569 for i in indexes_to_delete[-vdu_count:]
570 }
571 )
572 else:
573 # it must be deleted one by one because common.db does not allow otherwise
574 vdus_to_delete = [
575 v
576 for v in reversed(db_vnfr["vdur"])
577 if v["vdu-id-ref"] == vdu_id
578 ]
579 for vdu in vdus_to_delete[:vdu_count]:
580 self.db.set_one(
581 "vnfrs",
582 {"_id": db_vnfr["_id"]},
583 None,
584 pull={"vdur": {"_id": vdu["_id"]}},
585 )
586 db_push = {}
587 if db_vdu_push_list:
588 db_push["vdur"] = db_vdu_push_list
589 if template_vdur:
590 db_push["vdur-template"] = template_vdur
591 if not db_push:
592 db_push = None
593 db_vnfr["vdur-template"] = template_vdur
594 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
595 # modify passed dictionary db_vnfr
596 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
597 db_vnfr["vdur"] = db_vnfr_["vdur"]
598
599 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
600 """
601 Updates database nsr with the RO info for the created vld
602 :param ns_update_nsr: dictionary to be filled with the updated info
603 :param db_nsr: content of db_nsr. This is also modified
604 :param nsr_desc_RO: nsr descriptor from RO
605 :return: Nothing, LcmException is raised on errors
606 """
607
608 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
609 for net_RO in get_iterable(nsr_desc_RO, "nets"):
610 if vld["id"] != net_RO.get("ns_net_osm_id"):
611 continue
612 vld["vim-id"] = net_RO.get("vim_net_id")
613 vld["name"] = net_RO.get("vim_name")
614 vld["status"] = net_RO.get("status")
615 vld["status-detailed"] = net_RO.get("error_msg")
616 ns_update_nsr["vld.{}".format(vld_index)] = vld
617 break
618 else:
619 raise LcmException(
620 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
621 )
622
623 def set_vnfr_at_error(self, db_vnfrs, error_text):
624 try:
625 for db_vnfr in db_vnfrs.values():
626 vnfr_update = {"status": "ERROR"}
627 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
628 if "status" not in vdur:
629 vdur["status"] = "ERROR"
630 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
631 if error_text:
632 vdur["status-detailed"] = str(error_text)
633 vnfr_update[
634 "vdur.{}.status-detailed".format(vdu_index)
635 ] = "ERROR"
636 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
637 except DbException as e:
638 self.logger.error("Cannot update vnf. {}".format(e))
639
640 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
641 """
642 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
643 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
644 :param nsr_desc_RO: nsr descriptor from RO
645 :return: Nothing, LcmException is raised on errors
646 """
647 for vnf_index, db_vnfr in db_vnfrs.items():
648 for vnf_RO in nsr_desc_RO["vnfs"]:
649 if vnf_RO["member_vnf_index"] != vnf_index:
650 continue
651 vnfr_update = {}
652 if vnf_RO.get("ip_address"):
653 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
654 "ip_address"
655 ].split(";")[0]
656 elif not db_vnfr.get("ip-address"):
657 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
658 raise LcmExceptionNoMgmtIP(
659 "ns member_vnf_index '{}' has no IP address".format(
660 vnf_index
661 )
662 )
663
664 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
665 vdur_RO_count_index = 0
666 if vdur.get("pdu-type"):
667 continue
668 for vdur_RO in get_iterable(vnf_RO, "vms"):
669 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
670 continue
671 if vdur["count-index"] != vdur_RO_count_index:
672 vdur_RO_count_index += 1
673 continue
674 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
675 if vdur_RO.get("ip_address"):
676 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
677 else:
678 vdur["ip-address"] = None
679 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
680 vdur["name"] = vdur_RO.get("vim_name")
681 vdur["status"] = vdur_RO.get("status")
682 vdur["status-detailed"] = vdur_RO.get("error_msg")
683 for ifacer in get_iterable(vdur, "interfaces"):
684 for interface_RO in get_iterable(vdur_RO, "interfaces"):
685 if ifacer["name"] == interface_RO.get("internal_name"):
686 ifacer["ip-address"] = interface_RO.get(
687 "ip_address"
688 )
689 ifacer["mac-address"] = interface_RO.get(
690 "mac_address"
691 )
692 break
693 else:
694 raise LcmException(
695 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
696 "from VIM info".format(
697 vnf_index, vdur["vdu-id-ref"], ifacer["name"]
698 )
699 )
700 vnfr_update["vdur.{}".format(vdu_index)] = vdur
701 break
702 else:
703 raise LcmException(
704 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
705 "VIM info".format(
706 vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
707 )
708 )
709
710 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
711 for net_RO in get_iterable(nsr_desc_RO, "nets"):
712 if vld["id"] != net_RO.get("vnf_net_osm_id"):
713 continue
714 vld["vim-id"] = net_RO.get("vim_net_id")
715 vld["name"] = net_RO.get("vim_name")
716 vld["status"] = net_RO.get("status")
717 vld["status-detailed"] = net_RO.get("error_msg")
718 vnfr_update["vld.{}".format(vld_index)] = vld
719 break
720 else:
721 raise LcmException(
722 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
723 vnf_index, vld["id"]
724 )
725 )
726
727 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
728 break
729
730 else:
731 raise LcmException(
732 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
733 vnf_index
734 )
735 )
736
737 def _get_ns_config_info(self, nsr_id):
738 """
739 Generates a mapping between vnf,vdu elements and the N2VC id
740 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
741 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
742 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
743 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
744 """
745 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
746 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
747 mapping = {}
748 ns_config_info = {"osm-config-mapping": mapping}
749 for vca in vca_deployed_list:
750 if not vca["member-vnf-index"]:
751 continue
752 if not vca["vdu_id"]:
753 mapping[vca["member-vnf-index"]] = vca["application"]
754 else:
755 mapping[
756 "{}.{}.{}".format(
757 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
758 )
759 ] = vca["application"]
760 return ns_config_info
761
762 async def _instantiate_ng_ro(
763 self,
764 logging_text,
765 nsr_id,
766 nsd,
767 db_nsr,
768 db_nslcmop,
769 db_vnfrs,
770 db_vnfds,
771 n2vc_key_list,
772 stage,
773 start_deploy,
774 timeout_ns_deploy,
775 ):
776
777 db_vims = {}
778
779 def get_vim_account(vim_account_id):
780 nonlocal db_vims
781 if vim_account_id in db_vims:
782 return db_vims[vim_account_id]
783 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
784 db_vims[vim_account_id] = db_vim
785 return db_vim
786
787 # modify target_vld info with instantiation parameters
788 def parse_vld_instantiation_params(
789 target_vim, target_vld, vld_params, target_sdn
790 ):
791 if vld_params.get("ip-profile"):
792 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
793 "ip-profile"
794 ]
795 if vld_params.get("provider-network"):
796 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
797 "provider-network"
798 ]
799 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
800 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
801 "provider-network"
802 ]["sdn-ports"]
803 if vld_params.get("wimAccountId"):
804 target_wim = "wim:{}".format(vld_params["wimAccountId"])
805 target_vld["vim_info"][target_wim] = {}
806 for param in ("vim-network-name", "vim-network-id"):
807 if vld_params.get(param):
808 if isinstance(vld_params[param], dict):
809 for vim, vim_net in vld_params[param].items():
810 other_target_vim = "vim:" + vim
811 populate_dict(
812 target_vld["vim_info"],
813 (other_target_vim, param.replace("-", "_")),
814 vim_net,
815 )
816 else: # isinstance str
817 target_vld["vim_info"][target_vim][
818 param.replace("-", "_")
819 ] = vld_params[param]
820 if vld_params.get("common_id"):
821 target_vld["common_id"] = vld_params.get("common_id")
822
823 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
824 def update_ns_vld_target(target, ns_params):
825 for vnf_params in ns_params.get("vnf", ()):
826 if vnf_params.get("vimAccountId"):
827 target_vnf = next(
828 (
829 vnfr
830 for vnfr in db_vnfrs.values()
831 if vnf_params["member-vnf-index"]
832 == vnfr["member-vnf-index-ref"]
833 ),
834 None,
835 )
836 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
837 for a_index, a_vld in enumerate(target["ns"]["vld"]):
838 target_vld = find_in_list(
839 get_iterable(vdur, "interfaces"),
840 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
841 )
842 if target_vld:
843 if vnf_params.get("vimAccountId") not in a_vld.get(
844 "vim_info", {}
845 ):
846 target["ns"]["vld"][a_index].get("vim_info").update(
847 {
848 "vim:{}".format(vnf_params["vimAccountId"]): {
849 "vim_network_name": ""
850 }
851 }
852 )
853
854 nslcmop_id = db_nslcmop["_id"]
855 target = {
856 "name": db_nsr["name"],
857 "ns": {"vld": []},
858 "vnf": [],
859 "image": deepcopy(db_nsr["image"]),
860 "flavor": deepcopy(db_nsr["flavor"]),
861 "action_id": nslcmop_id,
862 "cloud_init_content": {},
863 }
864 for image in target["image"]:
865 image["vim_info"] = {}
866 for flavor in target["flavor"]:
867 flavor["vim_info"] = {}
868 if db_nsr.get("affinity-or-anti-affinity-group"):
869 target["affinity-or-anti-affinity-group"] = deepcopy(db_nsr["affinity-or-anti-affinity-group"])
870 for affinity_or_anti_affinity_group in target["affinity-or-anti-affinity-group"]:
871 affinity_or_anti_affinity_group["vim_info"] = {}
872
873 if db_nslcmop.get("lcmOperationType") != "instantiate":
874 # get parameters of instantiation:
875 db_nslcmop_instantiate = self.db.get_list(
876 "nslcmops",
877 {
878 "nsInstanceId": db_nslcmop["nsInstanceId"],
879 "lcmOperationType": "instantiate",
880 },
881 )[-1]
882 ns_params = db_nslcmop_instantiate.get("operationParams")
883 else:
884 ns_params = db_nslcmop.get("operationParams")
885 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
886 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
887
888 cp2target = {}
889 for vld_index, vld in enumerate(db_nsr.get("vld")):
890 target_vim = "vim:{}".format(ns_params["vimAccountId"])
891 target_vld = {
892 "id": vld["id"],
893 "name": vld["name"],
894 "mgmt-network": vld.get("mgmt-network", False),
895 "type": vld.get("type"),
896 "vim_info": {
897 target_vim: {
898 "vim_network_name": vld.get("vim-network-name"),
899 "vim_account_id": ns_params["vimAccountId"],
900 }
901 },
902 }
903 # check if this network needs SDN assist
904 if vld.get("pci-interfaces"):
905 db_vim = get_vim_account(ns_params["vimAccountId"])
906 sdnc_id = db_vim["config"].get("sdn-controller")
907 if sdnc_id:
908 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
909 target_sdn = "sdn:{}".format(sdnc_id)
910 target_vld["vim_info"][target_sdn] = {
911 "sdn": True,
912 "target_vim": target_vim,
913 "vlds": [sdn_vld],
914 "type": vld.get("type"),
915 }
916
917 nsd_vnf_profiles = get_vnf_profiles(nsd)
918 for nsd_vnf_profile in nsd_vnf_profiles:
919 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
920 if cp["virtual-link-profile-id"] == vld["id"]:
921 cp2target[
922 "member_vnf:{}.{}".format(
923 cp["constituent-cpd-id"][0][
924 "constituent-base-element-id"
925 ],
926 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
927 )
928 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
929
930 # check at nsd descriptor, if there is an ip-profile
931 vld_params = {}
932 nsd_vlp = find_in_list(
933 get_virtual_link_profiles(nsd),
934 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
935 == vld["id"],
936 )
937 if (
938 nsd_vlp
939 and nsd_vlp.get("virtual-link-protocol-data")
940 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
941 ):
942 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
943 "l3-protocol-data"
944 ]
945 ip_profile_dest_data = {}
946 if "ip-version" in ip_profile_source_data:
947 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
948 "ip-version"
949 ]
950 if "cidr" in ip_profile_source_data:
951 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
952 "cidr"
953 ]
954 if "gateway-ip" in ip_profile_source_data:
955 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
956 "gateway-ip"
957 ]
958 if "dhcp-enabled" in ip_profile_source_data:
959 ip_profile_dest_data["dhcp-params"] = {
960 "enabled": ip_profile_source_data["dhcp-enabled"]
961 }
962 vld_params["ip-profile"] = ip_profile_dest_data
963
964 # update vld_params with instantiation params
965 vld_instantiation_params = find_in_list(
966 get_iterable(ns_params, "vld"),
967 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
968 )
969 if vld_instantiation_params:
970 vld_params.update(vld_instantiation_params)
971 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
972 target["ns"]["vld"].append(target_vld)
973 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
974 update_ns_vld_target(target, ns_params)
975
976 for vnfr in db_vnfrs.values():
977 vnfd = find_in_list(
978 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
979 )
980 vnf_params = find_in_list(
981 get_iterable(ns_params, "vnf"),
982 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
983 )
984 target_vnf = deepcopy(vnfr)
985 target_vim = "vim:{}".format(vnfr["vim-account-id"])
986 for vld in target_vnf.get("vld", ()):
987 # check if connected to a ns.vld, to fill target'
988 vnf_cp = find_in_list(
989 vnfd.get("int-virtual-link-desc", ()),
990 lambda cpd: cpd.get("id") == vld["id"],
991 )
992 if vnf_cp:
993 ns_cp = "member_vnf:{}.{}".format(
994 vnfr["member-vnf-index-ref"], vnf_cp["id"]
995 )
996 if cp2target.get(ns_cp):
997 vld["target"] = cp2target[ns_cp]
998
999 vld["vim_info"] = {
1000 target_vim: {"vim_network_name": vld.get("vim-network-name")}
1001 }
1002 # check if this network needs SDN assist
1003 target_sdn = None
1004 if vld.get("pci-interfaces"):
1005 db_vim = get_vim_account(vnfr["vim-account-id"])
1006 sdnc_id = db_vim["config"].get("sdn-controller")
1007 if sdnc_id:
1008 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1009 target_sdn = "sdn:{}".format(sdnc_id)
1010 vld["vim_info"][target_sdn] = {
1011 "sdn": True,
1012 "target_vim": target_vim,
1013 "vlds": [sdn_vld],
1014 "type": vld.get("type"),
1015 }
1016
1017 # check at vnfd descriptor, if there is an ip-profile
1018 vld_params = {}
1019 vnfd_vlp = find_in_list(
1020 get_virtual_link_profiles(vnfd),
1021 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1022 )
1023 if (
1024 vnfd_vlp
1025 and vnfd_vlp.get("virtual-link-protocol-data")
1026 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1027 ):
1028 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1029 "l3-protocol-data"
1030 ]
1031 ip_profile_dest_data = {}
1032 if "ip-version" in ip_profile_source_data:
1033 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1034 "ip-version"
1035 ]
1036 if "cidr" in ip_profile_source_data:
1037 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1038 "cidr"
1039 ]
1040 if "gateway-ip" in ip_profile_source_data:
1041 ip_profile_dest_data[
1042 "gateway-address"
1043 ] = ip_profile_source_data["gateway-ip"]
1044 if "dhcp-enabled" in ip_profile_source_data:
1045 ip_profile_dest_data["dhcp-params"] = {
1046 "enabled": ip_profile_source_data["dhcp-enabled"]
1047 }
1048
1049 vld_params["ip-profile"] = ip_profile_dest_data
1050 # update vld_params with instantiation params
1051 if vnf_params:
1052 vld_instantiation_params = find_in_list(
1053 get_iterable(vnf_params, "internal-vld"),
1054 lambda i_vld: i_vld["name"] == vld["id"],
1055 )
1056 if vld_instantiation_params:
1057 vld_params.update(vld_instantiation_params)
1058 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1059
1060 vdur_list = []
1061 for vdur in target_vnf.get("vdur", ()):
1062 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1063 continue # This vdu must not be created
1064 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1065
1066 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1067
1068 if ssh_keys_all:
1069 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1070 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1071 if (
1072 vdu_configuration
1073 and vdu_configuration.get("config-access")
1074 and vdu_configuration.get("config-access").get("ssh-access")
1075 ):
1076 vdur["ssh-keys"] = ssh_keys_all
1077 vdur["ssh-access-required"] = vdu_configuration[
1078 "config-access"
1079 ]["ssh-access"]["required"]
1080 elif (
1081 vnf_configuration
1082 and vnf_configuration.get("config-access")
1083 and vnf_configuration.get("config-access").get("ssh-access")
1084 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1085 ):
1086 vdur["ssh-keys"] = ssh_keys_all
1087 vdur["ssh-access-required"] = vnf_configuration[
1088 "config-access"
1089 ]["ssh-access"]["required"]
1090 elif ssh_keys_instantiation and find_in_list(
1091 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1092 ):
1093 vdur["ssh-keys"] = ssh_keys_instantiation
1094
1095 self.logger.debug("NS > vdur > {}".format(vdur))
1096
1097 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1098 # cloud-init
1099 if vdud.get("cloud-init-file"):
1100 vdur["cloud-init"] = "{}:file:{}".format(
1101 vnfd["_id"], vdud.get("cloud-init-file")
1102 )
1103 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1104 if vdur["cloud-init"] not in target["cloud_init_content"]:
1105 base_folder = vnfd["_admin"]["storage"]
1106 cloud_init_file = "{}/{}/cloud_init/{}".format(
1107 base_folder["folder"],
1108 base_folder["pkg-dir"],
1109 vdud.get("cloud-init-file"),
1110 )
1111 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1112 target["cloud_init_content"][
1113 vdur["cloud-init"]
1114 ] = ci_file.read()
1115 elif vdud.get("cloud-init"):
1116 vdur["cloud-init"] = "{}:vdu:{}".format(
1117 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1118 )
1119 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1120 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1121 "cloud-init"
1122 ]
1123 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1124 deploy_params_vdu = self._format_additional_params(
1125 vdur.get("additionalParams") or {}
1126 )
1127 deploy_params_vdu["OSM"] = get_osm_params(
1128 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1129 )
1130 vdur["additionalParams"] = deploy_params_vdu
1131
1132 # flavor
1133 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1134 if target_vim not in ns_flavor["vim_info"]:
1135 ns_flavor["vim_info"][target_vim] = {}
1136
1137 # deal with images
1138 # in case alternative images are provided we must check if they should be applied
1139 # for the vim_type, modify the vim_type taking into account
1140 ns_image_id = int(vdur["ns-image-id"])
1141 if vdur.get("alt-image-ids"):
1142 db_vim = get_vim_account(vnfr["vim-account-id"])
1143 vim_type = db_vim["vim_type"]
1144 for alt_image_id in vdur.get("alt-image-ids"):
1145 ns_alt_image = target["image"][int(alt_image_id)]
1146 if vim_type == ns_alt_image.get("vim-type"):
1147 # must use alternative image
1148 self.logger.debug(
1149 "use alternative image id: {}".format(alt_image_id)
1150 )
1151 ns_image_id = alt_image_id
1152 vdur["ns-image-id"] = ns_image_id
1153 break
1154 ns_image = target["image"][int(ns_image_id)]
1155 if target_vim not in ns_image["vim_info"]:
1156 ns_image["vim_info"][target_vim] = {}
1157
1158 # Affinity groups
1159 if vdur.get("affinity-or-anti-affinity-group-id"):
1160 for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
1161 ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
1162 if target_vim not in ns_ags["vim_info"]:
1163 ns_ags["vim_info"][target_vim] = {}
1164
1165 vdur["vim_info"] = {target_vim: {}}
1166 # instantiation parameters
1167 # if vnf_params:
1168 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1169 # vdud["id"]), None)
1170 vdur_list.append(vdur)
1171 target_vnf["vdur"] = vdur_list
1172 target["vnf"].append(target_vnf)
1173
1174 desc = await self.RO.deploy(nsr_id, target)
1175 self.logger.debug("RO return > {}".format(desc))
1176 action_id = desc["action_id"]
1177 await self._wait_ng_ro(
1178 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1179 )
1180
1181 # Updating NSR
1182 db_nsr_update = {
1183 "_admin.deployed.RO.operational-status": "running",
1184 "detailed-status": " ".join(stage),
1185 }
1186 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1187 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1188 self._write_op_status(nslcmop_id, stage)
1189 self.logger.debug(
1190 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1191 )
1192 return
1193
1194 async def _wait_ng_ro(
1195 self,
1196 nsr_id,
1197 action_id,
1198 nslcmop_id=None,
1199 start_time=None,
1200 timeout=600,
1201 stage=None,
1202 ):
1203 detailed_status_old = None
1204 db_nsr_update = {}
1205 start_time = start_time or time()
1206 while time() <= start_time + timeout:
1207 desc_status = await self.RO.status(nsr_id, action_id)
1208 self.logger.debug("Wait NG RO > {}".format(desc_status))
1209 if desc_status["status"] == "FAILED":
1210 raise NgRoException(desc_status["details"])
1211 elif desc_status["status"] == "BUILD":
1212 if stage:
1213 stage[2] = "VIM: ({})".format(desc_status["details"])
1214 elif desc_status["status"] == "DONE":
1215 if stage:
1216 stage[2] = "Deployed at VIM"
1217 break
1218 else:
1219 assert False, "ROclient.check_ns_status returns unknown {}".format(
1220 desc_status["status"]
1221 )
1222 if stage and nslcmop_id and stage[2] != detailed_status_old:
1223 detailed_status_old = stage[2]
1224 db_nsr_update["detailed-status"] = " ".join(stage)
1225 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1226 self._write_op_status(nslcmop_id, stage)
1227 await asyncio.sleep(15, loop=self.loop)
1228 else: # timeout_ns_deploy
1229 raise NgRoException("Timeout waiting ns to deploy")
1230
1231 async def _terminate_ng_ro(
1232 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1233 ):
1234 db_nsr_update = {}
1235 failed_detail = []
1236 action_id = None
1237 start_deploy = time()
1238 try:
1239 target = {
1240 "ns": {"vld": []},
1241 "vnf": [],
1242 "image": [],
1243 "flavor": [],
1244 "action_id": nslcmop_id,
1245 }
1246 desc = await self.RO.deploy(nsr_id, target)
1247 action_id = desc["action_id"]
1248 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1249 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1250 self.logger.debug(
1251 logging_text
1252 + "ns terminate action at RO. action_id={}".format(action_id)
1253 )
1254
1255 # wait until done
1256 delete_timeout = 20 * 60 # 20 minutes
1257 await self._wait_ng_ro(
1258 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1259 )
1260
1261 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1262 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1263 # delete all nsr
1264 await self.RO.delete(nsr_id)
1265 except Exception as e:
1266 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1267 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1268 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1269 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1270 self.logger.debug(
1271 logging_text + "RO_action_id={} already deleted".format(action_id)
1272 )
1273 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1274 failed_detail.append("delete conflict: {}".format(e))
1275 self.logger.debug(
1276 logging_text
1277 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1278 )
1279 else:
1280 failed_detail.append("delete error: {}".format(e))
1281 self.logger.error(
1282 logging_text
1283 + "RO_action_id={} delete error: {}".format(action_id, e)
1284 )
1285
1286 if failed_detail:
1287 stage[2] = "Error deleting from VIM"
1288 else:
1289 stage[2] = "Deleted from VIM"
1290 db_nsr_update["detailed-status"] = " ".join(stage)
1291 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1292 self._write_op_status(nslcmop_id, stage)
1293
1294 if failed_detail:
1295 raise LcmException("; ".join(failed_detail))
1296 return
1297
1298 async def instantiate_RO(
1299 self,
1300 logging_text,
1301 nsr_id,
1302 nsd,
1303 db_nsr,
1304 db_nslcmop,
1305 db_vnfrs,
1306 db_vnfds,
1307 n2vc_key_list,
1308 stage,
1309 ):
1310 """
1311 Instantiate at RO
1312 :param logging_text: preffix text to use at logging
1313 :param nsr_id: nsr identity
1314 :param nsd: database content of ns descriptor
1315 :param db_nsr: database content of ns record
1316 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1317 :param db_vnfrs:
1318 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1319 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1320 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1321 :return: None or exception
1322 """
1323 try:
1324 start_deploy = time()
1325 ns_params = db_nslcmop.get("operationParams")
1326 if ns_params and ns_params.get("timeout_ns_deploy"):
1327 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1328 else:
1329 timeout_ns_deploy = self.timeout.get(
1330 "ns_deploy", self.timeout_ns_deploy
1331 )
1332
1333 # Check for and optionally request placement optimization. Database will be updated if placement activated
1334 stage[2] = "Waiting for Placement."
1335 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1336 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1337 for vnfr in db_vnfrs.values():
1338 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1339 break
1340 else:
1341 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1342
1343 return await self._instantiate_ng_ro(
1344 logging_text,
1345 nsr_id,
1346 nsd,
1347 db_nsr,
1348 db_nslcmop,
1349 db_vnfrs,
1350 db_vnfds,
1351 n2vc_key_list,
1352 stage,
1353 start_deploy,
1354 timeout_ns_deploy,
1355 )
1356 except Exception as e:
1357 stage[2] = "ERROR deploying at VIM"
1358 self.set_vnfr_at_error(db_vnfrs, str(e))
1359 self.logger.error(
1360 "Error deploying at VIM {}".format(e),
1361 exc_info=not isinstance(
1362 e,
1363 (
1364 ROclient.ROClientException,
1365 LcmException,
1366 DbException,
1367 NgRoException,
1368 ),
1369 ),
1370 )
1371 raise
1372
1373 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1374 """
1375 Wait for kdu to be up, get ip address
1376 :param logging_text: prefix use for logging
1377 :param nsr_id:
1378 :param vnfr_id:
1379 :param kdu_name:
1380 :return: IP address
1381 """
1382
1383 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1384 nb_tries = 0
1385
1386 while nb_tries < 360:
1387 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1388 kdur = next(
1389 (
1390 x
1391 for x in get_iterable(db_vnfr, "kdur")
1392 if x.get("kdu-name") == kdu_name
1393 ),
1394 None,
1395 )
1396 if not kdur:
1397 raise LcmException(
1398 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1399 )
1400 if kdur.get("status"):
1401 if kdur["status"] in ("READY", "ENABLED"):
1402 return kdur.get("ip-address")
1403 else:
1404 raise LcmException(
1405 "target KDU={} is in error state".format(kdu_name)
1406 )
1407
1408 await asyncio.sleep(10, loop=self.loop)
1409 nb_tries += 1
1410 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1411
1412 async def wait_vm_up_insert_key_ro(
1413 self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
1414 ):
1415 """
1416 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1417 :param logging_text: prefix use for logging
1418 :param nsr_id:
1419 :param vnfr_id:
1420 :param vdu_id:
1421 :param vdu_index:
1422 :param pub_key: public ssh key to inject, None to skip
1423 :param user: user to apply the public ssh key
1424 :return: IP address
1425 """
1426
1427 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1428 ro_nsr_id = None
1429 ip_address = None
1430 nb_tries = 0
1431 target_vdu_id = None
1432 ro_retries = 0
1433
1434 while True:
1435
1436 ro_retries += 1
1437 if ro_retries >= 360: # 1 hour
1438 raise LcmException(
1439 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
1440 )
1441
1442 await asyncio.sleep(10, loop=self.loop)
1443
1444 # get ip address
1445 if not target_vdu_id:
1446 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1447
1448 if not vdu_id: # for the VNF case
1449 if db_vnfr.get("status") == "ERROR":
1450 raise LcmException(
1451 "Cannot inject ssh-key because target VNF is in error state"
1452 )
1453 ip_address = db_vnfr.get("ip-address")
1454 if not ip_address:
1455 continue
1456 vdur = next(
1457 (
1458 x
1459 for x in get_iterable(db_vnfr, "vdur")
1460 if x.get("ip-address") == ip_address
1461 ),
1462 None,
1463 )
1464 else: # VDU case
1465 vdur = next(
1466 (
1467 x
1468 for x in get_iterable(db_vnfr, "vdur")
1469 if x.get("vdu-id-ref") == vdu_id
1470 and x.get("count-index") == vdu_index
1471 ),
1472 None,
1473 )
1474
1475 if (
1476 not vdur and len(db_vnfr.get("vdur", ())) == 1
1477 ): # If only one, this should be the target vdu
1478 vdur = db_vnfr["vdur"][0]
1479 if not vdur:
1480 raise LcmException(
1481 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1482 vnfr_id, vdu_id, vdu_index
1483 )
1484 )
1485 # New generation RO stores information at "vim_info"
1486 ng_ro_status = None
1487 target_vim = None
1488 if vdur.get("vim_info"):
1489 target_vim = next(
1490 t for t in vdur["vim_info"]
1491 ) # there should be only one key
1492 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1493 if (
1494 vdur.get("pdu-type")
1495 or vdur.get("status") == "ACTIVE"
1496 or ng_ro_status == "ACTIVE"
1497 ):
1498 ip_address = vdur.get("ip-address")
1499 if not ip_address:
1500 continue
1501 target_vdu_id = vdur["vdu-id-ref"]
1502 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1503 raise LcmException(
1504 "Cannot inject ssh-key because target VM is in error state"
1505 )
1506
1507 if not target_vdu_id:
1508 continue
1509
1510 # inject public key into machine
1511 if pub_key and user:
1512 self.logger.debug(logging_text + "Inserting RO key")
1513 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1514 if vdur.get("pdu-type"):
1515 self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
1516 return ip_address
1517 try:
1518 ro_vm_id = "{}-{}".format(
1519 db_vnfr["member-vnf-index-ref"], target_vdu_id
1520 ) # TODO add vdu_index
1521 if self.ng_ro:
1522 target = {
1523 "action": {
1524 "action": "inject_ssh_key",
1525 "key": pub_key,
1526 "user": user,
1527 },
1528 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1529 }
1530 desc = await self.RO.deploy(nsr_id, target)
1531 action_id = desc["action_id"]
1532 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1533 break
1534 else:
1535 # wait until NS is deployed at RO
1536 if not ro_nsr_id:
1537 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1538 ro_nsr_id = deep_get(
1539 db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
1540 )
1541 if not ro_nsr_id:
1542 continue
1543 result_dict = await self.RO.create_action(
1544 item="ns",
1545 item_id_name=ro_nsr_id,
1546 descriptor={
1547 "add_public_key": pub_key,
1548 "vms": [ro_vm_id],
1549 "user": user,
1550 },
1551 )
1552 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1553 if not result_dict or not isinstance(result_dict, dict):
1554 raise LcmException(
1555 "Unknown response from RO when injecting key"
1556 )
1557 for result in result_dict.values():
1558 if result.get("vim_result") == 200:
1559 break
1560 else:
1561 raise ROclient.ROClientException(
1562 "error injecting key: {}".format(
1563 result.get("description")
1564 )
1565 )
1566 break
1567 except NgRoException as e:
1568 raise LcmException(
1569 "Reaching max tries injecting key. Error: {}".format(e)
1570 )
1571 except ROclient.ROClientException as e:
1572 if not nb_tries:
1573 self.logger.debug(
1574 logging_text
1575 + "error injecting key: {}. Retrying until {} seconds".format(
1576 e, 20 * 10
1577 )
1578 )
1579 nb_tries += 1
1580 if nb_tries >= 20:
1581 raise LcmException(
1582 "Reaching max tries injecting key. Error: {}".format(e)
1583 )
1584 else:
1585 break
1586
1587 return ip_address
1588
1589 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1590 """
1591 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1592 """
1593 my_vca = vca_deployed_list[vca_index]
1594 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1595 # vdu or kdu: no dependencies
1596 return
1597 timeout = 300
1598 while timeout >= 0:
1599 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1600 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1601 configuration_status_list = db_nsr["configurationStatus"]
1602 for index, vca_deployed in enumerate(configuration_status_list):
1603 if index == vca_index:
1604 # myself
1605 continue
1606 if not my_vca.get("member-vnf-index") or (
1607 vca_deployed.get("member-vnf-index")
1608 == my_vca.get("member-vnf-index")
1609 ):
1610 internal_status = configuration_status_list[index].get("status")
1611 if internal_status == "READY":
1612 continue
1613 elif internal_status == "BROKEN":
1614 raise LcmException(
1615 "Configuration aborted because dependent charm/s has failed"
1616 )
1617 else:
1618 break
1619 else:
1620 # no dependencies, return
1621 return
1622 await asyncio.sleep(10)
1623 timeout -= 1
1624
1625 raise LcmException("Configuration aborted because dependent charm/s timeout")
1626
1627 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1628 vca_id = None
1629 if db_vnfr:
1630 vca_id = deep_get(db_vnfr, ("vca-id",))
1631 elif db_nsr:
1632 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1633 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1634 return vca_id
1635
1636 async def instantiate_N2VC(
1637 self,
1638 logging_text,
1639 vca_index,
1640 nsi_id,
1641 db_nsr,
1642 db_vnfr,
1643 vdu_id,
1644 kdu_name,
1645 vdu_index,
1646 config_descriptor,
1647 deploy_params,
1648 base_folder,
1649 nslcmop_id,
1650 stage,
1651 vca_type,
1652 vca_name,
1653 ee_config_descriptor,
1654 ):
1655 nsr_id = db_nsr["_id"]
1656 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1657 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1658 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1659 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1660 db_dict = {
1661 "collection": "nsrs",
1662 "filter": {"_id": nsr_id},
1663 "path": db_update_entry,
1664 }
1665 step = ""
1666 try:
1667
1668 element_type = "NS"
1669 element_under_configuration = nsr_id
1670
1671 vnfr_id = None
1672 if db_vnfr:
1673 vnfr_id = db_vnfr["_id"]
1674 osm_config["osm"]["vnf_id"] = vnfr_id
1675
1676 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1677
1678 if vca_type == "native_charm":
1679 index_number = 0
1680 else:
1681 index_number = vdu_index or 0
1682
1683 if vnfr_id:
1684 element_type = "VNF"
1685 element_under_configuration = vnfr_id
1686 namespace += ".{}-{}".format(vnfr_id, index_number)
1687 if vdu_id:
1688 namespace += ".{}-{}".format(vdu_id, index_number)
1689 element_type = "VDU"
1690 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1691 osm_config["osm"]["vdu_id"] = vdu_id
1692 elif kdu_name:
1693 namespace += ".{}".format(kdu_name)
1694 element_type = "KDU"
1695 element_under_configuration = kdu_name
1696 osm_config["osm"]["kdu_name"] = kdu_name
1697
1698 # Get artifact path
1699 artifact_path = "{}/{}/{}/{}".format(
1700 base_folder["folder"],
1701 base_folder["pkg-dir"],
1702 "charms"
1703 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1704 else "helm-charts",
1705 vca_name,
1706 )
1707
1708 self.logger.debug("Artifact path > {}".format(artifact_path))
1709
1710 # get initial_config_primitive_list that applies to this element
1711 initial_config_primitive_list = config_descriptor.get(
1712 "initial-config-primitive"
1713 )
1714
1715 self.logger.debug(
1716 "Initial config primitive list > {}".format(
1717 initial_config_primitive_list
1718 )
1719 )
1720
1721 # add config if not present for NS charm
1722 ee_descriptor_id = ee_config_descriptor.get("id")
1723 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1724 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1725 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1726 )
1727
1728 self.logger.debug(
1729 "Initial config primitive list #2 > {}".format(
1730 initial_config_primitive_list
1731 )
1732 )
1733 # n2vc_redesign STEP 3.1
1734 # find old ee_id if exists
1735 ee_id = vca_deployed.get("ee_id")
1736
1737 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1738 # create or register execution environment in VCA
1739 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1740
1741 self._write_configuration_status(
1742 nsr_id=nsr_id,
1743 vca_index=vca_index,
1744 status="CREATING",
1745 element_under_configuration=element_under_configuration,
1746 element_type=element_type,
1747 )
1748
1749 step = "create execution environment"
1750 self.logger.debug(logging_text + step)
1751
1752 ee_id = None
1753 credentials = None
1754 if vca_type == "k8s_proxy_charm":
1755 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1756 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1757 namespace=namespace,
1758 artifact_path=artifact_path,
1759 db_dict=db_dict,
1760 vca_id=vca_id,
1761 )
1762 elif vca_type == "helm" or vca_type == "helm-v3":
1763 ee_id, credentials = await self.vca_map[
1764 vca_type
1765 ].create_execution_environment(
1766 namespace=namespace,
1767 reuse_ee_id=ee_id,
1768 db_dict=db_dict,
1769 config=osm_config,
1770 artifact_path=artifact_path,
1771 vca_type=vca_type,
1772 )
1773 else:
1774 ee_id, credentials = await self.vca_map[
1775 vca_type
1776 ].create_execution_environment(
1777 namespace=namespace,
1778 reuse_ee_id=ee_id,
1779 db_dict=db_dict,
1780 vca_id=vca_id,
1781 )
1782
1783 elif vca_type == "native_charm":
1784 step = "Waiting to VM being up and getting IP address"
1785 self.logger.debug(logging_text + step)
1786 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1787 logging_text,
1788 nsr_id,
1789 vnfr_id,
1790 vdu_id,
1791 vdu_index,
1792 user=None,
1793 pub_key=None,
1794 )
1795 credentials = {"hostname": rw_mgmt_ip}
1796 # get username
1797 username = deep_get(
1798 config_descriptor, ("config-access", "ssh-access", "default-user")
1799 )
1800 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1801 # merged. Meanwhile let's get username from initial-config-primitive
1802 if not username and initial_config_primitive_list:
1803 for config_primitive in initial_config_primitive_list:
1804 for param in config_primitive.get("parameter", ()):
1805 if param["name"] == "ssh-username":
1806 username = param["value"]
1807 break
1808 if not username:
1809 raise LcmException(
1810 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1811 "'config-access.ssh-access.default-user'"
1812 )
1813 credentials["username"] = username
1814 # n2vc_redesign STEP 3.2
1815
1816 self._write_configuration_status(
1817 nsr_id=nsr_id,
1818 vca_index=vca_index,
1819 status="REGISTERING",
1820 element_under_configuration=element_under_configuration,
1821 element_type=element_type,
1822 )
1823
1824 step = "register execution environment {}".format(credentials)
1825 self.logger.debug(logging_text + step)
1826 ee_id = await self.vca_map[vca_type].register_execution_environment(
1827 credentials=credentials,
1828 namespace=namespace,
1829 db_dict=db_dict,
1830 vca_id=vca_id,
1831 )
1832
1833 # for compatibility with MON/POL modules, the need model and application name at database
1834 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1835 ee_id_parts = ee_id.split(".")
1836 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1837 if len(ee_id_parts) >= 2:
1838 model_name = ee_id_parts[0]
1839 application_name = ee_id_parts[1]
1840 db_nsr_update[db_update_entry + "model"] = model_name
1841 db_nsr_update[db_update_entry + "application"] = application_name
1842
1843 # n2vc_redesign STEP 3.3
1844 step = "Install configuration Software"
1845
1846 self._write_configuration_status(
1847 nsr_id=nsr_id,
1848 vca_index=vca_index,
1849 status="INSTALLING SW",
1850 element_under_configuration=element_under_configuration,
1851 element_type=element_type,
1852 other_update=db_nsr_update,
1853 )
1854
1855 # TODO check if already done
1856 self.logger.debug(logging_text + step)
1857 config = None
1858 if vca_type == "native_charm":
1859 config_primitive = next(
1860 (p for p in initial_config_primitive_list if p["name"] == "config"),
1861 None,
1862 )
1863 if config_primitive:
1864 config = self._map_primitive_params(
1865 config_primitive, {}, deploy_params
1866 )
1867 num_units = 1
1868 if vca_type == "lxc_proxy_charm":
1869 if element_type == "NS":
1870 num_units = db_nsr.get("config-units") or 1
1871 elif element_type == "VNF":
1872 num_units = db_vnfr.get("config-units") or 1
1873 elif element_type == "VDU":
1874 for v in db_vnfr["vdur"]:
1875 if vdu_id == v["vdu-id-ref"]:
1876 num_units = v.get("config-units") or 1
1877 break
1878 if vca_type != "k8s_proxy_charm":
1879 await self.vca_map[vca_type].install_configuration_sw(
1880 ee_id=ee_id,
1881 artifact_path=artifact_path,
1882 db_dict=db_dict,
1883 config=config,
1884 num_units=num_units,
1885 vca_id=vca_id,
1886 vca_type=vca_type,
1887 )
1888
1889 # write in db flag of configuration_sw already installed
1890 self.update_db_2(
1891 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1892 )
1893
1894 # add relations for this VCA (wait for other peers related with this VCA)
1895 await self._add_vca_relations(
1896 logging_text=logging_text,
1897 nsr_id=nsr_id,
1898 vca_index=vca_index,
1899 vca_id=vca_id,
1900 vca_type=vca_type,
1901 )
1902
1903 # if SSH access is required, then get execution environment SSH public
1904 # if native charm we have waited already to VM be UP
1905 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1906 pub_key = None
1907 user = None
1908 # self.logger.debug("get ssh key block")
1909 if deep_get(
1910 config_descriptor, ("config-access", "ssh-access", "required")
1911 ):
1912 # self.logger.debug("ssh key needed")
1913 # Needed to inject a ssh key
1914 user = deep_get(
1915 config_descriptor,
1916 ("config-access", "ssh-access", "default-user"),
1917 )
1918 step = "Install configuration Software, getting public ssh key"
1919 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1920 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1921 )
1922
1923 step = "Insert public key into VM user={} ssh_key={}".format(
1924 user, pub_key
1925 )
1926 else:
1927 # self.logger.debug("no need to get ssh key")
1928 step = "Waiting to VM being up and getting IP address"
1929 self.logger.debug(logging_text + step)
1930
1931 # n2vc_redesign STEP 5.1
1932 # wait for RO (ip-address) Insert pub_key into VM
1933 if vnfr_id:
1934 if kdu_name:
1935 rw_mgmt_ip = await self.wait_kdu_up(
1936 logging_text, nsr_id, vnfr_id, kdu_name
1937 )
1938 else:
1939 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1940 logging_text,
1941 nsr_id,
1942 vnfr_id,
1943 vdu_id,
1944 vdu_index,
1945 user=user,
1946 pub_key=pub_key,
1947 )
1948 else:
1949 rw_mgmt_ip = None # This is for a NS configuration
1950
1951 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
1952
1953 # store rw_mgmt_ip in deploy params for later replacement
1954 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1955
1956 # n2vc_redesign STEP 6 Execute initial config primitive
1957 step = "execute initial config primitive"
1958
1959 # wait for dependent primitives execution (NS -> VNF -> VDU)
1960 if initial_config_primitive_list:
1961 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1962
1963 # stage, in function of element type: vdu, kdu, vnf or ns
1964 my_vca = vca_deployed_list[vca_index]
1965 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1966 # VDU or KDU
1967 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
1968 elif my_vca.get("member-vnf-index"):
1969 # VNF
1970 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
1971 else:
1972 # NS
1973 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
1974
1975 self._write_configuration_status(
1976 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
1977 )
1978
1979 self._write_op_status(op_id=nslcmop_id, stage=stage)
1980
1981 check_if_terminated_needed = True
1982 for initial_config_primitive in initial_config_primitive_list:
1983 # adding information on the vca_deployed if it is a NS execution environment
1984 if not vca_deployed["member-vnf-index"]:
1985 deploy_params["ns_config_info"] = json.dumps(
1986 self._get_ns_config_info(nsr_id)
1987 )
1988 # TODO check if already done
1989 primitive_params_ = self._map_primitive_params(
1990 initial_config_primitive, {}, deploy_params
1991 )
1992
1993 step = "execute primitive '{}' params '{}'".format(
1994 initial_config_primitive["name"], primitive_params_
1995 )
1996 self.logger.debug(logging_text + step)
1997 await self.vca_map[vca_type].exec_primitive(
1998 ee_id=ee_id,
1999 primitive_name=initial_config_primitive["name"],
2000 params_dict=primitive_params_,
2001 db_dict=db_dict,
2002 vca_id=vca_id,
2003 vca_type=vca_type,
2004 )
2005 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2006 if check_if_terminated_needed:
2007 if config_descriptor.get("terminate-config-primitive"):
2008 self.update_db_2(
2009 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
2010 )
2011 check_if_terminated_needed = False
2012
2013 # TODO register in database that primitive is done
2014
2015 # STEP 7 Configure metrics
2016 if vca_type == "helm" or vca_type == "helm-v3":
2017 prometheus_jobs = await self.add_prometheus_metrics(
2018 ee_id=ee_id,
2019 artifact_path=artifact_path,
2020 ee_config_descriptor=ee_config_descriptor,
2021 vnfr_id=vnfr_id,
2022 nsr_id=nsr_id,
2023 target_ip=rw_mgmt_ip,
2024 )
2025 if prometheus_jobs:
2026 self.update_db_2(
2027 "nsrs",
2028 nsr_id,
2029 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2030 )
2031
2032 step = "instantiated at VCA"
2033 self.logger.debug(logging_text + step)
2034
2035 self._write_configuration_status(
2036 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2037 )
2038
2039 except Exception as e: # TODO not use Exception but N2VC exception
2040 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2041 if not isinstance(
2042 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2043 ):
2044 self.logger.error(
2045 "Exception while {} : {}".format(step, e), exc_info=True
2046 )
2047 self._write_configuration_status(
2048 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2049 )
2050 raise LcmException("{} {}".format(step, e)) from e
2051
2052 def _write_ns_status(
2053 self,
2054 nsr_id: str,
2055 ns_state: str,
2056 current_operation: str,
2057 current_operation_id: str,
2058 error_description: str = None,
2059 error_detail: str = None,
2060 other_update: dict = None,
2061 ):
2062 """
2063 Update db_nsr fields.
2064 :param nsr_id:
2065 :param ns_state:
2066 :param current_operation:
2067 :param current_operation_id:
2068 :param error_description:
2069 :param error_detail:
2070 :param other_update: Other required changes at database if provided, will be cleared
2071 :return:
2072 """
2073 try:
2074 db_dict = other_update or {}
2075 db_dict[
2076 "_admin.nslcmop"
2077 ] = current_operation_id # for backward compatibility
2078 db_dict["_admin.current-operation"] = current_operation_id
2079 db_dict["_admin.operation-type"] = (
2080 current_operation if current_operation != "IDLE" else None
2081 )
2082 db_dict["currentOperation"] = current_operation
2083 db_dict["currentOperationID"] = current_operation_id
2084 db_dict["errorDescription"] = error_description
2085 db_dict["errorDetail"] = error_detail
2086
2087 if ns_state:
2088 db_dict["nsState"] = ns_state
2089 self.update_db_2("nsrs", nsr_id, db_dict)
2090 except DbException as e:
2091 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2092
2093 def _write_op_status(
2094 self,
2095 op_id: str,
2096 stage: list = None,
2097 error_message: str = None,
2098 queuePosition: int = 0,
2099 operation_state: str = None,
2100 other_update: dict = None,
2101 ):
2102 try:
2103 db_dict = other_update or {}
2104 db_dict["queuePosition"] = queuePosition
2105 if isinstance(stage, list):
2106 db_dict["stage"] = stage[0]
2107 db_dict["detailed-status"] = " ".join(stage)
2108 elif stage is not None:
2109 db_dict["stage"] = str(stage)
2110
2111 if error_message is not None:
2112 db_dict["errorMessage"] = error_message
2113 if operation_state is not None:
2114 db_dict["operationState"] = operation_state
2115 db_dict["statusEnteredTime"] = time()
2116 self.update_db_2("nslcmops", op_id, db_dict)
2117 except DbException as e:
2118 self.logger.warn(
2119 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2120 )
2121
2122 def _write_all_config_status(self, db_nsr: dict, status: str):
2123 try:
2124 nsr_id = db_nsr["_id"]
2125 # configurationStatus
2126 config_status = db_nsr.get("configurationStatus")
2127 if config_status:
2128 db_nsr_update = {
2129 "configurationStatus.{}.status".format(index): status
2130 for index, v in enumerate(config_status)
2131 if v
2132 }
2133 # update status
2134 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2135
2136 except DbException as e:
2137 self.logger.warn(
2138 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2139 )
2140
2141 def _write_configuration_status(
2142 self,
2143 nsr_id: str,
2144 vca_index: int,
2145 status: str = None,
2146 element_under_configuration: str = None,
2147 element_type: str = None,
2148 other_update: dict = None,
2149 ):
2150
2151 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2152 # .format(vca_index, status))
2153
2154 try:
2155 db_path = "configurationStatus.{}.".format(vca_index)
2156 db_dict = other_update or {}
2157 if status:
2158 db_dict[db_path + "status"] = status
2159 if element_under_configuration:
2160 db_dict[
2161 db_path + "elementUnderConfiguration"
2162 ] = element_under_configuration
2163 if element_type:
2164 db_dict[db_path + "elementType"] = element_type
2165 self.update_db_2("nsrs", nsr_id, db_dict)
2166 except DbException as e:
2167 self.logger.warn(
2168 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2169 status, nsr_id, vca_index, e
2170 )
2171 )
2172
2173 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2174 """
2175 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2176 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2177 Database is used because the result can be obtained from a different LCM worker in case of HA.
2178 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2179 :param db_nslcmop: database content of nslcmop
2180 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2181 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2182 computed 'vim-account-id'
2183 """
2184 modified = False
2185 nslcmop_id = db_nslcmop["_id"]
2186 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2187 if placement_engine == "PLA":
2188 self.logger.debug(
2189 logging_text + "Invoke and wait for placement optimization"
2190 )
2191 await self.msg.aiowrite(
2192 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2193 )
2194 db_poll_interval = 5
2195 wait = db_poll_interval * 10
2196 pla_result = None
2197 while not pla_result and wait >= 0:
2198 await asyncio.sleep(db_poll_interval)
2199 wait -= db_poll_interval
2200 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2201 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2202
2203 if not pla_result:
2204 raise LcmException(
2205 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2206 )
2207
2208 for pla_vnf in pla_result["vnf"]:
2209 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2210 if not pla_vnf.get("vimAccountId") or not vnfr:
2211 continue
2212 modified = True
2213 self.db.set_one(
2214 "vnfrs",
2215 {"_id": vnfr["_id"]},
2216 {"vim-account-id": pla_vnf["vimAccountId"]},
2217 )
2218 # Modifies db_vnfrs
2219 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2220 return modified
2221
2222 def update_nsrs_with_pla_result(self, params):
2223 try:
2224 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2225 self.update_db_2(
2226 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2227 )
2228 except Exception as e:
2229 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2230
2231 async def instantiate(self, nsr_id, nslcmop_id):
2232 """
2233
2234 :param nsr_id: ns instance to deploy
2235 :param nslcmop_id: operation to run
2236 :return:
2237 """
2238
2239 # Try to lock HA task here
2240 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
2241 if not task_is_locked_by_me:
2242 self.logger.debug(
2243 "instantiate() task is not locked by me, ns={}".format(nsr_id)
2244 )
2245 return
2246
2247 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2248 self.logger.debug(logging_text + "Enter")
2249
2250 # get all needed from database
2251
2252 # database nsrs record
2253 db_nsr = None
2254
2255 # database nslcmops record
2256 db_nslcmop = None
2257
2258 # update operation on nsrs
2259 db_nsr_update = {}
2260 # update operation on nslcmops
2261 db_nslcmop_update = {}
2262
2263 nslcmop_operation_state = None
2264 db_vnfrs = {} # vnf's info indexed by member-index
2265 # n2vc_info = {}
2266 tasks_dict_info = {} # from task to info text
2267 exc = None
2268 error_list = []
2269 stage = [
2270 "Stage 1/5: preparation of the environment.",
2271 "Waiting for previous operations to terminate.",
2272 "",
2273 ]
2274 # ^ stage, step, VIM progress
2275 try:
2276 # wait for any previous tasks in process
2277 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
2278
2279 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2280 stage[1] = "Reading from database."
2281 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2282 db_nsr_update["detailed-status"] = "creating"
2283 db_nsr_update["operational-status"] = "init"
2284 self._write_ns_status(
2285 nsr_id=nsr_id,
2286 ns_state="BUILDING",
2287 current_operation="INSTANTIATING",
2288 current_operation_id=nslcmop_id,
2289 other_update=db_nsr_update,
2290 )
2291 self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
2292
2293 # read from db: operation
2294 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2295 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2296 if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
2297 db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
2298 db_nslcmop["operationParams"]["additionalParamsForVnf"]
2299 )
2300 ns_params = db_nslcmop.get("operationParams")
2301 if ns_params and ns_params.get("timeout_ns_deploy"):
2302 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2303 else:
2304 timeout_ns_deploy = self.timeout.get(
2305 "ns_deploy", self.timeout_ns_deploy
2306 )
2307
2308 # read from db: ns
2309 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2310 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2311 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2312 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2313 self.fs.sync(db_nsr["nsd-id"])
2314 db_nsr["nsd"] = nsd
2315 # nsr_name = db_nsr["name"] # TODO short-name??
2316
2317 # read from db: vnf's of this ns
2318 stage[1] = "Getting vnfrs from db."
2319 self.logger.debug(logging_text + stage[1])
2320 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2321
2322 # read from db: vnfd's for every vnf
2323 db_vnfds = [] # every vnfd data
2324
2325 # for each vnf in ns, read vnfd
2326 for vnfr in db_vnfrs_list:
2327 if vnfr.get("kdur"):
2328 kdur_list = []
2329 for kdur in vnfr["kdur"]:
2330 if kdur.get("additionalParams"):
2331 kdur["additionalParams"] = json.loads(
2332 kdur["additionalParams"]
2333 )
2334 kdur_list.append(kdur)
2335 vnfr["kdur"] = kdur_list
2336
2337 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
2338 vnfd_id = vnfr["vnfd-id"]
2339 vnfd_ref = vnfr["vnfd-ref"]
2340 self.fs.sync(vnfd_id)
2341
2342 # if we haven't this vnfd, read it from db
2343 if vnfd_id not in db_vnfds:
2344 # read from db
2345 stage[1] = "Getting vnfd={} id='{}' from db.".format(
2346 vnfd_id, vnfd_ref
2347 )
2348 self.logger.debug(logging_text + stage[1])
2349 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2350
2351 # store vnfd
2352 db_vnfds.append(vnfd)
2353
2354 # Get or generates the _admin.deployed.VCA list
2355 vca_deployed_list = None
2356 if db_nsr["_admin"].get("deployed"):
2357 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2358 if vca_deployed_list is None:
2359 vca_deployed_list = []
2360 configuration_status_list = []
2361 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2362 db_nsr_update["configurationStatus"] = configuration_status_list
2363 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2364 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2365 elif isinstance(vca_deployed_list, dict):
2366 # maintain backward compatibility. Change a dict to list at database
2367 vca_deployed_list = list(vca_deployed_list.values())
2368 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2369 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2370
2371 if not isinstance(
2372 deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
2373 ):
2374 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2375 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2376
2377 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2378 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2379 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2380 self.db.set_list(
2381 "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
2382 )
2383
2384 # n2vc_redesign STEP 2 Deploy Network Scenario
2385 stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2386 self._write_op_status(op_id=nslcmop_id, stage=stage)
2387
2388 stage[1] = "Deploying KDUs."
2389 # self.logger.debug(logging_text + "Before deploy_kdus")
2390 # Call to deploy_kdus in case exists the "vdu:kdu" param
2391 await self.deploy_kdus(
2392 logging_text=logging_text,
2393 nsr_id=nsr_id,
2394 nslcmop_id=nslcmop_id,
2395 db_vnfrs=db_vnfrs,
2396 db_vnfds=db_vnfds,
2397 task_instantiation_info=tasks_dict_info,
2398 )
2399
2400 stage[1] = "Getting VCA public key."
2401 # n2vc_redesign STEP 1 Get VCA public ssh-key
2402 # feature 1429. Add n2vc public key to needed VMs
2403 n2vc_key = self.n2vc.get_public_key()
2404 n2vc_key_list = [n2vc_key]
2405 if self.vca_config.get("public_key"):
2406 n2vc_key_list.append(self.vca_config["public_key"])
2407
2408 stage[1] = "Deploying NS at VIM."
2409 task_ro = asyncio.ensure_future(
2410 self.instantiate_RO(
2411 logging_text=logging_text,
2412 nsr_id=nsr_id,
2413 nsd=nsd,
2414 db_nsr=db_nsr,
2415 db_nslcmop=db_nslcmop,
2416 db_vnfrs=db_vnfrs,
2417 db_vnfds=db_vnfds,
2418 n2vc_key_list=n2vc_key_list,
2419 stage=stage,
2420 )
2421 )
2422 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2423 tasks_dict_info[task_ro] = "Deploying at VIM"
2424
2425 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2426 stage[1] = "Deploying Execution Environments."
2427 self.logger.debug(logging_text + stage[1])
2428
2429 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2430 for vnf_profile in get_vnf_profiles(nsd):
2431 vnfd_id = vnf_profile["vnfd-id"]
2432 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
2433 member_vnf_index = str(vnf_profile["id"])
2434 db_vnfr = db_vnfrs[member_vnf_index]
2435 base_folder = vnfd["_admin"]["storage"]
2436 vdu_id = None
2437 vdu_index = 0
2438 vdu_name = None
2439 kdu_name = None
2440
2441 # Get additional parameters
2442 deploy_params = {"OSM": get_osm_params(db_vnfr)}
2443 if db_vnfr.get("additionalParamsForVnf"):
2444 deploy_params.update(
2445 parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
2446 )
2447
2448 descriptor_config = get_configuration(vnfd, vnfd["id"])
2449 if descriptor_config:
2450 self._deploy_n2vc(
2451 logging_text=logging_text
2452 + "member_vnf_index={} ".format(member_vnf_index),
2453 db_nsr=db_nsr,
2454 db_vnfr=db_vnfr,
2455 nslcmop_id=nslcmop_id,
2456 nsr_id=nsr_id,
2457 nsi_id=nsi_id,
2458 vnfd_id=vnfd_id,
2459 vdu_id=vdu_id,
2460 kdu_name=kdu_name,
2461 member_vnf_index=member_vnf_index,
2462 vdu_index=vdu_index,
2463 vdu_name=vdu_name,
2464 deploy_params=deploy_params,
2465 descriptor_config=descriptor_config,
2466 base_folder=base_folder,
2467 task_instantiation_info=tasks_dict_info,
2468 stage=stage,
2469 )
2470
2471 # Deploy charms for each VDU that supports one.
2472 for vdud in get_vdu_list(vnfd):
2473 vdu_id = vdud["id"]
2474 descriptor_config = get_configuration(vnfd, vdu_id)
2475 vdur = find_in_list(
2476 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
2477 )
2478
2479 if vdur.get("additionalParams"):
2480 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
2481 else:
2482 deploy_params_vdu = deploy_params
2483 deploy_params_vdu["OSM"] = get_osm_params(
2484 db_vnfr, vdu_id, vdu_count_index=0
2485 )
2486 vdud_count = get_number_of_instances(vnfd, vdu_id)
2487
2488 self.logger.debug("VDUD > {}".format(vdud))
2489 self.logger.debug(
2490 "Descriptor config > {}".format(descriptor_config)
2491 )
2492 if descriptor_config:
2493 vdu_name = None
2494 kdu_name = None
2495 for vdu_index in range(vdud_count):
2496 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2497 self._deploy_n2vc(
2498 logging_text=logging_text
2499 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2500 member_vnf_index, vdu_id, vdu_index
2501 ),
2502 db_nsr=db_nsr,
2503 db_vnfr=db_vnfr,
2504 nslcmop_id=nslcmop_id,
2505 nsr_id=nsr_id,
2506 nsi_id=nsi_id,
2507 vnfd_id=vnfd_id,
2508 vdu_id=vdu_id,
2509 kdu_name=kdu_name,
2510 member_vnf_index=member_vnf_index,
2511 vdu_index=vdu_index,
2512 vdu_name=vdu_name,
2513 deploy_params=deploy_params_vdu,
2514 descriptor_config=descriptor_config,
2515 base_folder=base_folder,
2516 task_instantiation_info=tasks_dict_info,
2517 stage=stage,
2518 )
2519 for kdud in get_kdu_list(vnfd):
2520 kdu_name = kdud["name"]
2521 descriptor_config = get_configuration(vnfd, kdu_name)
2522 if descriptor_config:
2523 vdu_id = None
2524 vdu_index = 0
2525 vdu_name = None
2526 kdur = next(
2527 x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
2528 )
2529 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
2530 if kdur.get("additionalParams"):
2531 deploy_params_kdu.update(
2532 parse_yaml_strings(kdur["additionalParams"].copy())
2533 )
2534
2535 self._deploy_n2vc(
2536 logging_text=logging_text,
2537 db_nsr=db_nsr,
2538 db_vnfr=db_vnfr,
2539 nslcmop_id=nslcmop_id,
2540 nsr_id=nsr_id,
2541 nsi_id=nsi_id,
2542 vnfd_id=vnfd_id,
2543 vdu_id=vdu_id,
2544 kdu_name=kdu_name,
2545 member_vnf_index=member_vnf_index,
2546 vdu_index=vdu_index,
2547 vdu_name=vdu_name,
2548 deploy_params=deploy_params_kdu,
2549 descriptor_config=descriptor_config,
2550 base_folder=base_folder,
2551 task_instantiation_info=tasks_dict_info,
2552 stage=stage,
2553 )
2554
2555 # Check if this NS has a charm configuration
2556 descriptor_config = nsd.get("ns-configuration")
2557 if descriptor_config and descriptor_config.get("juju"):
2558 vnfd_id = None
2559 db_vnfr = None
2560 member_vnf_index = None
2561 vdu_id = None
2562 kdu_name = None
2563 vdu_index = 0
2564 vdu_name = None
2565
2566 # Get additional parameters
2567 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
2568 if db_nsr.get("additionalParamsForNs"):
2569 deploy_params.update(
2570 parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
2571 )
2572 base_folder = nsd["_admin"]["storage"]
2573 self._deploy_n2vc(
2574 logging_text=logging_text,
2575 db_nsr=db_nsr,
2576 db_vnfr=db_vnfr,
2577 nslcmop_id=nslcmop_id,
2578 nsr_id=nsr_id,
2579 nsi_id=nsi_id,
2580 vnfd_id=vnfd_id,
2581 vdu_id=vdu_id,
2582 kdu_name=kdu_name,
2583 member_vnf_index=member_vnf_index,
2584 vdu_index=vdu_index,
2585 vdu_name=vdu_name,
2586 deploy_params=deploy_params,
2587 descriptor_config=descriptor_config,
2588 base_folder=base_folder,
2589 task_instantiation_info=tasks_dict_info,
2590 stage=stage,
2591 )
2592
2593 # rest of staff will be done at finally
2594
2595 except (
2596 ROclient.ROClientException,
2597 DbException,
2598 LcmException,
2599 N2VCException,
2600 ) as e:
2601 self.logger.error(
2602 logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
2603 )
2604 exc = e
2605 except asyncio.CancelledError:
2606 self.logger.error(
2607 logging_text + "Cancelled Exception while '{}'".format(stage[1])
2608 )
2609 exc = "Operation was cancelled"
2610 except Exception as e:
2611 exc = traceback.format_exc()
2612 self.logger.critical(
2613 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
2614 exc_info=True,
2615 )
2616 finally:
2617 if exc:
2618 error_list.append(str(exc))
2619 try:
2620 # wait for pending tasks
2621 if tasks_dict_info:
2622 stage[1] = "Waiting for instantiate pending tasks."
2623 self.logger.debug(logging_text + stage[1])
2624 error_list += await self._wait_for_tasks(
2625 logging_text,
2626 tasks_dict_info,
2627 timeout_ns_deploy,
2628 stage,
2629 nslcmop_id,
2630 nsr_id=nsr_id,
2631 )
2632 stage[1] = stage[2] = ""
2633 except asyncio.CancelledError:
2634 error_list.append("Cancelled")
2635 # TODO cancel all tasks
2636 except Exception as exc:
2637 error_list.append(str(exc))
2638
2639 # update operation-status
2640 db_nsr_update["operational-status"] = "running"
2641 # let's begin with VCA 'configured' status (later we can change it)
2642 db_nsr_update["config-status"] = "configured"
2643 for task, task_name in tasks_dict_info.items():
2644 if not task.done() or task.cancelled() or task.exception():
2645 if task_name.startswith(self.task_name_deploy_vca):
2646 # A N2VC task is pending
2647 db_nsr_update["config-status"] = "failed"
2648 else:
2649 # RO or KDU task is pending
2650 db_nsr_update["operational-status"] = "failed"
2651
2652 # update status at database
2653 if error_list:
2654 error_detail = ". ".join(error_list)
2655 self.logger.error(logging_text + error_detail)
2656 error_description_nslcmop = "{} Detail: {}".format(
2657 stage[0], error_detail
2658 )
2659 error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
2660 nslcmop_id, stage[0]
2661 )
2662
2663 db_nsr_update["detailed-status"] = (
2664 error_description_nsr + " Detail: " + error_detail
2665 )
2666 db_nslcmop_update["detailed-status"] = error_detail
2667 nslcmop_operation_state = "FAILED"
2668 ns_state = "BROKEN"
2669 else:
2670 error_detail = None
2671 error_description_nsr = error_description_nslcmop = None
2672 ns_state = "READY"
2673 db_nsr_update["detailed-status"] = "Done"
2674 db_nslcmop_update["detailed-status"] = "Done"
2675 nslcmop_operation_state = "COMPLETED"
2676
2677 if db_nsr:
2678 self._write_ns_status(
2679 nsr_id=nsr_id,
2680 ns_state=ns_state,
2681 current_operation="IDLE",
2682 current_operation_id=None,
2683 error_description=error_description_nsr,
2684 error_detail=error_detail,
2685 other_update=db_nsr_update,
2686 )
2687 self._write_op_status(
2688 op_id=nslcmop_id,
2689 stage="",
2690 error_message=error_description_nslcmop,
2691 operation_state=nslcmop_operation_state,
2692 other_update=db_nslcmop_update,
2693 )
2694
2695 if nslcmop_operation_state:
2696 try:
2697 await self.msg.aiowrite(
2698 "ns",
2699 "instantiated",
2700 {
2701 "nsr_id": nsr_id,
2702 "nslcmop_id": nslcmop_id,
2703 "operationState": nslcmop_operation_state,
2704 },
2705 loop=self.loop,
2706 )
2707 except Exception as e:
2708 self.logger.error(
2709 logging_text + "kafka_write notification Exception {}".format(e)
2710 )
2711
2712 self.logger.debug(logging_text + "Exit")
2713 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2714
2715 async def _add_vca_relations(
2716 self,
2717 logging_text,
2718 nsr_id,
2719 vca_index: int,
2720 timeout: int = 3600,
2721 vca_type: str = None,
2722 vca_id: str = None,
2723 ) -> bool:
2724
2725 # steps:
2726 # 1. find all relations for this VCA
2727 # 2. wait for other peers related
2728 # 3. add relations
2729
2730 try:
2731 vca_type = vca_type or "lxc_proxy_charm"
2732
2733 # STEP 1: find all relations for this VCA
2734
2735 # read nsr record
2736 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2737 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2738
2739 # this VCA data
2740 my_vca = deep_get(db_nsr, ("_admin", "deployed", "VCA"))[vca_index]
2741
2742 # read all ns-configuration relations
2743 ns_relations = list()
2744 db_ns_relations = deep_get(nsd, ("ns-configuration", "relation"))
2745 if db_ns_relations:
2746 for r in db_ns_relations:
2747 # check if this VCA is in the relation
2748 if my_vca.get("member-vnf-index") in (
2749 r.get("entities")[0].get("id"),
2750 r.get("entities")[1].get("id"),
2751 ):
2752 ns_relations.append(r)
2753
2754 # read all vnf-configuration relations
2755 vnf_relations = list()
2756 db_vnfd_list = db_nsr.get("vnfd-id")
2757 if db_vnfd_list:
2758 for vnfd in db_vnfd_list:
2759 db_vnf_relations = None
2760 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2761 db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"])
2762 if db_vnf_configuration:
2763 db_vnf_relations = db_vnf_configuration.get("relation", [])
2764 if db_vnf_relations:
2765 for r in db_vnf_relations:
2766 # check if this VCA is in the relation
2767 if my_vca.get("vdu_id") in (
2768 r.get("entities")[0].get("id"),
2769 r.get("entities")[1].get("id"),
2770 ):
2771 vnf_relations.append(r)
2772
2773 # if no relations, terminate
2774 if not ns_relations and not vnf_relations:
2775 self.logger.debug(logging_text + " No relations")
2776 return True
2777
2778 self.logger.debug(
2779 logging_text
2780 + " adding relations\n {}\n {}".format(
2781 ns_relations, vnf_relations
2782 )
2783 )
2784
2785 # add all relations
2786 start = time()
2787 while True:
2788 # check timeout
2789 now = time()
2790 if now - start >= timeout:
2791 self.logger.error(logging_text + " : timeout adding relations")
2792 return False
2793
2794 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2795 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2796
2797 # for each defined NS relation, find the VCA's related
2798 for r in ns_relations.copy():
2799 from_vca_ee_id = None
2800 to_vca_ee_id = None
2801 from_vca_endpoint = None
2802 to_vca_endpoint = None
2803 vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
2804 for vca in vca_list:
2805 if vca.get("member-vnf-index") == r.get("entities")[0].get(
2806 "id"
2807 ) and vca.get("config_sw_installed"):
2808 from_vca_ee_id = vca.get("ee_id")
2809 from_vca_endpoint = r.get("entities")[0].get("endpoint")
2810 if vca.get("member-vnf-index") == r.get("entities")[1].get(
2811 "id"
2812 ) and vca.get("config_sw_installed"):
2813 to_vca_ee_id = vca.get("ee_id")
2814 to_vca_endpoint = r.get("entities")[1].get("endpoint")
2815 if from_vca_ee_id and to_vca_ee_id:
2816 # add relation
2817 await self.vca_map[vca_type].add_relation(
2818 ee_id_1=from_vca_ee_id,
2819 ee_id_2=to_vca_ee_id,
2820 endpoint_1=from_vca_endpoint,
2821 endpoint_2=to_vca_endpoint,
2822 vca_id=vca_id,
2823 )
2824 # remove entry from relations list
2825 ns_relations.remove(r)
2826 else:
2827 # check failed peers
2828 try:
2829 vca_status_list = db_nsr.get("configurationStatus")
2830 if vca_status_list:
2831 for i in range(len(vca_list)):
2832 vca = vca_list[i]
2833 vca_status = vca_status_list[i]
2834 if vca.get("member-vnf-index") == r.get("entities")[
2835 0
2836 ].get("id"):
2837 if vca_status.get("status") == "BROKEN":
2838 # peer broken: remove relation from list
2839 ns_relations.remove(r)
2840 if vca.get("member-vnf-index") == r.get("entities")[
2841 1
2842 ].get("id"):
2843 if vca_status.get("status") == "BROKEN":
2844 # peer broken: remove relation from list
2845 ns_relations.remove(r)
2846 except Exception:
2847 # ignore
2848 pass
2849
2850 # for each defined VNF relation, find the VCA's related
2851 for r in vnf_relations.copy():
2852 from_vca_ee_id = None
2853 to_vca_ee_id = None
2854 from_vca_endpoint = None
2855 to_vca_endpoint = None
2856 vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
2857 for vca in vca_list:
2858 key_to_check = "vdu_id"
2859 if vca.get("vdu_id") is None:
2860 key_to_check = "vnfd_id"
2861 if vca.get(key_to_check) == r.get("entities")[0].get(
2862 "id"
2863 ) and vca.get("config_sw_installed"):
2864 from_vca_ee_id = vca.get("ee_id")
2865 from_vca_endpoint = r.get("entities")[0].get("endpoint")
2866 if vca.get(key_to_check) == r.get("entities")[1].get(
2867 "id"
2868 ) and vca.get("config_sw_installed"):
2869 to_vca_ee_id = vca.get("ee_id")
2870 to_vca_endpoint = r.get("entities")[1].get("endpoint")
2871 if from_vca_ee_id and to_vca_ee_id:
2872 # add relation
2873 await self.vca_map[vca_type].add_relation(
2874 ee_id_1=from_vca_ee_id,
2875 ee_id_2=to_vca_ee_id,
2876 endpoint_1=from_vca_endpoint,
2877 endpoint_2=to_vca_endpoint,
2878 vca_id=vca_id,
2879 )
2880 # remove entry from relations list
2881 vnf_relations.remove(r)
2882 else:
2883 # check failed peers
2884 try:
2885 vca_status_list = db_nsr.get("configurationStatus")
2886 if vca_status_list:
2887 for i in range(len(vca_list)):
2888 vca = vca_list[i]
2889 vca_status = vca_status_list[i]
2890 if vca.get("vdu_id") == r.get("entities")[0].get(
2891 "id"
2892 ):
2893 if vca_status.get("status") == "BROKEN":
2894 # peer broken: remove relation from list
2895 vnf_relations.remove(r)
2896 if vca.get("vdu_id") == r.get("entities")[1].get(
2897 "id"
2898 ):
2899 if vca_status.get("status") == "BROKEN":
2900 # peer broken: remove relation from list
2901 vnf_relations.remove(r)
2902 except Exception:
2903 # ignore
2904 pass
2905
2906 # wait for next try
2907 await asyncio.sleep(5.0)
2908
2909 if not ns_relations and not vnf_relations:
2910 self.logger.debug("Relations added")
2911 break
2912
2913 return True
2914
2915 except Exception as e:
2916 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
2917 return False
2918
2919 async def _install_kdu(
2920 self,
2921 nsr_id: str,
2922 nsr_db_path: str,
2923 vnfr_data: dict,
2924 kdu_index: int,
2925 kdud: dict,
2926 vnfd: dict,
2927 k8s_instance_info: dict,
2928 k8params: dict = None,
2929 timeout: int = 600,
2930 vca_id: str = None,
2931 ):
2932
2933 try:
2934 k8sclustertype = k8s_instance_info["k8scluster-type"]
2935 # Instantiate kdu
2936 db_dict_install = {
2937 "collection": "nsrs",
2938 "filter": {"_id": nsr_id},
2939 "path": nsr_db_path,
2940 }
2941
2942 if k8s_instance_info.get("kdu-deployment-name"):
2943 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
2944 else:
2945 kdu_instance = self.k8scluster_map[
2946 k8sclustertype
2947 ].generate_kdu_instance_name(
2948 db_dict=db_dict_install,
2949 kdu_model=k8s_instance_info["kdu-model"],
2950 kdu_name=k8s_instance_info["kdu-name"],
2951 )
2952 self.update_db_2(
2953 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
2954 )
2955 await self.k8scluster_map[k8sclustertype].install(
2956 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2957 kdu_model=k8s_instance_info["kdu-model"],
2958 atomic=True,
2959 params=k8params,
2960 db_dict=db_dict_install,
2961 timeout=timeout,
2962 kdu_name=k8s_instance_info["kdu-name"],
2963 namespace=k8s_instance_info["namespace"],
2964 kdu_instance=kdu_instance,
2965 vca_id=vca_id,
2966 )
2967 self.update_db_2(
2968 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
2969 )
2970
2971 # Obtain services to obtain management service ip
2972 services = await self.k8scluster_map[k8sclustertype].get_services(
2973 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2974 kdu_instance=kdu_instance,
2975 namespace=k8s_instance_info["namespace"],
2976 )
2977
2978 # Obtain management service info (if exists)
2979 vnfr_update_dict = {}
2980 kdu_config = get_configuration(vnfd, kdud["name"])
2981 if kdu_config:
2982 target_ee_list = kdu_config.get("execution-environment-list", [])
2983 else:
2984 target_ee_list = []
2985
2986 if services:
2987 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
2988 mgmt_services = [
2989 service
2990 for service in kdud.get("service", [])
2991 if service.get("mgmt-service")
2992 ]
2993 for mgmt_service in mgmt_services:
2994 for service in services:
2995 if service["name"].startswith(mgmt_service["name"]):
2996 # Mgmt service found, Obtain service ip
2997 ip = service.get("external_ip", service.get("cluster_ip"))
2998 if isinstance(ip, list) and len(ip) == 1:
2999 ip = ip[0]
3000
3001 vnfr_update_dict[
3002 "kdur.{}.ip-address".format(kdu_index)
3003 ] = ip
3004
3005 # Check if must update also mgmt ip at the vnf
3006 service_external_cp = mgmt_service.get(
3007 "external-connection-point-ref"
3008 )
3009 if service_external_cp:
3010 if (
3011 deep_get(vnfd, ("mgmt-interface", "cp"))
3012 == service_external_cp
3013 ):
3014 vnfr_update_dict["ip-address"] = ip
3015
3016 if find_in_list(
3017 target_ee_list,
3018 lambda ee: ee.get(
3019 "external-connection-point-ref", ""
3020 )
3021 == service_external_cp,
3022 ):
3023 vnfr_update_dict[
3024 "kdur.{}.ip-address".format(kdu_index)
3025 ] = ip
3026 break
3027 else:
3028 self.logger.warn(
3029 "Mgmt service name: {} not found".format(
3030 mgmt_service["name"]
3031 )
3032 )
3033
3034 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
3035 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
3036
3037 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
3038 if (
3039 kdu_config
3040 and kdu_config.get("initial-config-primitive")
3041 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
3042 ):
3043 initial_config_primitive_list = kdu_config.get(
3044 "initial-config-primitive"
3045 )
3046 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
3047
3048 for initial_config_primitive in initial_config_primitive_list:
3049 primitive_params_ = self._map_primitive_params(
3050 initial_config_primitive, {}, {}
3051 )
3052
3053 await asyncio.wait_for(
3054 self.k8scluster_map[k8sclustertype].exec_primitive(
3055 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3056 kdu_instance=kdu_instance,
3057 primitive_name=initial_config_primitive["name"],
3058 params=primitive_params_,
3059 db_dict=db_dict_install,
3060 vca_id=vca_id,
3061 ),
3062 timeout=timeout,
3063 )
3064
3065 except Exception as e:
3066 # Prepare update db with error and raise exception
3067 try:
3068 self.update_db_2(
3069 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
3070 )
3071 self.update_db_2(
3072 "vnfrs",
3073 vnfr_data.get("_id"),
3074 {"kdur.{}.status".format(kdu_index): "ERROR"},
3075 )
3076 except Exception:
3077 # ignore to keep original exception
3078 pass
3079 # reraise original error
3080 raise
3081
3082 return kdu_instance
3083
3084 async def deploy_kdus(
3085 self,
3086 logging_text,
3087 nsr_id,
3088 nslcmop_id,
3089 db_vnfrs,
3090 db_vnfds,
3091 task_instantiation_info,
3092 ):
3093 # Launch kdus if present in the descriptor
3094
3095 k8scluster_id_2_uuic = {
3096 "helm-chart-v3": {},
3097 "helm-chart": {},
3098 "juju-bundle": {},
3099 }
3100
3101 async def _get_cluster_id(cluster_id, cluster_type):
3102 nonlocal k8scluster_id_2_uuic
3103 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
3104 return k8scluster_id_2_uuic[cluster_type][cluster_id]
3105
3106 # check if K8scluster is creating and wait look if previous tasks in process
3107 task_name, task_dependency = self.lcm_tasks.lookfor_related(
3108 "k8scluster", cluster_id
3109 )
3110 if task_dependency:
3111 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3112 task_name, cluster_id
3113 )
3114 self.logger.debug(logging_text + text)
3115 await asyncio.wait(task_dependency, timeout=3600)
3116
3117 db_k8scluster = self.db.get_one(
3118 "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
3119 )
3120 if not db_k8scluster:
3121 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
3122
3123 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
3124 if not k8s_id:
3125 if cluster_type == "helm-chart-v3":
3126 try:
3127 # backward compatibility for existing clusters that have not been initialized for helm v3
3128 k8s_credentials = yaml.safe_dump(
3129 db_k8scluster.get("credentials")
3130 )
3131 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
3132 k8s_credentials, reuse_cluster_uuid=cluster_id
3133 )
3134 db_k8scluster_update = {}
3135 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
3136 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
3137 db_k8scluster_update[
3138 "_admin.helm-chart-v3.created"
3139 ] = uninstall_sw
3140 db_k8scluster_update[
3141 "_admin.helm-chart-v3.operationalState"
3142 ] = "ENABLED"
3143 self.update_db_2(
3144 "k8sclusters", cluster_id, db_k8scluster_update
3145 )
3146 except Exception as e:
3147 self.logger.error(
3148 logging_text
3149 + "error initializing helm-v3 cluster: {}".format(str(e))
3150 )
3151 raise LcmException(
3152 "K8s cluster '{}' has not been initialized for '{}'".format(
3153 cluster_id, cluster_type
3154 )
3155 )
3156 else:
3157 raise LcmException(
3158 "K8s cluster '{}' has not been initialized for '{}'".format(
3159 cluster_id, cluster_type
3160 )
3161 )
3162 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
3163 return k8s_id
3164
3165 logging_text += "Deploy kdus: "
3166 step = ""
3167 try:
3168 db_nsr_update = {"_admin.deployed.K8s": []}
3169 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3170
3171 index = 0
3172 updated_cluster_list = []
3173 updated_v3_cluster_list = []
3174
3175 for vnfr_data in db_vnfrs.values():
3176 vca_id = self.get_vca_id(vnfr_data, {})
3177 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
3178 # Step 0: Prepare and set parameters
3179 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3180 vnfd_id = vnfr_data.get("vnfd-id")
3181 vnfd_with_id = find_in_list(
3182 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3183 )
3184 kdud = next(
3185 kdud
3186 for kdud in vnfd_with_id["kdu"]
3187 if kdud["name"] == kdur["kdu-name"]
3188 )
3189 namespace = kdur.get("k8s-namespace")
3190 kdu_deployment_name = kdur.get("kdu-deployment-name")
3191 if kdur.get("helm-chart"):
3192 kdumodel = kdur["helm-chart"]
3193 # Default version: helm3, if helm-version is v2 assign v2
3194 k8sclustertype = "helm-chart-v3"
3195 self.logger.debug("kdur: {}".format(kdur))
3196 if (
3197 kdur.get("helm-version")
3198 and kdur.get("helm-version") == "v2"
3199 ):
3200 k8sclustertype = "helm-chart"
3201 elif kdur.get("juju-bundle"):
3202 kdumodel = kdur["juju-bundle"]
3203 k8sclustertype = "juju-bundle"
3204 else:
3205 raise LcmException(
3206 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3207 "juju-bundle. Maybe an old NBI version is running".format(
3208 vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
3209 )
3210 )
3211 # check if kdumodel is a file and exists
3212 try:
3213 vnfd_with_id = find_in_list(
3214 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3215 )
3216 storage = deep_get(vnfd_with_id, ("_admin", "storage"))
3217 if storage and storage.get(
3218 "pkg-dir"
3219 ): # may be not present if vnfd has not artifacts
3220 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
3221 filename = "{}/{}/{}s/{}".format(
3222 storage["folder"],
3223 storage["pkg-dir"],
3224 k8sclustertype,
3225 kdumodel,
3226 )
3227 if self.fs.file_exists(
3228 filename, mode="file"
3229 ) or self.fs.file_exists(filename, mode="dir"):
3230 kdumodel = self.fs.path + filename
3231 except (asyncio.TimeoutError, asyncio.CancelledError):
3232 raise
3233 except Exception: # it is not a file
3234 pass
3235
3236 k8s_cluster_id = kdur["k8s-cluster"]["id"]
3237 step = "Synchronize repos for k8s cluster '{}'".format(
3238 k8s_cluster_id
3239 )
3240 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
3241
3242 # Synchronize repos
3243 if (
3244 k8sclustertype == "helm-chart"
3245 and cluster_uuid not in updated_cluster_list
3246 ) or (
3247 k8sclustertype == "helm-chart-v3"
3248 and cluster_uuid not in updated_v3_cluster_list
3249 ):
3250 del_repo_list, added_repo_dict = await asyncio.ensure_future(
3251 self.k8scluster_map[k8sclustertype].synchronize_repos(
3252 cluster_uuid=cluster_uuid
3253 )
3254 )
3255 if del_repo_list or added_repo_dict:
3256 if k8sclustertype == "helm-chart":
3257 unset = {
3258 "_admin.helm_charts_added." + item: None
3259 for item in del_repo_list
3260 }
3261 updated = {
3262 "_admin.helm_charts_added." + item: name
3263 for item, name in added_repo_dict.items()
3264 }
3265 updated_cluster_list.append(cluster_uuid)
3266 elif k8sclustertype == "helm-chart-v3":
3267 unset = {
3268 "_admin.helm_charts_v3_added." + item: None
3269 for item in del_repo_list
3270 }
3271 updated = {
3272 "_admin.helm_charts_v3_added." + item: name
3273 for item, name in added_repo_dict.items()
3274 }
3275 updated_v3_cluster_list.append(cluster_uuid)
3276 self.logger.debug(
3277 logging_text + "repos synchronized on k8s cluster "
3278 "'{}' to_delete: {}, to_add: {}".format(
3279 k8s_cluster_id, del_repo_list, added_repo_dict
3280 )
3281 )
3282 self.db.set_one(
3283 "k8sclusters",
3284 {"_id": k8s_cluster_id},
3285 updated,
3286 unset=unset,
3287 )
3288
3289 # Instantiate kdu
3290 step = "Instantiating KDU {}.{} in k8s cluster {}".format(
3291 vnfr_data["member-vnf-index-ref"],
3292 kdur["kdu-name"],
3293 k8s_cluster_id,
3294 )
3295 k8s_instance_info = {
3296 "kdu-instance": None,
3297 "k8scluster-uuid": cluster_uuid,
3298 "k8scluster-type": k8sclustertype,
3299 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
3300 "kdu-name": kdur["kdu-name"],
3301 "kdu-model": kdumodel,
3302 "namespace": namespace,
3303 "kdu-deployment-name": kdu_deployment_name,
3304 }
3305 db_path = "_admin.deployed.K8s.{}".format(index)
3306 db_nsr_update[db_path] = k8s_instance_info
3307 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3308 vnfd_with_id = find_in_list(
3309 db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
3310 )
3311 task = asyncio.ensure_future(
3312 self._install_kdu(
3313 nsr_id,
3314 db_path,
3315 vnfr_data,
3316 kdu_index,
3317 kdud,
3318 vnfd_with_id,
3319 k8s_instance_info,
3320 k8params=desc_params,
3321 timeout=600,
3322 vca_id=vca_id,
3323 )
3324 )
3325 self.lcm_tasks.register(
3326 "ns",
3327 nsr_id,
3328 nslcmop_id,
3329 "instantiate_KDU-{}".format(index),
3330 task,
3331 )
3332 task_instantiation_info[task] = "Deploying KDU {}".format(
3333 kdur["kdu-name"]
3334 )
3335
3336 index += 1
3337
3338 except (LcmException, asyncio.CancelledError):
3339 raise
3340 except Exception as e:
3341 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
3342 if isinstance(e, (N2VCException, DbException)):
3343 self.logger.error(logging_text + msg)
3344 else:
3345 self.logger.critical(logging_text + msg, exc_info=True)
3346 raise LcmException(msg)
3347 finally:
3348 if db_nsr_update:
3349 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3350
3351 def _deploy_n2vc(
3352 self,
3353 logging_text,
3354 db_nsr,
3355 db_vnfr,
3356 nslcmop_id,
3357 nsr_id,
3358 nsi_id,
3359 vnfd_id,
3360 vdu_id,
3361 kdu_name,
3362 member_vnf_index,
3363 vdu_index,
3364 vdu_name,
3365 deploy_params,
3366 descriptor_config,
3367 base_folder,
3368 task_instantiation_info,
3369 stage,
3370 ):
3371 # launch instantiate_N2VC in a asyncio task and register task object
3372 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3373 # if not found, create one entry and update database
3374 # fill db_nsr._admin.deployed.VCA.<index>
3375
3376 self.logger.debug(
3377 logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
3378 )
3379 if "execution-environment-list" in descriptor_config:
3380 ee_list = descriptor_config.get("execution-environment-list", [])
3381 elif "juju" in descriptor_config:
3382 ee_list = [descriptor_config] # ns charms
3383 else: # other types as script are not supported
3384 ee_list = []
3385
3386 for ee_item in ee_list:
3387 self.logger.debug(
3388 logging_text
3389 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3390 ee_item.get("juju"), ee_item.get("helm-chart")
3391 )
3392 )
3393 ee_descriptor_id = ee_item.get("id")
3394 if ee_item.get("juju"):
3395 vca_name = ee_item["juju"].get("charm")
3396 vca_type = (
3397 "lxc_proxy_charm"
3398 if ee_item["juju"].get("charm") is not None
3399 else "native_charm"
3400 )
3401 if ee_item["juju"].get("cloud") == "k8s":
3402 vca_type = "k8s_proxy_charm"
3403 elif ee_item["juju"].get("proxy") is False:
3404 vca_type = "native_charm"
3405 elif ee_item.get("helm-chart"):
3406 vca_name = ee_item["helm-chart"]
3407 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
3408 vca_type = "helm"
3409 else:
3410 vca_type = "helm-v3"
3411 else:
3412 self.logger.debug(
3413 logging_text + "skipping non juju neither charm configuration"
3414 )
3415 continue
3416
3417 vca_index = -1
3418 for vca_index, vca_deployed in enumerate(
3419 db_nsr["_admin"]["deployed"]["VCA"]
3420 ):
3421 if not vca_deployed:
3422 continue
3423 if (
3424 vca_deployed.get("member-vnf-index") == member_vnf_index
3425 and vca_deployed.get("vdu_id") == vdu_id
3426 and vca_deployed.get("kdu_name") == kdu_name
3427 and vca_deployed.get("vdu_count_index", 0) == vdu_index
3428 and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
3429 ):
3430 break
3431 else:
3432 # not found, create one.
3433 target = (
3434 "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
3435 )
3436 if vdu_id:
3437 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
3438 elif kdu_name:
3439 target += "/kdu/{}".format(kdu_name)
3440 vca_deployed = {
3441 "target_element": target,
3442 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3443 "member-vnf-index": member_vnf_index,
3444 "vdu_id": vdu_id,
3445 "kdu_name": kdu_name,
3446 "vdu_count_index": vdu_index,
3447 "operational-status": "init", # TODO revise
3448 "detailed-status": "", # TODO revise
3449 "step": "initial-deploy", # TODO revise
3450 "vnfd_id": vnfd_id,
3451 "vdu_name": vdu_name,
3452 "type": vca_type,
3453 "ee_descriptor_id": ee_descriptor_id,
3454 }
3455 vca_index += 1
3456
3457 # create VCA and configurationStatus in db
3458 db_dict = {
3459 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
3460 "configurationStatus.{}".format(vca_index): dict(),
3461 }
3462 self.update_db_2("nsrs", nsr_id, db_dict)
3463
3464 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
3465
3466 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
3467 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
3468 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
3469
3470 # Launch task
3471 task_n2vc = asyncio.ensure_future(
3472 self.instantiate_N2VC(
3473 logging_text=logging_text,
3474 vca_index=vca_index,
3475 nsi_id=nsi_id,
3476 db_nsr=db_nsr,
3477 db_vnfr=db_vnfr,
3478 vdu_id=vdu_id,
3479 kdu_name=kdu_name,
3480 vdu_index=vdu_index,
3481 deploy_params=deploy_params,
3482 config_descriptor=descriptor_config,
3483 base_folder=base_folder,
3484 nslcmop_id=nslcmop_id,
3485 stage=stage,
3486 vca_type=vca_type,
3487 vca_name=vca_name,
3488 ee_config_descriptor=ee_item,
3489 )
3490 )
3491 self.lcm_tasks.register(
3492 "ns",
3493 nsr_id,
3494 nslcmop_id,
3495 "instantiate_N2VC-{}".format(vca_index),
3496 task_n2vc,
3497 )
3498 task_instantiation_info[
3499 task_n2vc
3500 ] = self.task_name_deploy_vca + " {}.{}".format(
3501 member_vnf_index or "", vdu_id or ""
3502 )
3503
3504 @staticmethod
3505 def _create_nslcmop(nsr_id, operation, params):
3506 """
3507 Creates a ns-lcm-opp content to be stored at database.
3508 :param nsr_id: internal id of the instance
3509 :param operation: instantiate, terminate, scale, action, ...
3510 :param params: user parameters for the operation
3511 :return: dictionary following SOL005 format
3512 """
3513 # Raise exception if invalid arguments
3514 if not (nsr_id and operation and params):
3515 raise LcmException(
3516 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3517 )
3518 now = time()
3519 _id = str(uuid4())
3520 nslcmop = {
3521 "id": _id,
3522 "_id": _id,
3523 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3524 "operationState": "PROCESSING",
3525 "statusEnteredTime": now,
3526 "nsInstanceId": nsr_id,
3527 "lcmOperationType": operation,
3528 "startTime": now,
3529 "isAutomaticInvocation": False,
3530 "operationParams": params,
3531 "isCancelPending": False,
3532 "links": {
3533 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3534 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3535 },
3536 }
3537 return nslcmop
3538
3539 def _format_additional_params(self, params):
3540 params = params or {}
3541 for key, value in params.items():
3542 if str(value).startswith("!!yaml "):
3543 params[key] = yaml.safe_load(value[7:])
3544 return params
3545
3546 def _get_terminate_primitive_params(self, seq, vnf_index):
3547 primitive = seq.get("name")
3548 primitive_params = {}
3549 params = {
3550 "member_vnf_index": vnf_index,
3551 "primitive": primitive,
3552 "primitive_params": primitive_params,
3553 }
3554 desc_params = {}
3555 return self._map_primitive_params(seq, params, desc_params)
3556
3557 # sub-operations
3558
3559 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3560 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3561 if op.get("operationState") == "COMPLETED":
3562 # b. Skip sub-operation
3563 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3564 return self.SUBOPERATION_STATUS_SKIP
3565 else:
3566 # c. retry executing sub-operation
3567 # The sub-operation exists, and operationState != 'COMPLETED'
3568 # Update operationState = 'PROCESSING' to indicate a retry.
3569 operationState = "PROCESSING"
3570 detailed_status = "In progress"
3571 self._update_suboperation_status(
3572 db_nslcmop, op_index, operationState, detailed_status
3573 )
3574 # Return the sub-operation index
3575 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3576 # with arguments extracted from the sub-operation
3577 return op_index
3578
3579 # Find a sub-operation where all keys in a matching dictionary must match
3580 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3581 def _find_suboperation(self, db_nslcmop, match):
3582 if db_nslcmop and match:
3583 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3584 for i, op in enumerate(op_list):
3585 if all(op.get(k) == match[k] for k in match):
3586 return i
3587 return self.SUBOPERATION_STATUS_NOT_FOUND
3588
3589 # Update status for a sub-operation given its index
3590 def _update_suboperation_status(
3591 self, db_nslcmop, op_index, operationState, detailed_status
3592 ):
3593 # Update DB for HA tasks
3594 q_filter = {"_id": db_nslcmop["_id"]}
3595 update_dict = {
3596 "_admin.operations.{}.operationState".format(op_index): operationState,
3597 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3598 }
3599 self.db.set_one(
3600 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3601 )
3602
3603 # Add sub-operation, return the index of the added sub-operation
3604 # Optionally, set operationState, detailed-status, and operationType
3605 # Status and type are currently set for 'scale' sub-operations:
3606 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3607 # 'detailed-status' : status message
3608 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3609 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3610 def _add_suboperation(
3611 self,
3612 db_nslcmop,
3613 vnf_index,
3614 vdu_id,
3615 vdu_count_index,
3616 vdu_name,
3617 primitive,
3618 mapped_primitive_params,
3619 operationState=None,
3620 detailed_status=None,
3621 operationType=None,
3622 RO_nsr_id=None,
3623 RO_scaling_info=None,
3624 ):
3625 if not db_nslcmop:
3626 return self.SUBOPERATION_STATUS_NOT_FOUND
3627 # Get the "_admin.operations" list, if it exists
3628 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3629 op_list = db_nslcmop_admin.get("operations")
3630 # Create or append to the "_admin.operations" list
3631 new_op = {
3632 "member_vnf_index": vnf_index,
3633 "vdu_id": vdu_id,
3634 "vdu_count_index": vdu_count_index,
3635 "primitive": primitive,
3636 "primitive_params": mapped_primitive_params,
3637 }
3638 if operationState:
3639 new_op["operationState"] = operationState
3640 if detailed_status:
3641 new_op["detailed-status"] = detailed_status
3642 if operationType:
3643 new_op["lcmOperationType"] = operationType
3644 if RO_nsr_id:
3645 new_op["RO_nsr_id"] = RO_nsr_id
3646 if RO_scaling_info:
3647 new_op["RO_scaling_info"] = RO_scaling_info
3648 if not op_list:
3649 # No existing operations, create key 'operations' with current operation as first list element
3650 db_nslcmop_admin.update({"operations": [new_op]})
3651 op_list = db_nslcmop_admin.get("operations")
3652 else:
3653 # Existing operations, append operation to list
3654 op_list.append(new_op)
3655
3656 db_nslcmop_update = {"_admin.operations": op_list}
3657 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3658 op_index = len(op_list) - 1
3659 return op_index
3660
3661 # Helper methods for scale() sub-operations
3662
3663 # pre-scale/post-scale:
3664 # Check for 3 different cases:
3665 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3666 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3667 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3668 def _check_or_add_scale_suboperation(
3669 self,
3670 db_nslcmop,
3671 vnf_index,
3672 vnf_config_primitive,
3673 primitive_params,
3674 operationType,
3675 RO_nsr_id=None,
3676 RO_scaling_info=None,
3677 ):
3678 # Find this sub-operation
3679 if RO_nsr_id and RO_scaling_info:
3680 operationType = "SCALE-RO"
3681 match = {
3682 "member_vnf_index": vnf_index,
3683 "RO_nsr_id": RO_nsr_id,
3684 "RO_scaling_info": RO_scaling_info,
3685 }
3686 else:
3687 match = {
3688 "member_vnf_index": vnf_index,
3689 "primitive": vnf_config_primitive,
3690 "primitive_params": primitive_params,
3691 "lcmOperationType": operationType,
3692 }
3693 op_index = self._find_suboperation(db_nslcmop, match)
3694 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3695 # a. New sub-operation
3696 # The sub-operation does not exist, add it.
3697 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3698 # The following parameters are set to None for all kind of scaling:
3699 vdu_id = None
3700 vdu_count_index = None
3701 vdu_name = None
3702 if RO_nsr_id and RO_scaling_info:
3703 vnf_config_primitive = None
3704 primitive_params = None
3705 else:
3706 RO_nsr_id = None
3707 RO_scaling_info = None
3708 # Initial status for sub-operation
3709 operationState = "PROCESSING"
3710 detailed_status = "In progress"
3711 # Add sub-operation for pre/post-scaling (zero or more operations)
3712 self._add_suboperation(
3713 db_nslcmop,
3714 vnf_index,
3715 vdu_id,
3716 vdu_count_index,
3717 vdu_name,
3718 vnf_config_primitive,
3719 primitive_params,
3720 operationState,
3721 detailed_status,
3722 operationType,
3723 RO_nsr_id,
3724 RO_scaling_info,
3725 )
3726 return self.SUBOPERATION_STATUS_NEW
3727 else:
3728 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3729 # or op_index (operationState != 'COMPLETED')
3730 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3731
3732 # Function to return execution_environment id
3733
3734 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3735 # TODO vdu_index_count
3736 for vca in vca_deployed_list:
3737 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3738 return vca["ee_id"]
3739
3740 async def destroy_N2VC(
3741 self,
3742 logging_text,
3743 db_nslcmop,
3744 vca_deployed,
3745 config_descriptor,
3746 vca_index,
3747 destroy_ee=True,
3748 exec_primitives=True,
3749 scaling_in=False,
3750 vca_id: str = None,
3751 ):
3752 """
3753 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
3754 :param logging_text:
3755 :param db_nslcmop:
3756 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
3757 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3758 :param vca_index: index in the database _admin.deployed.VCA
3759 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
3760 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
3761 not executed properly
3762 :param scaling_in: True destroys the application, False destroys the model
3763 :return: None or exception
3764 """
3765
3766 self.logger.debug(
3767 logging_text
3768 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3769 vca_index, vca_deployed, config_descriptor, destroy_ee
3770 )
3771 )
3772
3773 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
3774
3775 # execute terminate_primitives
3776 if exec_primitives:
3777 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
3778 config_descriptor.get("terminate-config-primitive"),
3779 vca_deployed.get("ee_descriptor_id"),
3780 )
3781 vdu_id = vca_deployed.get("vdu_id")
3782 vdu_count_index = vca_deployed.get("vdu_count_index")
3783 vdu_name = vca_deployed.get("vdu_name")
3784 vnf_index = vca_deployed.get("member-vnf-index")
3785 if terminate_primitives and vca_deployed.get("needed_terminate"):
3786 for seq in terminate_primitives:
3787 # For each sequence in list, get primitive and call _ns_execute_primitive()
3788 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
3789 vnf_index, seq.get("name")
3790 )
3791 self.logger.debug(logging_text + step)
3792 # Create the primitive for each sequence, i.e. "primitive": "touch"
3793 primitive = seq.get("name")
3794 mapped_primitive_params = self._get_terminate_primitive_params(
3795 seq, vnf_index
3796 )
3797
3798 # Add sub-operation
3799 self._add_suboperation(
3800 db_nslcmop,
3801 vnf_index,
3802 vdu_id,
3803 vdu_count_index,
3804 vdu_name,
3805 primitive,
3806 mapped_primitive_params,
3807 )
3808 # Sub-operations: Call _ns_execute_primitive() instead of action()
3809 try:
3810 result, result_detail = await self._ns_execute_primitive(
3811 vca_deployed["ee_id"],
3812 primitive,
3813 mapped_primitive_params,
3814 vca_type=vca_type,
3815 vca_id=vca_id,
3816 )
3817 except LcmException:
3818 # this happens when VCA is not deployed. In this case it is not needed to terminate
3819 continue
3820 result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
3821 if result not in result_ok:
3822 raise LcmException(
3823 "terminate_primitive {} for vnf_member_index={} fails with "
3824 "error {}".format(seq.get("name"), vnf_index, result_detail)
3825 )
3826 # set that this VCA do not need terminated
3827 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
3828 vca_index
3829 )
3830 self.update_db_2(
3831 "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
3832 )
3833
3834 if vca_deployed.get("prometheus_jobs") and self.prometheus:
3835 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
3836
3837 if destroy_ee:
3838 await self.vca_map[vca_type].delete_execution_environment(
3839 vca_deployed["ee_id"],
3840 scaling_in=scaling_in,
3841 vca_type=vca_type,
3842 vca_id=vca_id,
3843 )
3844
3845 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
3846 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
3847 namespace = "." + db_nsr["_id"]
3848 try:
3849 await self.n2vc.delete_namespace(
3850 namespace=namespace,
3851 total_timeout=self.timeout_charm_delete,
3852 vca_id=vca_id,
3853 )
3854 except N2VCNotFound: # already deleted. Skip
3855 pass
3856 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
3857
3858 async def _terminate_RO(
3859 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
3860 ):
3861 """
3862 Terminates a deployment from RO
3863 :param logging_text:
3864 :param nsr_deployed: db_nsr._admin.deployed
3865 :param nsr_id:
3866 :param nslcmop_id:
3867 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
3868 this method will update only the index 2, but it will write on database the concatenated content of the list
3869 :return:
3870 """
3871 db_nsr_update = {}
3872 failed_detail = []
3873 ro_nsr_id = ro_delete_action = None
3874 if nsr_deployed and nsr_deployed.get("RO"):
3875 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
3876 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
3877 try:
3878 if ro_nsr_id:
3879 stage[2] = "Deleting ns from VIM."
3880 db_nsr_update["detailed-status"] = " ".join(stage)
3881 self._write_op_status(nslcmop_id, stage)
3882 self.logger.debug(logging_text + stage[2])
3883 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3884 self._write_op_status(nslcmop_id, stage)
3885 desc = await self.RO.delete("ns", ro_nsr_id)
3886 ro_delete_action = desc["action_id"]
3887 db_nsr_update[
3888 "_admin.deployed.RO.nsr_delete_action_id"
3889 ] = ro_delete_action
3890 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3891 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3892 if ro_delete_action:
3893 # wait until NS is deleted from VIM
3894 stage[2] = "Waiting ns deleted from VIM."
3895 detailed_status_old = None
3896 self.logger.debug(
3897 logging_text
3898 + stage[2]
3899 + " RO_id={} ro_delete_action={}".format(
3900 ro_nsr_id, ro_delete_action
3901 )
3902 )
3903 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3904 self._write_op_status(nslcmop_id, stage)
3905
3906 delete_timeout = 20 * 60 # 20 minutes
3907 while delete_timeout > 0:
3908 desc = await self.RO.show(
3909 "ns",
3910 item_id_name=ro_nsr_id,
3911 extra_item="action",
3912 extra_item_id=ro_delete_action,
3913 )
3914
3915 # deploymentStatus
3916 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3917
3918 ns_status, ns_status_info = self.RO.check_action_status(desc)
3919 if ns_status == "ERROR":
3920 raise ROclient.ROClientException(ns_status_info)
3921 elif ns_status == "BUILD":
3922 stage[2] = "Deleting from VIM {}".format(ns_status_info)
3923 elif ns_status == "ACTIVE":
3924 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3925 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3926 break
3927 else:
3928 assert (
3929 False
3930 ), "ROclient.check_action_status returns unknown {}".format(
3931 ns_status
3932 )
3933 if stage[2] != detailed_status_old:
3934 detailed_status_old = stage[2]
3935 db_nsr_update["detailed-status"] = " ".join(stage)
3936 self._write_op_status(nslcmop_id, stage)
3937 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3938 await asyncio.sleep(5, loop=self.loop)
3939 delete_timeout -= 5
3940 else: # delete_timeout <= 0:
3941 raise ROclient.ROClientException(
3942 "Timeout waiting ns deleted from VIM"
3943 )
3944
3945 except Exception as e:
3946 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3947 if (
3948 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3949 ): # not found
3950 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3951 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3952 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3953 self.logger.debug(
3954 logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
3955 )
3956 elif (
3957 isinstance(e, ROclient.ROClientException) and e.http_code == 409
3958 ): # conflict
3959 failed_detail.append("delete conflict: {}".format(e))
3960 self.logger.debug(
3961 logging_text
3962 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
3963 )
3964 else:
3965 failed_detail.append("delete error: {}".format(e))
3966 self.logger.error(
3967 logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
3968 )
3969
3970 # Delete nsd
3971 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
3972 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
3973 try:
3974 stage[2] = "Deleting nsd from RO."
3975 db_nsr_update["detailed-status"] = " ".join(stage)
3976 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3977 self._write_op_status(nslcmop_id, stage)
3978 await self.RO.delete("nsd", ro_nsd_id)
3979 self.logger.debug(
3980 logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
3981 )
3982 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3983 except Exception as e:
3984 if (
3985 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3986 ): # not found
3987 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3988 self.logger.debug(
3989 logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
3990 )
3991 elif (
3992 isinstance(e, ROclient.ROClientException) and e.http_code == 409
3993 ): # conflict
3994 failed_detail.append(
3995 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
3996 )
3997 self.logger.debug(logging_text + failed_detail[-1])
3998 else:
3999 failed_detail.append(
4000 "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
4001 )
4002 self.logger.error(logging_text + failed_detail[-1])
4003
4004 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
4005 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
4006 if not vnf_deployed or not vnf_deployed["id"]:
4007 continue
4008 try:
4009 ro_vnfd_id = vnf_deployed["id"]
4010 stage[
4011 2
4012 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4013 vnf_deployed["member-vnf-index"], ro_vnfd_id
4014 )
4015 db_nsr_update["detailed-status"] = " ".join(stage)
4016 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4017 self._write_op_status(nslcmop_id, stage)
4018 await self.RO.delete("vnfd", ro_vnfd_id)
4019 self.logger.debug(
4020 logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
4021 )
4022 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
4023 except Exception as e:
4024 if (
4025 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4026 ): # not found
4027 db_nsr_update[
4028 "_admin.deployed.RO.vnfd.{}.id".format(index)
4029 ] = None
4030 self.logger.debug(
4031 logging_text
4032 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
4033 )
4034 elif (
4035 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4036 ): # conflict
4037 failed_detail.append(
4038 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
4039 )
4040 self.logger.debug(logging_text + failed_detail[-1])
4041 else:
4042 failed_detail.append(
4043 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
4044 )
4045 self.logger.error(logging_text + failed_detail[-1])
4046
4047 if failed_detail:
4048 stage[2] = "Error deleting from VIM"
4049 else:
4050 stage[2] = "Deleted from VIM"
4051 db_nsr_update["detailed-status"] = " ".join(stage)
4052 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4053 self._write_op_status(nslcmop_id, stage)
4054
4055 if failed_detail:
4056 raise LcmException("; ".join(failed_detail))
4057
4058 async def terminate(self, nsr_id, nslcmop_id):
4059 # Try to lock HA task here
4060 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4061 if not task_is_locked_by_me:
4062 return
4063
4064 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
4065 self.logger.debug(logging_text + "Enter")
4066 timeout_ns_terminate = self.timeout_ns_terminate
4067 db_nsr = None
4068 db_nslcmop = None
4069 operation_params = None
4070 exc = None
4071 error_list = [] # annotates all failed error messages
4072 db_nslcmop_update = {}
4073 autoremove = False # autoremove after terminated
4074 tasks_dict_info = {}
4075 db_nsr_update = {}
4076 stage = [
4077 "Stage 1/3: Preparing task.",
4078 "Waiting for previous operations to terminate.",
4079 "",
4080 ]
4081 # ^ contains [stage, step, VIM-status]
4082 try:
4083 # wait for any previous tasks in process
4084 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4085
4086 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
4087 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4088 operation_params = db_nslcmop.get("operationParams") or {}
4089 if operation_params.get("timeout_ns_terminate"):
4090 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
4091 stage[1] = "Getting nsr={} from db.".format(nsr_id)
4092 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4093
4094 db_nsr_update["operational-status"] = "terminating"
4095 db_nsr_update["config-status"] = "terminating"
4096 self._write_ns_status(
4097 nsr_id=nsr_id,
4098 ns_state="TERMINATING",
4099 current_operation="TERMINATING",
4100 current_operation_id=nslcmop_id,
4101 other_update=db_nsr_update,
4102 )
4103 self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
4104 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
4105 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
4106 return
4107
4108 stage[1] = "Getting vnf descriptors from db."
4109 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
4110 db_vnfrs_dict = {
4111 db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
4112 }
4113 db_vnfds_from_id = {}
4114 db_vnfds_from_member_index = {}
4115 # Loop over VNFRs
4116 for vnfr in db_vnfrs_list:
4117 vnfd_id = vnfr["vnfd-id"]
4118 if vnfd_id not in db_vnfds_from_id:
4119 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4120 db_vnfds_from_id[vnfd_id] = vnfd
4121 db_vnfds_from_member_index[
4122 vnfr["member-vnf-index-ref"]
4123 ] = db_vnfds_from_id[vnfd_id]
4124
4125 # Destroy individual execution environments when there are terminating primitives.
4126 # Rest of EE will be deleted at once
4127 # TODO - check before calling _destroy_N2VC
4128 # if not operation_params.get("skip_terminate_primitives"):#
4129 # or not vca.get("needed_terminate"):
4130 stage[0] = "Stage 2/3 execute terminating primitives."
4131 self.logger.debug(logging_text + stage[0])
4132 stage[1] = "Looking execution environment that needs terminate."
4133 self.logger.debug(logging_text + stage[1])
4134
4135 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
4136 config_descriptor = None
4137 vca_member_vnf_index = vca.get("member-vnf-index")
4138 vca_id = self.get_vca_id(
4139 db_vnfrs_dict.get(vca_member_vnf_index)
4140 if vca_member_vnf_index
4141 else None,
4142 db_nsr,
4143 )
4144 if not vca or not vca.get("ee_id"):
4145 continue
4146 if not vca.get("member-vnf-index"):
4147 # ns
4148 config_descriptor = db_nsr.get("ns-configuration")
4149 elif vca.get("vdu_id"):
4150 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4151 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4152 elif vca.get("kdu_name"):
4153 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4154 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4155 else:
4156 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4157 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4158 vca_type = vca.get("type")
4159 exec_terminate_primitives = not operation_params.get(
4160 "skip_terminate_primitives"
4161 ) and vca.get("needed_terminate")
4162 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4163 # pending native charms
4164 destroy_ee = (
4165 True if vca_type in ("helm", "helm-v3", "native_charm") else False
4166 )
4167 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4168 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4169 task = asyncio.ensure_future(
4170 self.destroy_N2VC(
4171 logging_text,
4172 db_nslcmop,
4173 vca,
4174 config_descriptor,
4175 vca_index,
4176 destroy_ee,
4177 exec_terminate_primitives,
4178 vca_id=vca_id,
4179 )
4180 )
4181 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4182
4183 # wait for pending tasks of terminate primitives
4184 if tasks_dict_info:
4185 self.logger.debug(
4186 logging_text
4187 + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
4188 )
4189 error_list = await self._wait_for_tasks(
4190 logging_text,
4191 tasks_dict_info,
4192 min(self.timeout_charm_delete, timeout_ns_terminate),
4193 stage,
4194 nslcmop_id,
4195 )
4196 tasks_dict_info.clear()
4197 if error_list:
4198 return # raise LcmException("; ".join(error_list))
4199
4200 # remove All execution environments at once
4201 stage[0] = "Stage 3/3 delete all."
4202
4203 if nsr_deployed.get("VCA"):
4204 stage[1] = "Deleting all execution environments."
4205 self.logger.debug(logging_text + stage[1])
4206 vca_id = self.get_vca_id({}, db_nsr)
4207 task_delete_ee = asyncio.ensure_future(
4208 asyncio.wait_for(
4209 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
4210 timeout=self.timeout_charm_delete,
4211 )
4212 )
4213 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4214 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
4215
4216 # Delete from k8scluster
4217 stage[1] = "Deleting KDUs."
4218 self.logger.debug(logging_text + stage[1])
4219 # print(nsr_deployed)
4220 for kdu in get_iterable(nsr_deployed, "K8s"):
4221 if not kdu or not kdu.get("kdu-instance"):
4222 continue
4223 kdu_instance = kdu.get("kdu-instance")
4224 if kdu.get("k8scluster-type") in self.k8scluster_map:
4225 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4226 vca_id = self.get_vca_id({}, db_nsr)
4227 task_delete_kdu_instance = asyncio.ensure_future(
4228 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
4229 cluster_uuid=kdu.get("k8scluster-uuid"),
4230 kdu_instance=kdu_instance,
4231 vca_id=vca_id,
4232 )
4233 )
4234 else:
4235 self.logger.error(
4236 logging_text
4237 + "Unknown k8s deployment type {}".format(
4238 kdu.get("k8scluster-type")
4239 )
4240 )
4241 continue
4242 tasks_dict_info[
4243 task_delete_kdu_instance
4244 ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
4245
4246 # remove from RO
4247 stage[1] = "Deleting ns from VIM."
4248 if self.ng_ro:
4249 task_delete_ro = asyncio.ensure_future(
4250 self._terminate_ng_ro(
4251 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4252 )
4253 )
4254 else:
4255 task_delete_ro = asyncio.ensure_future(
4256 self._terminate_RO(
4257 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4258 )
4259 )
4260 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
4261
4262 # rest of staff will be done at finally
4263
4264 except (
4265 ROclient.ROClientException,
4266 DbException,
4267 LcmException,
4268 N2VCException,
4269 ) as e:
4270 self.logger.error(logging_text + "Exit Exception {}".format(e))
4271 exc = e
4272 except asyncio.CancelledError:
4273 self.logger.error(
4274 logging_text + "Cancelled Exception while '{}'".format(stage[1])
4275 )
4276 exc = "Operation was cancelled"
4277 except Exception as e:
4278 exc = traceback.format_exc()
4279 self.logger.critical(
4280 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
4281 exc_info=True,
4282 )
4283 finally:
4284 if exc:
4285 error_list.append(str(exc))
4286 try:
4287 # wait for pending tasks
4288 if tasks_dict_info:
4289 stage[1] = "Waiting for terminate pending tasks."
4290 self.logger.debug(logging_text + stage[1])
4291 error_list += await self._wait_for_tasks(
4292 logging_text,
4293 tasks_dict_info,
4294 timeout_ns_terminate,
4295 stage,
4296 nslcmop_id,
4297 )
4298 stage[1] = stage[2] = ""
4299 except asyncio.CancelledError:
4300 error_list.append("Cancelled")
4301 # TODO cancell all tasks
4302 except Exception as exc:
4303 error_list.append(str(exc))
4304 # update status at database
4305 if error_list:
4306 error_detail = "; ".join(error_list)
4307 # self.logger.error(logging_text + error_detail)
4308 error_description_nslcmop = "{} Detail: {}".format(
4309 stage[0], error_detail
4310 )
4311 error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
4312 nslcmop_id, stage[0]
4313 )
4314
4315 db_nsr_update["operational-status"] = "failed"
4316 db_nsr_update["detailed-status"] = (
4317 error_description_nsr + " Detail: " + error_detail
4318 )
4319 db_nslcmop_update["detailed-status"] = error_detail
4320 nslcmop_operation_state = "FAILED"
4321 ns_state = "BROKEN"
4322 else:
4323 error_detail = None
4324 error_description_nsr = error_description_nslcmop = None
4325 ns_state = "NOT_INSTANTIATED"
4326 db_nsr_update["operational-status"] = "terminated"
4327 db_nsr_update["detailed-status"] = "Done"
4328 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
4329 db_nslcmop_update["detailed-status"] = "Done"
4330 nslcmop_operation_state = "COMPLETED"
4331
4332 if db_nsr:
4333 self._write_ns_status(
4334 nsr_id=nsr_id,
4335 ns_state=ns_state,
4336 current_operation="IDLE",
4337 current_operation_id=None,
4338 error_description=error_description_nsr,
4339 error_detail=error_detail,
4340 other_update=db_nsr_update,
4341 )
4342 self._write_op_status(
4343 op_id=nslcmop_id,
4344 stage="",
4345 error_message=error_description_nslcmop,
4346 operation_state=nslcmop_operation_state,
4347 other_update=db_nslcmop_update,
4348 )
4349 if ns_state == "NOT_INSTANTIATED":
4350 try:
4351 self.db.set_list(
4352 "vnfrs",
4353 {"nsr-id-ref": nsr_id},
4354 {"_admin.nsState": "NOT_INSTANTIATED"},
4355 )
4356 except DbException as e:
4357 self.logger.warn(
4358 logging_text
4359 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4360 nsr_id, e
4361 )
4362 )
4363 if operation_params:
4364 autoremove = operation_params.get("autoremove", False)
4365 if nslcmop_operation_state:
4366 try:
4367 await self.msg.aiowrite(
4368 "ns",
4369 "terminated",
4370 {
4371 "nsr_id": nsr_id,
4372 "nslcmop_id": nslcmop_id,
4373 "operationState": nslcmop_operation_state,
4374 "autoremove": autoremove,
4375 },
4376 loop=self.loop,
4377 )
4378 except Exception as e:
4379 self.logger.error(
4380 logging_text + "kafka_write notification Exception {}".format(e)
4381 )
4382
4383 self.logger.debug(logging_text + "Exit")
4384 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4385
4386 async def _wait_for_tasks(
4387 self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
4388 ):
4389 time_start = time()
4390 error_detail_list = []
4391 error_list = []
4392 pending_tasks = list(created_tasks_info.keys())
4393 num_tasks = len(pending_tasks)
4394 num_done = 0
4395 stage[1] = "{}/{}.".format(num_done, num_tasks)
4396 self._write_op_status(nslcmop_id, stage)
4397 while pending_tasks:
4398 new_error = None
4399 _timeout = timeout + time_start - time()
4400 done, pending_tasks = await asyncio.wait(
4401 pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
4402 )
4403 num_done += len(done)
4404 if not done: # Timeout
4405 for task in pending_tasks:
4406 new_error = created_tasks_info[task] + ": Timeout"
4407 error_detail_list.append(new_error)
4408 error_list.append(new_error)
4409 break
4410 for task in done:
4411 if task.cancelled():
4412 exc = "Cancelled"
4413 else:
4414 exc = task.exception()
4415 if exc:
4416 if isinstance(exc, asyncio.TimeoutError):
4417 exc = "Timeout"
4418 new_error = created_tasks_info[task] + ": {}".format(exc)
4419 error_list.append(created_tasks_info[task])
4420 error_detail_list.append(new_error)
4421 if isinstance(
4422 exc,
4423 (
4424 str,
4425 DbException,
4426 N2VCException,
4427 ROclient.ROClientException,
4428 LcmException,
4429 K8sException,
4430 NgRoException,
4431 ),
4432 ):
4433 self.logger.error(logging_text + new_error)
4434 else:
4435 exc_traceback = "".join(
4436 traceback.format_exception(None, exc, exc.__traceback__)
4437 )
4438 self.logger.error(
4439 logging_text
4440 + created_tasks_info[task]
4441 + " "
4442 + exc_traceback
4443 )
4444 else:
4445 self.logger.debug(
4446 logging_text + created_tasks_info[task] + ": Done"
4447 )
4448 stage[1] = "{}/{}.".format(num_done, num_tasks)
4449 if new_error:
4450 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
4451 if nsr_id: # update also nsr
4452 self.update_db_2(
4453 "nsrs",
4454 nsr_id,
4455 {
4456 "errorDescription": "Error at: " + ", ".join(error_list),
4457 "errorDetail": ". ".join(error_detail_list),
4458 },
4459 )
4460 self._write_op_status(nslcmop_id, stage)
4461 return error_detail_list
4462
4463 @staticmethod
4464 def _map_primitive_params(primitive_desc, params, instantiation_params):
4465 """
4466 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4467 The default-value is used. If it is between < > it look for a value at instantiation_params
4468 :param primitive_desc: portion of VNFD/NSD that describes primitive
4469 :param params: Params provided by user
4470 :param instantiation_params: Instantiation params provided by user
4471 :return: a dictionary with the calculated params
4472 """
4473 calculated_params = {}
4474 for parameter in primitive_desc.get("parameter", ()):
4475 param_name = parameter["name"]
4476 if param_name in params:
4477 calculated_params[param_name] = params[param_name]
4478 elif "default-value" in parameter or "value" in parameter:
4479 if "value" in parameter:
4480 calculated_params[param_name] = parameter["value"]
4481 else:
4482 calculated_params[param_name] = parameter["default-value"]
4483 if (
4484 isinstance(calculated_params[param_name], str)
4485 and calculated_params[param_name].startswith("<")
4486 and calculated_params[param_name].endswith(">")
4487 ):
4488 if calculated_params[param_name][1:-1] in instantiation_params:
4489 calculated_params[param_name] = instantiation_params[
4490 calculated_params[param_name][1:-1]
4491 ]
4492 else:
4493 raise LcmException(
4494 "Parameter {} needed to execute primitive {} not provided".format(
4495 calculated_params[param_name], primitive_desc["name"]
4496 )
4497 )
4498 else:
4499 raise LcmException(
4500 "Parameter {} needed to execute primitive {} not provided".format(
4501 param_name, primitive_desc["name"]
4502 )
4503 )
4504
4505 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4506 calculated_params[param_name] = yaml.safe_dump(
4507 calculated_params[param_name], default_flow_style=True, width=256
4508 )
4509 elif isinstance(calculated_params[param_name], str) and calculated_params[
4510 param_name
4511 ].startswith("!!yaml "):
4512 calculated_params[param_name] = calculated_params[param_name][7:]
4513 if parameter.get("data-type") == "INTEGER":
4514 try:
4515 calculated_params[param_name] = int(calculated_params[param_name])
4516 except ValueError: # error converting string to int
4517 raise LcmException(
4518 "Parameter {} of primitive {} must be integer".format(
4519 param_name, primitive_desc["name"]
4520 )
4521 )
4522 elif parameter.get("data-type") == "BOOLEAN":
4523 calculated_params[param_name] = not (
4524 (str(calculated_params[param_name])).lower() == "false"
4525 )
4526
4527 # add always ns_config_info if primitive name is config
4528 if primitive_desc["name"] == "config":
4529 if "ns_config_info" in instantiation_params:
4530 calculated_params["ns_config_info"] = instantiation_params[
4531 "ns_config_info"
4532 ]
4533 return calculated_params
4534
4535 def _look_for_deployed_vca(
4536 self,
4537 deployed_vca,
4538 member_vnf_index,
4539 vdu_id,
4540 vdu_count_index,
4541 kdu_name=None,
4542 ee_descriptor_id=None,
4543 ):
4544 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4545 for vca in deployed_vca:
4546 if not vca:
4547 continue
4548 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4549 continue
4550 if (
4551 vdu_count_index is not None
4552 and vdu_count_index != vca["vdu_count_index"]
4553 ):
4554 continue
4555 if kdu_name and kdu_name != vca["kdu_name"]:
4556 continue
4557 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4558 continue
4559 break
4560 else:
4561 # vca_deployed not found
4562 raise LcmException(
4563 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4564 " is not deployed".format(
4565 member_vnf_index,
4566 vdu_id,
4567 vdu_count_index,
4568 kdu_name,
4569 ee_descriptor_id,
4570 )
4571 )
4572 # get ee_id
4573 ee_id = vca.get("ee_id")
4574 vca_type = vca.get(
4575 "type", "lxc_proxy_charm"
4576 ) # default value for backward compatibility - proxy charm
4577 if not ee_id:
4578 raise LcmException(
4579 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4580 "execution environment".format(
4581 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4582 )
4583 )
4584 return ee_id, vca_type
4585
4586 async def _ns_execute_primitive(
4587 self,
4588 ee_id,
4589 primitive,
4590 primitive_params,
4591 retries=0,
4592 retries_interval=30,
4593 timeout=None,
4594 vca_type=None,
4595 db_dict=None,
4596 vca_id: str = None,
4597 ) -> (str, str):
4598 try:
4599 if primitive == "config":
4600 primitive_params = {"params": primitive_params}
4601
4602 vca_type = vca_type or "lxc_proxy_charm"
4603
4604 while retries >= 0:
4605 try:
4606 output = await asyncio.wait_for(
4607 self.vca_map[vca_type].exec_primitive(
4608 ee_id=ee_id,
4609 primitive_name=primitive,
4610 params_dict=primitive_params,
4611 progress_timeout=self.timeout_progress_primitive,
4612 total_timeout=self.timeout_primitive,
4613 db_dict=db_dict,
4614 vca_id=vca_id,
4615 vca_type=vca_type,
4616 ),
4617 timeout=timeout or self.timeout_primitive,
4618 )
4619 # execution was OK
4620 break
4621 except asyncio.CancelledError:
4622 raise
4623 except Exception as e: # asyncio.TimeoutError
4624 if isinstance(e, asyncio.TimeoutError):
4625 e = "Timeout"
4626 retries -= 1
4627 if retries >= 0:
4628 self.logger.debug(
4629 "Error executing action {} on {} -> {}".format(
4630 primitive, ee_id, e
4631 )
4632 )
4633 # wait and retry
4634 await asyncio.sleep(retries_interval, loop=self.loop)
4635 else:
4636 return "FAILED", str(e)
4637
4638 return "COMPLETED", output
4639
4640 except (LcmException, asyncio.CancelledError):
4641 raise
4642 except Exception as e:
4643 return "FAIL", "Error executing action {}: {}".format(primitive, e)
4644
4645 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4646 """
4647 Updating the vca_status with latest juju information in nsrs record
4648 :param: nsr_id: Id of the nsr
4649 :param: nslcmop_id: Id of the nslcmop
4650 :return: None
4651 """
4652
4653 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4654 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4655 vca_id = self.get_vca_id({}, db_nsr)
4656 if db_nsr["_admin"]["deployed"]["K8s"]:
4657 for k8s_index, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4658 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
4659 await self._on_update_k8s_db(
4660 cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id
4661 )
4662 else:
4663 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4664 table, filter = "nsrs", {"_id": nsr_id}
4665 path = "_admin.deployed.VCA.{}.".format(vca_index)
4666 await self._on_update_n2vc_db(table, filter, path, {})
4667
4668 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4669 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4670
4671 async def action(self, nsr_id, nslcmop_id):
4672 # Try to lock HA task here
4673 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4674 if not task_is_locked_by_me:
4675 return
4676
4677 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4678 self.logger.debug(logging_text + "Enter")
4679 # get all needed from database
4680 db_nsr = None
4681 db_nslcmop = None
4682 db_nsr_update = {}
4683 db_nslcmop_update = {}
4684 nslcmop_operation_state = None
4685 error_description_nslcmop = None
4686 exc = None
4687 try:
4688 # wait for any previous tasks in process
4689 step = "Waiting for previous operations to terminate"
4690 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4691
4692 self._write_ns_status(
4693 nsr_id=nsr_id,
4694 ns_state=None,
4695 current_operation="RUNNING ACTION",
4696 current_operation_id=nslcmop_id,
4697 )
4698
4699 step = "Getting information from database"
4700 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4701 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4702 if db_nslcmop["operationParams"].get("primitive_params"):
4703 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
4704 db_nslcmop["operationParams"]["primitive_params"]
4705 )
4706
4707 nsr_deployed = db_nsr["_admin"].get("deployed")
4708 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4709 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4710 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4711 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4712 primitive = db_nslcmop["operationParams"]["primitive"]
4713 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4714 timeout_ns_action = db_nslcmop["operationParams"].get(
4715 "timeout_ns_action", self.timeout_primitive
4716 )
4717
4718 if vnf_index:
4719 step = "Getting vnfr from database"
4720 db_vnfr = self.db.get_one(
4721 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4722 )
4723 if db_vnfr.get("kdur"):
4724 kdur_list = []
4725 for kdur in db_vnfr["kdur"]:
4726 if kdur.get("additionalParams"):
4727 kdur["additionalParams"] = json.loads(
4728 kdur["additionalParams"]
4729 )
4730 kdur_list.append(kdur)
4731 db_vnfr["kdur"] = kdur_list
4732 step = "Getting vnfd from database"
4733 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4734 else:
4735 step = "Getting nsd from database"
4736 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4737
4738 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4739 # for backward compatibility
4740 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4741 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4742 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4743 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4744
4745 # look for primitive
4746 config_primitive_desc = descriptor_configuration = None
4747 if vdu_id:
4748 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4749 elif kdu_name:
4750 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4751 elif vnf_index:
4752 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4753 else:
4754 descriptor_configuration = db_nsd.get("ns-configuration")
4755
4756 if descriptor_configuration and descriptor_configuration.get(
4757 "config-primitive"
4758 ):
4759 for config_primitive in descriptor_configuration["config-primitive"]:
4760 if config_primitive["name"] == primitive:
4761 config_primitive_desc = config_primitive
4762 break
4763
4764 if not config_primitive_desc:
4765 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
4766 raise LcmException(
4767 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4768 primitive
4769 )
4770 )
4771 primitive_name = primitive
4772 ee_descriptor_id = None
4773 else:
4774 primitive_name = config_primitive_desc.get(
4775 "execution-environment-primitive", primitive
4776 )
4777 ee_descriptor_id = config_primitive_desc.get(
4778 "execution-environment-ref"
4779 )
4780
4781 if vnf_index:
4782 if vdu_id:
4783 vdur = next(
4784 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
4785 )
4786 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
4787 elif kdu_name:
4788 kdur = next(
4789 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
4790 )
4791 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
4792 else:
4793 desc_params = parse_yaml_strings(
4794 db_vnfr.get("additionalParamsForVnf")
4795 )
4796 else:
4797 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
4798 if kdu_name and get_configuration(db_vnfd, kdu_name):
4799 kdu_configuration = get_configuration(db_vnfd, kdu_name)
4800 actions = set()
4801 for primitive in kdu_configuration.get("initial-config-primitive", []):
4802 actions.add(primitive["name"])
4803 for primitive in kdu_configuration.get("config-primitive", []):
4804 actions.add(primitive["name"])
4805 kdu_action = True if primitive_name in actions else False
4806
4807 # TODO check if ns is in a proper status
4808 if kdu_name and (
4809 primitive_name in ("upgrade", "rollback", "status") or kdu_action
4810 ):
4811 # kdur and desc_params already set from before
4812 if primitive_params:
4813 desc_params.update(primitive_params)
4814 # TODO Check if we will need something at vnf level
4815 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
4816 if (
4817 kdu_name == kdu["kdu-name"]
4818 and kdu["member-vnf-index"] == vnf_index
4819 ):
4820 break
4821 else:
4822 raise LcmException(
4823 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
4824 )
4825
4826 if kdu.get("k8scluster-type") not in self.k8scluster_map:
4827 msg = "unknown k8scluster-type '{}'".format(
4828 kdu.get("k8scluster-type")
4829 )
4830 raise LcmException(msg)
4831
4832 db_dict = {
4833 "collection": "nsrs",
4834 "filter": {"_id": nsr_id},
4835 "path": "_admin.deployed.K8s.{}".format(index),
4836 }
4837 self.logger.debug(
4838 logging_text
4839 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
4840 )
4841 step = "Executing kdu {}".format(primitive_name)
4842 if primitive_name == "upgrade":
4843 if desc_params.get("kdu_model"):
4844 kdu_model = desc_params.get("kdu_model")
4845 del desc_params["kdu_model"]
4846 else:
4847 kdu_model = kdu.get("kdu-model")
4848 parts = kdu_model.split(sep=":")
4849 if len(parts) == 2:
4850 kdu_model = parts[0]
4851
4852 detailed_status = await asyncio.wait_for(
4853 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
4854 cluster_uuid=kdu.get("k8scluster-uuid"),
4855 kdu_instance=kdu.get("kdu-instance"),
4856 atomic=True,
4857 kdu_model=kdu_model,
4858 params=desc_params,
4859 db_dict=db_dict,
4860 timeout=timeout_ns_action,
4861 ),
4862 timeout=timeout_ns_action + 10,
4863 )
4864 self.logger.debug(
4865 logging_text + " Upgrade of kdu {} done".format(detailed_status)
4866 )
4867 elif primitive_name == "rollback":
4868 detailed_status = await asyncio.wait_for(
4869 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
4870 cluster_uuid=kdu.get("k8scluster-uuid"),
4871 kdu_instance=kdu.get("kdu-instance"),
4872 db_dict=db_dict,
4873 ),
4874 timeout=timeout_ns_action,
4875 )
4876 elif primitive_name == "status":
4877 detailed_status = await asyncio.wait_for(
4878 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
4879 cluster_uuid=kdu.get("k8scluster-uuid"),
4880 kdu_instance=kdu.get("kdu-instance"),
4881 vca_id=vca_id,
4882 ),
4883 timeout=timeout_ns_action,
4884 )
4885 else:
4886 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
4887 kdu["kdu-name"], nsr_id
4888 )
4889 params = self._map_primitive_params(
4890 config_primitive_desc, primitive_params, desc_params
4891 )
4892
4893 detailed_status = await asyncio.wait_for(
4894 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
4895 cluster_uuid=kdu.get("k8scluster-uuid"),
4896 kdu_instance=kdu_instance,
4897 primitive_name=primitive_name,
4898 params=params,
4899 db_dict=db_dict,
4900 timeout=timeout_ns_action,
4901 vca_id=vca_id,
4902 ),
4903 timeout=timeout_ns_action,
4904 )
4905
4906 if detailed_status:
4907 nslcmop_operation_state = "COMPLETED"
4908 else:
4909 detailed_status = ""
4910 nslcmop_operation_state = "FAILED"
4911 else:
4912 ee_id, vca_type = self._look_for_deployed_vca(
4913 nsr_deployed["VCA"],
4914 member_vnf_index=vnf_index,
4915 vdu_id=vdu_id,
4916 vdu_count_index=vdu_count_index,
4917 ee_descriptor_id=ee_descriptor_id,
4918 )
4919 for vca_index, vca_deployed in enumerate(
4920 db_nsr["_admin"]["deployed"]["VCA"]
4921 ):
4922 if vca_deployed.get("member-vnf-index") == vnf_index:
4923 db_dict = {
4924 "collection": "nsrs",
4925 "filter": {"_id": nsr_id},
4926 "path": "_admin.deployed.VCA.{}.".format(vca_index),
4927 }
4928 break
4929 (
4930 nslcmop_operation_state,
4931 detailed_status,
4932 ) = await self._ns_execute_primitive(
4933 ee_id,
4934 primitive=primitive_name,
4935 primitive_params=self._map_primitive_params(
4936 config_primitive_desc, primitive_params, desc_params
4937 ),
4938 timeout=timeout_ns_action,
4939 vca_type=vca_type,
4940 db_dict=db_dict,
4941 vca_id=vca_id,
4942 )
4943
4944 db_nslcmop_update["detailed-status"] = detailed_status
4945 error_description_nslcmop = (
4946 detailed_status if nslcmop_operation_state == "FAILED" else ""
4947 )
4948 self.logger.debug(
4949 logging_text
4950 + " task Done with result {} {}".format(
4951 nslcmop_operation_state, detailed_status
4952 )
4953 )
4954 return # database update is called inside finally
4955
4956 except (DbException, LcmException, N2VCException, K8sException) as e:
4957 self.logger.error(logging_text + "Exit Exception {}".format(e))
4958 exc = e
4959 except asyncio.CancelledError:
4960 self.logger.error(
4961 logging_text + "Cancelled Exception while '{}'".format(step)
4962 )
4963 exc = "Operation was cancelled"
4964 except asyncio.TimeoutError:
4965 self.logger.error(logging_text + "Timeout while '{}'".format(step))
4966 exc = "Timeout"
4967 except Exception as e:
4968 exc = traceback.format_exc()
4969 self.logger.critical(
4970 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
4971 exc_info=True,
4972 )
4973 finally:
4974 if exc:
4975 db_nslcmop_update[
4976 "detailed-status"
4977 ] = (
4978 detailed_status
4979 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4980 nslcmop_operation_state = "FAILED"
4981 if db_nsr:
4982 self._write_ns_status(
4983 nsr_id=nsr_id,
4984 ns_state=db_nsr[
4985 "nsState"
4986 ], # TODO check if degraded. For the moment use previous status
4987 current_operation="IDLE",
4988 current_operation_id=None,
4989 # error_description=error_description_nsr,
4990 # error_detail=error_detail,
4991 other_update=db_nsr_update,
4992 )
4993
4994 self._write_op_status(
4995 op_id=nslcmop_id,
4996 stage="",
4997 error_message=error_description_nslcmop,
4998 operation_state=nslcmop_operation_state,
4999 other_update=db_nslcmop_update,
5000 )
5001
5002 if nslcmop_operation_state:
5003 try:
5004 await self.msg.aiowrite(
5005 "ns",
5006 "actioned",
5007 {
5008 "nsr_id": nsr_id,
5009 "nslcmop_id": nslcmop_id,
5010 "operationState": nslcmop_operation_state,
5011 },
5012 loop=self.loop,
5013 )
5014 except Exception as e:
5015 self.logger.error(
5016 logging_text + "kafka_write notification Exception {}".format(e)
5017 )
5018 self.logger.debug(logging_text + "Exit")
5019 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5020 return nslcmop_operation_state, detailed_status
5021
5022 async def scale(self, nsr_id, nslcmop_id):
5023 # Try to lock HA task here
5024 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5025 if not task_is_locked_by_me:
5026 return
5027
5028 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5029 stage = ["", "", ""]
5030 tasks_dict_info = {}
5031 # ^ stage, step, VIM progress
5032 self.logger.debug(logging_text + "Enter")
5033 # get all needed from database
5034 db_nsr = None
5035 db_nslcmop_update = {}
5036 db_nsr_update = {}
5037 exc = None
5038 # in case of error, indicates what part of scale was failed to put nsr at error status
5039 scale_process = None
5040 old_operational_status = ""
5041 old_config_status = ""
5042 nsi_id = None
5043 try:
5044 # wait for any previous tasks in process
5045 step = "Waiting for previous operations to terminate"
5046 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5047 self._write_ns_status(
5048 nsr_id=nsr_id,
5049 ns_state=None,
5050 current_operation="SCALING",
5051 current_operation_id=nslcmop_id,
5052 )
5053
5054 step = "Getting nslcmop from database"
5055 self.logger.debug(
5056 step + " after having waited for previous tasks to be completed"
5057 )
5058 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5059
5060 step = "Getting nsr from database"
5061 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5062 old_operational_status = db_nsr["operational-status"]
5063 old_config_status = db_nsr["config-status"]
5064
5065 step = "Parsing scaling parameters"
5066 db_nsr_update["operational-status"] = "scaling"
5067 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5068 nsr_deployed = db_nsr["_admin"].get("deployed")
5069
5070 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5071 "scaleByStepData"
5072 ]["member-vnf-index"]
5073 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5074 "scaleByStepData"
5075 ]["scaling-group-descriptor"]
5076 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5077 # for backward compatibility
5078 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5079 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5080 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5081 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5082
5083 step = "Getting vnfr from database"
5084 db_vnfr = self.db.get_one(
5085 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5086 )
5087
5088 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5089
5090 step = "Getting vnfd from database"
5091 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5092
5093 base_folder = db_vnfd["_admin"]["storage"]
5094
5095 step = "Getting scaling-group-descriptor"
5096 scaling_descriptor = find_in_list(
5097 get_scaling_aspect(db_vnfd),
5098 lambda scale_desc: scale_desc["name"] == scaling_group,
5099 )
5100 if not scaling_descriptor:
5101 raise LcmException(
5102 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5103 "at vnfd:scaling-group-descriptor".format(scaling_group)
5104 )
5105
5106 step = "Sending scale order to VIM"
5107 # TODO check if ns is in a proper status
5108 nb_scale_op = 0
5109 if not db_nsr["_admin"].get("scaling-group"):
5110 self.update_db_2(
5111 "nsrs",
5112 nsr_id,
5113 {
5114 "_admin.scaling-group": [
5115 {"name": scaling_group, "nb-scale-op": 0}
5116 ]
5117 },
5118 )
5119 admin_scale_index = 0
5120 else:
5121 for admin_scale_index, admin_scale_info in enumerate(
5122 db_nsr["_admin"]["scaling-group"]
5123 ):
5124 if admin_scale_info["name"] == scaling_group:
5125 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5126 break
5127 else: # not found, set index one plus last element and add new entry with the name
5128 admin_scale_index += 1
5129 db_nsr_update[
5130 "_admin.scaling-group.{}.name".format(admin_scale_index)
5131 ] = scaling_group
5132
5133 vca_scaling_info = []
5134 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5135 if scaling_type == "SCALE_OUT":
5136 if "aspect-delta-details" not in scaling_descriptor:
5137 raise LcmException(
5138 "Aspect delta details not fount in scaling descriptor {}".format(
5139 scaling_descriptor["name"]
5140 )
5141 )
5142 # count if max-instance-count is reached
5143 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5144
5145 scaling_info["scaling_direction"] = "OUT"
5146 scaling_info["vdu-create"] = {}
5147 scaling_info["kdu-create"] = {}
5148 for delta in deltas:
5149 for vdu_delta in delta.get("vdu-delta", {}):
5150 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5151 # vdu_index also provides the number of instance of the targeted vdu
5152 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5153 cloud_init_text = self._get_vdu_cloud_init_content(
5154 vdud, db_vnfd
5155 )
5156 if cloud_init_text:
5157 additional_params = (
5158 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5159 or {}
5160 )
5161 cloud_init_list = []
5162
5163 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5164 max_instance_count = 10
5165 if vdu_profile and "max-number-of-instances" in vdu_profile:
5166 max_instance_count = vdu_profile.get(
5167 "max-number-of-instances", 10
5168 )
5169
5170 default_instance_num = get_number_of_instances(
5171 db_vnfd, vdud["id"]
5172 )
5173 instances_number = vdu_delta.get("number-of-instances", 1)
5174 nb_scale_op += instances_number
5175
5176 new_instance_count = nb_scale_op + default_instance_num
5177 # Control if new count is over max and vdu count is less than max.
5178 # Then assign new instance count
5179 if new_instance_count > max_instance_count > vdu_count:
5180 instances_number = new_instance_count - max_instance_count
5181 else:
5182 instances_number = instances_number
5183
5184 if new_instance_count > max_instance_count:
5185 raise LcmException(
5186 "reached the limit of {} (max-instance-count) "
5187 "scaling-out operations for the "
5188 "scaling-group-descriptor '{}'".format(
5189 nb_scale_op, scaling_group
5190 )
5191 )
5192 for x in range(vdu_delta.get("number-of-instances", 1)):
5193 if cloud_init_text:
5194 # TODO Information of its own ip is not available because db_vnfr is not updated.
5195 additional_params["OSM"] = get_osm_params(
5196 db_vnfr, vdu_delta["id"], vdu_index + x
5197 )
5198 cloud_init_list.append(
5199 self._parse_cloud_init(
5200 cloud_init_text,
5201 additional_params,
5202 db_vnfd["id"],
5203 vdud["id"],
5204 )
5205 )
5206 vca_scaling_info.append(
5207 {
5208 "osm_vdu_id": vdu_delta["id"],
5209 "member-vnf-index": vnf_index,
5210 "type": "create",
5211 "vdu_index": vdu_index + x,
5212 }
5213 )
5214 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5215 for kdu_delta in delta.get("kdu-resource-delta", {}):
5216 kdu_profile = get_kdu_profile(db_vnfd, kdu_delta["id"])
5217 kdu_name = kdu_profile["kdu-name"]
5218 resource_name = kdu_profile["resource-name"]
5219
5220 # Might have different kdus in the same delta
5221 # Should have list for each kdu
5222 if not scaling_info["kdu-create"].get(kdu_name, None):
5223 scaling_info["kdu-create"][kdu_name] = []
5224
5225 kdur = get_kdur(db_vnfr, kdu_name)
5226 if kdur.get("helm-chart"):
5227 k8s_cluster_type = "helm-chart-v3"
5228 self.logger.debug("kdur: {}".format(kdur))
5229 if (
5230 kdur.get("helm-version")
5231 and kdur.get("helm-version") == "v2"
5232 ):
5233 k8s_cluster_type = "helm-chart"
5234 raise NotImplementedError
5235 elif kdur.get("juju-bundle"):
5236 k8s_cluster_type = "juju-bundle"
5237 else:
5238 raise LcmException(
5239 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5240 "juju-bundle. Maybe an old NBI version is running".format(
5241 db_vnfr["member-vnf-index-ref"], kdu_name
5242 )
5243 )
5244
5245 max_instance_count = 10
5246 if kdu_profile and "max-number-of-instances" in kdu_profile:
5247 max_instance_count = kdu_profile.get(
5248 "max-number-of-instances", 10
5249 )
5250
5251 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5252 deployed_kdu, _ = get_deployed_kdu(
5253 nsr_deployed, kdu_name, vnf_index
5254 )
5255 if deployed_kdu is None:
5256 raise LcmException(
5257 "KDU '{}' for vnf '{}' not deployed".format(
5258 kdu_name, vnf_index
5259 )
5260 )
5261 kdu_instance = deployed_kdu.get("kdu-instance")
5262 instance_num = await self.k8scluster_map[
5263 k8s_cluster_type
5264 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5265 kdu_replica_count = instance_num + kdu_delta.get(
5266 "number-of-instances", 1
5267 )
5268
5269 # Control if new count is over max and instance_num is less than max.
5270 # Then assign max instance number to kdu replica count
5271 if kdu_replica_count > max_instance_count > instance_num:
5272 kdu_replica_count = max_instance_count
5273 if kdu_replica_count > max_instance_count:
5274 raise LcmException(
5275 "reached the limit of {} (max-instance-count) "
5276 "scaling-out operations for the "
5277 "scaling-group-descriptor '{}'".format(
5278 instance_num, scaling_group
5279 )
5280 )
5281
5282 for x in range(kdu_delta.get("number-of-instances", 1)):
5283 vca_scaling_info.append(
5284 {
5285 "osm_kdu_id": kdu_name,
5286 "member-vnf-index": vnf_index,
5287 "type": "create",
5288 "kdu_index": instance_num + x - 1,
5289 }
5290 )
5291 scaling_info["kdu-create"][kdu_name].append(
5292 {
5293 "member-vnf-index": vnf_index,
5294 "type": "create",
5295 "k8s-cluster-type": k8s_cluster_type,
5296 "resource-name": resource_name,
5297 "scale": kdu_replica_count,
5298 }
5299 )
5300 elif scaling_type == "SCALE_IN":
5301 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5302
5303 scaling_info["scaling_direction"] = "IN"
5304 scaling_info["vdu-delete"] = {}
5305 scaling_info["kdu-delete"] = {}
5306
5307 for delta in deltas:
5308 for vdu_delta in delta.get("vdu-delta", {}):
5309 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5310 min_instance_count = 0
5311 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5312 if vdu_profile and "min-number-of-instances" in vdu_profile:
5313 min_instance_count = vdu_profile["min-number-of-instances"]
5314
5315 default_instance_num = get_number_of_instances(
5316 db_vnfd, vdu_delta["id"]
5317 )
5318 instance_num = vdu_delta.get("number-of-instances", 1)
5319 nb_scale_op -= instance_num
5320
5321 new_instance_count = nb_scale_op + default_instance_num
5322
5323 if new_instance_count < min_instance_count < vdu_count:
5324 instances_number = min_instance_count - new_instance_count
5325 else:
5326 instances_number = instance_num
5327
5328 if new_instance_count < min_instance_count:
5329 raise LcmException(
5330 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5331 "scaling-group-descriptor '{}'".format(
5332 nb_scale_op, scaling_group
5333 )
5334 )
5335 for x in range(vdu_delta.get("number-of-instances", 1)):
5336 vca_scaling_info.append(
5337 {
5338 "osm_vdu_id": vdu_delta["id"],
5339 "member-vnf-index": vnf_index,
5340 "type": "delete",
5341 "vdu_index": vdu_index - 1 - x,
5342 }
5343 )
5344 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5345 for kdu_delta in delta.get("kdu-resource-delta", {}):
5346 kdu_profile = get_kdu_profile(db_vnfd, kdu_delta["id"])
5347 kdu_name = kdu_profile["kdu-name"]
5348 resource_name = kdu_profile["resource-name"]
5349
5350 if not scaling_info["kdu-delete"].get(kdu_name, None):
5351 scaling_info["kdu-delete"][kdu_name] = []
5352
5353 kdur = get_kdur(db_vnfr, kdu_name)
5354 if kdur.get("helm-chart"):
5355 k8s_cluster_type = "helm-chart-v3"
5356 self.logger.debug("kdur: {}".format(kdur))
5357 if (
5358 kdur.get("helm-version")
5359 and kdur.get("helm-version") == "v2"
5360 ):
5361 k8s_cluster_type = "helm-chart"
5362 raise NotImplementedError
5363 elif kdur.get("juju-bundle"):
5364 k8s_cluster_type = "juju-bundle"
5365 else:
5366 raise LcmException(
5367 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5368 "juju-bundle. Maybe an old NBI version is running".format(
5369 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5370 )
5371 )
5372
5373 min_instance_count = 0
5374 if kdu_profile and "min-number-of-instances" in kdu_profile:
5375 min_instance_count = kdu_profile["min-number-of-instances"]
5376
5377 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5378 deployed_kdu, _ = get_deployed_kdu(
5379 nsr_deployed, kdu_name, vnf_index
5380 )
5381 if deployed_kdu is None:
5382 raise LcmException(
5383 "KDU '{}' for vnf '{}' not deployed".format(
5384 kdu_name, vnf_index
5385 )
5386 )
5387 kdu_instance = deployed_kdu.get("kdu-instance")
5388 instance_num = await self.k8scluster_map[
5389 k8s_cluster_type
5390 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5391 kdu_replica_count = instance_num - kdu_delta.get(
5392 "number-of-instances", 1
5393 )
5394
5395 if kdu_replica_count < min_instance_count < instance_num:
5396 kdu_replica_count = min_instance_count
5397 if kdu_replica_count < min_instance_count:
5398 raise LcmException(
5399 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5400 "scaling-group-descriptor '{}'".format(
5401 instance_num, scaling_group
5402 )
5403 )
5404
5405 for x in range(kdu_delta.get("number-of-instances", 1)):
5406 vca_scaling_info.append(
5407 {
5408 "osm_kdu_id": kdu_name,
5409 "member-vnf-index": vnf_index,
5410 "type": "delete",
5411 "kdu_index": instance_num - x - 1,
5412 }
5413 )
5414 scaling_info["kdu-delete"][kdu_name].append(
5415 {
5416 "member-vnf-index": vnf_index,
5417 "type": "delete",
5418 "k8s-cluster-type": k8s_cluster_type,
5419 "resource-name": resource_name,
5420 "scale": kdu_replica_count,
5421 }
5422 )
5423
5424 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5425 vdu_delete = copy(scaling_info.get("vdu-delete"))
5426 if scaling_info["scaling_direction"] == "IN":
5427 for vdur in reversed(db_vnfr["vdur"]):
5428 if vdu_delete.get(vdur["vdu-id-ref"]):
5429 vdu_delete[vdur["vdu-id-ref"]] -= 1
5430 scaling_info["vdu"].append(
5431 {
5432 "name": vdur.get("name") or vdur.get("vdu-name"),
5433 "vdu_id": vdur["vdu-id-ref"],
5434 "interface": [],
5435 }
5436 )
5437 for interface in vdur["interfaces"]:
5438 scaling_info["vdu"][-1]["interface"].append(
5439 {
5440 "name": interface["name"],
5441 "ip_address": interface["ip-address"],
5442 "mac_address": interface.get("mac-address"),
5443 }
5444 )
5445 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5446
5447 # PRE-SCALE BEGIN
5448 step = "Executing pre-scale vnf-config-primitive"
5449 if scaling_descriptor.get("scaling-config-action"):
5450 for scaling_config_action in scaling_descriptor[
5451 "scaling-config-action"
5452 ]:
5453 if (
5454 scaling_config_action.get("trigger") == "pre-scale-in"
5455 and scaling_type == "SCALE_IN"
5456 ) or (
5457 scaling_config_action.get("trigger") == "pre-scale-out"
5458 and scaling_type == "SCALE_OUT"
5459 ):
5460 vnf_config_primitive = scaling_config_action[
5461 "vnf-config-primitive-name-ref"
5462 ]
5463 step = db_nslcmop_update[
5464 "detailed-status"
5465 ] = "executing pre-scale scaling-config-action '{}'".format(
5466 vnf_config_primitive
5467 )
5468
5469 # look for primitive
5470 for config_primitive in (
5471 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5472 ).get("config-primitive", ()):
5473 if config_primitive["name"] == vnf_config_primitive:
5474 break
5475 else:
5476 raise LcmException(
5477 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5478 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5479 "primitive".format(scaling_group, vnf_config_primitive)
5480 )
5481
5482 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5483 if db_vnfr.get("additionalParamsForVnf"):
5484 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5485
5486 scale_process = "VCA"
5487 db_nsr_update["config-status"] = "configuring pre-scaling"
5488 primitive_params = self._map_primitive_params(
5489 config_primitive, {}, vnfr_params
5490 )
5491
5492 # Pre-scale retry check: Check if this sub-operation has been executed before
5493 op_index = self._check_or_add_scale_suboperation(
5494 db_nslcmop,
5495 vnf_index,
5496 vnf_config_primitive,
5497 primitive_params,
5498 "PRE-SCALE",
5499 )
5500 if op_index == self.SUBOPERATION_STATUS_SKIP:
5501 # Skip sub-operation
5502 result = "COMPLETED"
5503 result_detail = "Done"
5504 self.logger.debug(
5505 logging_text
5506 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5507 vnf_config_primitive, result, result_detail
5508 )
5509 )
5510 else:
5511 if op_index == self.SUBOPERATION_STATUS_NEW:
5512 # New sub-operation: Get index of this sub-operation
5513 op_index = (
5514 len(db_nslcmop.get("_admin", {}).get("operations"))
5515 - 1
5516 )
5517 self.logger.debug(
5518 logging_text
5519 + "vnf_config_primitive={} New sub-operation".format(
5520 vnf_config_primitive
5521 )
5522 )
5523 else:
5524 # retry: Get registered params for this existing sub-operation
5525 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5526 op_index
5527 ]
5528 vnf_index = op.get("member_vnf_index")
5529 vnf_config_primitive = op.get("primitive")
5530 primitive_params = op.get("primitive_params")
5531 self.logger.debug(
5532 logging_text
5533 + "vnf_config_primitive={} Sub-operation retry".format(
5534 vnf_config_primitive
5535 )
5536 )
5537 # Execute the primitive, either with new (first-time) or registered (reintent) args
5538 ee_descriptor_id = config_primitive.get(
5539 "execution-environment-ref"
5540 )
5541 primitive_name = config_primitive.get(
5542 "execution-environment-primitive", vnf_config_primitive
5543 )
5544 ee_id, vca_type = self._look_for_deployed_vca(
5545 nsr_deployed["VCA"],
5546 member_vnf_index=vnf_index,
5547 vdu_id=None,
5548 vdu_count_index=None,
5549 ee_descriptor_id=ee_descriptor_id,
5550 )
5551 result, result_detail = await self._ns_execute_primitive(
5552 ee_id,
5553 primitive_name,
5554 primitive_params,
5555 vca_type=vca_type,
5556 vca_id=vca_id,
5557 )
5558 self.logger.debug(
5559 logging_text
5560 + "vnf_config_primitive={} Done with result {} {}".format(
5561 vnf_config_primitive, result, result_detail
5562 )
5563 )
5564 # Update operationState = COMPLETED | FAILED
5565 self._update_suboperation_status(
5566 db_nslcmop, op_index, result, result_detail
5567 )
5568
5569 if result == "FAILED":
5570 raise LcmException(result_detail)
5571 db_nsr_update["config-status"] = old_config_status
5572 scale_process = None
5573 # PRE-SCALE END
5574
5575 db_nsr_update[
5576 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5577 ] = nb_scale_op
5578 db_nsr_update[
5579 "_admin.scaling-group.{}.time".format(admin_scale_index)
5580 ] = time()
5581
5582 # SCALE-IN VCA - BEGIN
5583 if vca_scaling_info:
5584 step = db_nslcmop_update[
5585 "detailed-status"
5586 ] = "Deleting the execution environments"
5587 scale_process = "VCA"
5588 for vca_info in vca_scaling_info:
5589 if vca_info["type"] == "delete":
5590 member_vnf_index = str(vca_info["member-vnf-index"])
5591 self.logger.debug(
5592 logging_text + "vdu info: {}".format(vca_info)
5593 )
5594 if vca_info.get("osm_vdu_id"):
5595 vdu_id = vca_info["osm_vdu_id"]
5596 vdu_index = int(vca_info["vdu_index"])
5597 stage[
5598 1
5599 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5600 member_vnf_index, vdu_id, vdu_index
5601 )
5602 else:
5603 vdu_index = 0
5604 kdu_id = vca_info["osm_kdu_id"]
5605 stage[
5606 1
5607 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5608 member_vnf_index, kdu_id, vdu_index
5609 )
5610 stage[2] = step = "Scaling in VCA"
5611 self._write_op_status(op_id=nslcmop_id, stage=stage)
5612 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5613 config_update = db_nsr["configurationStatus"]
5614 for vca_index, vca in enumerate(vca_update):
5615 if (
5616 (vca or vca.get("ee_id"))
5617 and vca["member-vnf-index"] == member_vnf_index
5618 and vca["vdu_count_index"] == vdu_index
5619 ):
5620 if vca.get("vdu_id"):
5621 config_descriptor = get_configuration(
5622 db_vnfd, vca.get("vdu_id")
5623 )
5624 elif vca.get("kdu_name"):
5625 config_descriptor = get_configuration(
5626 db_vnfd, vca.get("kdu_name")
5627 )
5628 else:
5629 config_descriptor = get_configuration(
5630 db_vnfd, db_vnfd["id"]
5631 )
5632 operation_params = (
5633 db_nslcmop.get("operationParams") or {}
5634 )
5635 exec_terminate_primitives = not operation_params.get(
5636 "skip_terminate_primitives"
5637 ) and vca.get("needed_terminate")
5638 task = asyncio.ensure_future(
5639 asyncio.wait_for(
5640 self.destroy_N2VC(
5641 logging_text,
5642 db_nslcmop,
5643 vca,
5644 config_descriptor,
5645 vca_index,
5646 destroy_ee=True,
5647 exec_primitives=exec_terminate_primitives,
5648 scaling_in=True,
5649 vca_id=vca_id,
5650 ),
5651 timeout=self.timeout_charm_delete,
5652 )
5653 )
5654 tasks_dict_info[task] = "Terminating VCA {}".format(
5655 vca.get("ee_id")
5656 )
5657 del vca_update[vca_index]
5658 del config_update[vca_index]
5659 # wait for pending tasks of terminate primitives
5660 if tasks_dict_info:
5661 self.logger.debug(
5662 logging_text
5663 + "Waiting for tasks {}".format(
5664 list(tasks_dict_info.keys())
5665 )
5666 )
5667 error_list = await self._wait_for_tasks(
5668 logging_text,
5669 tasks_dict_info,
5670 min(
5671 self.timeout_charm_delete, self.timeout_ns_terminate
5672 ),
5673 stage,
5674 nslcmop_id,
5675 )
5676 tasks_dict_info.clear()
5677 if error_list:
5678 raise LcmException("; ".join(error_list))
5679
5680 db_vca_and_config_update = {
5681 "_admin.deployed.VCA": vca_update,
5682 "configurationStatus": config_update,
5683 }
5684 self.update_db_2(
5685 "nsrs", db_nsr["_id"], db_vca_and_config_update
5686 )
5687 scale_process = None
5688 # SCALE-IN VCA - END
5689
5690 # SCALE RO - BEGIN
5691 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5692 scale_process = "RO"
5693 if self.ro_config.get("ng"):
5694 await self._scale_ng_ro(
5695 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5696 )
5697 scaling_info.pop("vdu-create", None)
5698 scaling_info.pop("vdu-delete", None)
5699
5700 scale_process = None
5701 # SCALE RO - END
5702
5703 # SCALE KDU - BEGIN
5704 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5705 scale_process = "KDU"
5706 await self._scale_kdu(
5707 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5708 )
5709 scaling_info.pop("kdu-create", None)
5710 scaling_info.pop("kdu-delete", None)
5711
5712 scale_process = None
5713 # SCALE KDU - END
5714
5715 if db_nsr_update:
5716 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5717
5718 # SCALE-UP VCA - BEGIN
5719 if vca_scaling_info:
5720 step = db_nslcmop_update[
5721 "detailed-status"
5722 ] = "Creating new execution environments"
5723 scale_process = "VCA"
5724 for vca_info in vca_scaling_info:
5725 if vca_info["type"] == "create":
5726 member_vnf_index = str(vca_info["member-vnf-index"])
5727 self.logger.debug(
5728 logging_text + "vdu info: {}".format(vca_info)
5729 )
5730 vnfd_id = db_vnfr["vnfd-ref"]
5731 if vca_info.get("osm_vdu_id"):
5732 vdu_index = int(vca_info["vdu_index"])
5733 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5734 if db_vnfr.get("additionalParamsForVnf"):
5735 deploy_params.update(
5736 parse_yaml_strings(
5737 db_vnfr["additionalParamsForVnf"].copy()
5738 )
5739 )
5740 descriptor_config = get_configuration(
5741 db_vnfd, db_vnfd["id"]
5742 )
5743 if descriptor_config:
5744 vdu_id = None
5745 vdu_name = None
5746 kdu_name = None
5747 self._deploy_n2vc(
5748 logging_text=logging_text
5749 + "member_vnf_index={} ".format(member_vnf_index),
5750 db_nsr=db_nsr,
5751 db_vnfr=db_vnfr,
5752 nslcmop_id=nslcmop_id,
5753 nsr_id=nsr_id,
5754 nsi_id=nsi_id,
5755 vnfd_id=vnfd_id,
5756 vdu_id=vdu_id,
5757 kdu_name=kdu_name,
5758 member_vnf_index=member_vnf_index,
5759 vdu_index=vdu_index,
5760 vdu_name=vdu_name,
5761 deploy_params=deploy_params,
5762 descriptor_config=descriptor_config,
5763 base_folder=base_folder,
5764 task_instantiation_info=tasks_dict_info,
5765 stage=stage,
5766 )
5767 vdu_id = vca_info["osm_vdu_id"]
5768 vdur = find_in_list(
5769 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
5770 )
5771 descriptor_config = get_configuration(db_vnfd, vdu_id)
5772 if vdur.get("additionalParams"):
5773 deploy_params_vdu = parse_yaml_strings(
5774 vdur["additionalParams"]
5775 )
5776 else:
5777 deploy_params_vdu = deploy_params
5778 deploy_params_vdu["OSM"] = get_osm_params(
5779 db_vnfr, vdu_id, vdu_count_index=vdu_index
5780 )
5781 if descriptor_config:
5782 vdu_name = None
5783 kdu_name = None
5784 stage[
5785 1
5786 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5787 member_vnf_index, vdu_id, vdu_index
5788 )
5789 stage[2] = step = "Scaling out VCA"
5790 self._write_op_status(op_id=nslcmop_id, stage=stage)
5791 self._deploy_n2vc(
5792 logging_text=logging_text
5793 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5794 member_vnf_index, vdu_id, vdu_index
5795 ),
5796 db_nsr=db_nsr,
5797 db_vnfr=db_vnfr,
5798 nslcmop_id=nslcmop_id,
5799 nsr_id=nsr_id,
5800 nsi_id=nsi_id,
5801 vnfd_id=vnfd_id,
5802 vdu_id=vdu_id,
5803 kdu_name=kdu_name,
5804 member_vnf_index=member_vnf_index,
5805 vdu_index=vdu_index,
5806 vdu_name=vdu_name,
5807 deploy_params=deploy_params_vdu,
5808 descriptor_config=descriptor_config,
5809 base_folder=base_folder,
5810 task_instantiation_info=tasks_dict_info,
5811 stage=stage,
5812 )
5813 else:
5814 kdu_name = vca_info["osm_kdu_id"]
5815 descriptor_config = get_configuration(db_vnfd, kdu_name)
5816 if descriptor_config:
5817 vdu_id = None
5818 kdu_index = int(vca_info["kdu_index"])
5819 vdu_name = None
5820 kdur = next(
5821 x
5822 for x in db_vnfr["kdur"]
5823 if x["kdu-name"] == kdu_name
5824 )
5825 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
5826 if kdur.get("additionalParams"):
5827 deploy_params_kdu = parse_yaml_strings(
5828 kdur["additionalParams"]
5829 )
5830
5831 self._deploy_n2vc(
5832 logging_text=logging_text,
5833 db_nsr=db_nsr,
5834 db_vnfr=db_vnfr,
5835 nslcmop_id=nslcmop_id,
5836 nsr_id=nsr_id,
5837 nsi_id=nsi_id,
5838 vnfd_id=vnfd_id,
5839 vdu_id=vdu_id,
5840 kdu_name=kdu_name,
5841 member_vnf_index=member_vnf_index,
5842 vdu_index=kdu_index,
5843 vdu_name=vdu_name,
5844 deploy_params=deploy_params_kdu,
5845 descriptor_config=descriptor_config,
5846 base_folder=base_folder,
5847 task_instantiation_info=tasks_dict_info,
5848 stage=stage,
5849 )
5850 # SCALE-UP VCA - END
5851 scale_process = None
5852
5853 # POST-SCALE BEGIN
5854 # execute primitive service POST-SCALING
5855 step = "Executing post-scale vnf-config-primitive"
5856 if scaling_descriptor.get("scaling-config-action"):
5857 for scaling_config_action in scaling_descriptor[
5858 "scaling-config-action"
5859 ]:
5860 if (
5861 scaling_config_action.get("trigger") == "post-scale-in"
5862 and scaling_type == "SCALE_IN"
5863 ) or (
5864 scaling_config_action.get("trigger") == "post-scale-out"
5865 and scaling_type == "SCALE_OUT"
5866 ):
5867 vnf_config_primitive = scaling_config_action[
5868 "vnf-config-primitive-name-ref"
5869 ]
5870 step = db_nslcmop_update[
5871 "detailed-status"
5872 ] = "executing post-scale scaling-config-action '{}'".format(
5873 vnf_config_primitive
5874 )
5875
5876 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5877 if db_vnfr.get("additionalParamsForVnf"):
5878 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5879
5880 # look for primitive
5881 for config_primitive in (
5882 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5883 ).get("config-primitive", ()):
5884 if config_primitive["name"] == vnf_config_primitive:
5885 break
5886 else:
5887 raise LcmException(
5888 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
5889 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
5890 "config-primitive".format(
5891 scaling_group, vnf_config_primitive
5892 )
5893 )
5894 scale_process = "VCA"
5895 db_nsr_update["config-status"] = "configuring post-scaling"
5896 primitive_params = self._map_primitive_params(
5897 config_primitive, {}, vnfr_params
5898 )
5899
5900 # Post-scale retry check: Check if this sub-operation has been executed before
5901 op_index = self._check_or_add_scale_suboperation(
5902 db_nslcmop,
5903 vnf_index,
5904 vnf_config_primitive,
5905 primitive_params,
5906 "POST-SCALE",
5907 )
5908 if op_index == self.SUBOPERATION_STATUS_SKIP:
5909 # Skip sub-operation
5910 result = "COMPLETED"
5911 result_detail = "Done"
5912 self.logger.debug(
5913 logging_text
5914 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5915 vnf_config_primitive, result, result_detail
5916 )
5917 )
5918 else:
5919 if op_index == self.SUBOPERATION_STATUS_NEW:
5920 # New sub-operation: Get index of this sub-operation
5921 op_index = (
5922 len(db_nslcmop.get("_admin", {}).get("operations"))
5923 - 1
5924 )
5925 self.logger.debug(
5926 logging_text
5927 + "vnf_config_primitive={} New sub-operation".format(
5928 vnf_config_primitive
5929 )
5930 )
5931 else:
5932 # retry: Get registered params for this existing sub-operation
5933 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5934 op_index
5935 ]
5936 vnf_index = op.get("member_vnf_index")
5937 vnf_config_primitive = op.get("primitive")
5938 primitive_params = op.get("primitive_params")
5939 self.logger.debug(
5940 logging_text
5941 + "vnf_config_primitive={} Sub-operation retry".format(
5942 vnf_config_primitive
5943 )
5944 )
5945 # Execute the primitive, either with new (first-time) or registered (reintent) args
5946 ee_descriptor_id = config_primitive.get(
5947 "execution-environment-ref"
5948 )
5949 primitive_name = config_primitive.get(
5950 "execution-environment-primitive", vnf_config_primitive
5951 )
5952 ee_id, vca_type = self._look_for_deployed_vca(
5953 nsr_deployed["VCA"],
5954 member_vnf_index=vnf_index,
5955 vdu_id=None,
5956 vdu_count_index=None,
5957 ee_descriptor_id=ee_descriptor_id,
5958 )
5959 result, result_detail = await self._ns_execute_primitive(
5960 ee_id,
5961 primitive_name,
5962 primitive_params,
5963 vca_type=vca_type,
5964 vca_id=vca_id,
5965 )
5966 self.logger.debug(
5967 logging_text
5968 + "vnf_config_primitive={} Done with result {} {}".format(
5969 vnf_config_primitive, result, result_detail
5970 )
5971 )
5972 # Update operationState = COMPLETED | FAILED
5973 self._update_suboperation_status(
5974 db_nslcmop, op_index, result, result_detail
5975 )
5976
5977 if result == "FAILED":
5978 raise LcmException(result_detail)
5979 db_nsr_update["config-status"] = old_config_status
5980 scale_process = None
5981 # POST-SCALE END
5982
5983 db_nsr_update[
5984 "detailed-status"
5985 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
5986 db_nsr_update["operational-status"] = (
5987 "running"
5988 if old_operational_status == "failed"
5989 else old_operational_status
5990 )
5991 db_nsr_update["config-status"] = old_config_status
5992 return
5993 except (
5994 ROclient.ROClientException,
5995 DbException,
5996 LcmException,
5997 NgRoException,
5998 ) as e:
5999 self.logger.error(logging_text + "Exit Exception {}".format(e))
6000 exc = e
6001 except asyncio.CancelledError:
6002 self.logger.error(
6003 logging_text + "Cancelled Exception while '{}'".format(step)
6004 )
6005 exc = "Operation was cancelled"
6006 except Exception as e:
6007 exc = traceback.format_exc()
6008 self.logger.critical(
6009 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6010 exc_info=True,
6011 )
6012 finally:
6013 self._write_ns_status(
6014 nsr_id=nsr_id,
6015 ns_state=None,
6016 current_operation="IDLE",
6017 current_operation_id=None,
6018 )
6019 if tasks_dict_info:
6020 stage[1] = "Waiting for instantiate pending tasks."
6021 self.logger.debug(logging_text + stage[1])
6022 exc = await self._wait_for_tasks(
6023 logging_text,
6024 tasks_dict_info,
6025 self.timeout_ns_deploy,
6026 stage,
6027 nslcmop_id,
6028 nsr_id=nsr_id,
6029 )
6030 if exc:
6031 db_nslcmop_update[
6032 "detailed-status"
6033 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6034 nslcmop_operation_state = "FAILED"
6035 if db_nsr:
6036 db_nsr_update["operational-status"] = old_operational_status
6037 db_nsr_update["config-status"] = old_config_status
6038 db_nsr_update["detailed-status"] = ""
6039 if scale_process:
6040 if "VCA" in scale_process:
6041 db_nsr_update["config-status"] = "failed"
6042 if "RO" in scale_process:
6043 db_nsr_update["operational-status"] = "failed"
6044 db_nsr_update[
6045 "detailed-status"
6046 ] = "FAILED scaling nslcmop={} {}: {}".format(
6047 nslcmop_id, step, exc
6048 )
6049 else:
6050 error_description_nslcmop = None
6051 nslcmop_operation_state = "COMPLETED"
6052 db_nslcmop_update["detailed-status"] = "Done"
6053
6054 self._write_op_status(
6055 op_id=nslcmop_id,
6056 stage="",
6057 error_message=error_description_nslcmop,
6058 operation_state=nslcmop_operation_state,
6059 other_update=db_nslcmop_update,
6060 )
6061 if db_nsr:
6062 self._write_ns_status(
6063 nsr_id=nsr_id,
6064 ns_state=None,
6065 current_operation="IDLE",
6066 current_operation_id=None,
6067 other_update=db_nsr_update,
6068 )
6069
6070 if nslcmop_operation_state:
6071 try:
6072 msg = {
6073 "nsr_id": nsr_id,
6074 "nslcmop_id": nslcmop_id,
6075 "operationState": nslcmop_operation_state,
6076 }
6077 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6078 except Exception as e:
6079 self.logger.error(
6080 logging_text + "kafka_write notification Exception {}".format(e)
6081 )
6082 self.logger.debug(logging_text + "Exit")
6083 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6084
6085 async def _scale_kdu(
6086 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6087 ):
6088 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
6089 for kdu_name in _scaling_info:
6090 for kdu_scaling_info in _scaling_info[kdu_name]:
6091 deployed_kdu, index = get_deployed_kdu(
6092 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
6093 )
6094 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6095 kdu_instance = deployed_kdu["kdu-instance"]
6096 scale = int(kdu_scaling_info["scale"])
6097 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6098
6099 db_dict = {
6100 "collection": "nsrs",
6101 "filter": {"_id": nsr_id},
6102 "path": "_admin.deployed.K8s.{}".format(index),
6103 }
6104
6105 step = "scaling application {}".format(
6106 kdu_scaling_info["resource-name"]
6107 )
6108 self.logger.debug(logging_text + step)
6109
6110 if kdu_scaling_info["type"] == "delete":
6111 kdu_config = get_configuration(db_vnfd, kdu_name)
6112 if (
6113 kdu_config
6114 and kdu_config.get("terminate-config-primitive")
6115 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6116 ):
6117 terminate_config_primitive_list = kdu_config.get(
6118 "terminate-config-primitive"
6119 )
6120 terminate_config_primitive_list.sort(
6121 key=lambda val: int(val["seq"])
6122 )
6123
6124 for (
6125 terminate_config_primitive
6126 ) in terminate_config_primitive_list:
6127 primitive_params_ = self._map_primitive_params(
6128 terminate_config_primitive, {}, {}
6129 )
6130 step = "execute terminate config primitive"
6131 self.logger.debug(logging_text + step)
6132 await asyncio.wait_for(
6133 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6134 cluster_uuid=cluster_uuid,
6135 kdu_instance=kdu_instance,
6136 primitive_name=terminate_config_primitive["name"],
6137 params=primitive_params_,
6138 db_dict=db_dict,
6139 vca_id=vca_id,
6140 ),
6141 timeout=600,
6142 )
6143
6144 await asyncio.wait_for(
6145 self.k8scluster_map[k8s_cluster_type].scale(
6146 kdu_instance,
6147 scale,
6148 kdu_scaling_info["resource-name"],
6149 vca_id=vca_id,
6150 ),
6151 timeout=self.timeout_vca_on_error,
6152 )
6153
6154 if kdu_scaling_info["type"] == "create":
6155 kdu_config = get_configuration(db_vnfd, kdu_name)
6156 if (
6157 kdu_config
6158 and kdu_config.get("initial-config-primitive")
6159 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6160 ):
6161 initial_config_primitive_list = kdu_config.get(
6162 "initial-config-primitive"
6163 )
6164 initial_config_primitive_list.sort(
6165 key=lambda val: int(val["seq"])
6166 )
6167
6168 for initial_config_primitive in initial_config_primitive_list:
6169 primitive_params_ = self._map_primitive_params(
6170 initial_config_primitive, {}, {}
6171 )
6172 step = "execute initial config primitive"
6173 self.logger.debug(logging_text + step)
6174 await asyncio.wait_for(
6175 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6176 cluster_uuid=cluster_uuid,
6177 kdu_instance=kdu_instance,
6178 primitive_name=initial_config_primitive["name"],
6179 params=primitive_params_,
6180 db_dict=db_dict,
6181 vca_id=vca_id,
6182 ),
6183 timeout=600,
6184 )
6185
6186 async def _scale_ng_ro(
6187 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6188 ):
6189 nsr_id = db_nslcmop["nsInstanceId"]
6190 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6191 db_vnfrs = {}
6192
6193 # read from db: vnfd's for every vnf
6194 db_vnfds = []
6195
6196 # for each vnf in ns, read vnfd
6197 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6198 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6199 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6200 # if we haven't this vnfd, read it from db
6201 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6202 # read from db
6203 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6204 db_vnfds.append(vnfd)
6205 n2vc_key = self.n2vc.get_public_key()
6206 n2vc_key_list = [n2vc_key]
6207 self.scale_vnfr(
6208 db_vnfr,
6209 vdu_scaling_info.get("vdu-create"),
6210 vdu_scaling_info.get("vdu-delete"),
6211 mark_delete=True,
6212 )
6213 # db_vnfr has been updated, update db_vnfrs to use it
6214 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6215 await self._instantiate_ng_ro(
6216 logging_text,
6217 nsr_id,
6218 db_nsd,
6219 db_nsr,
6220 db_nslcmop,
6221 db_vnfrs,
6222 db_vnfds,
6223 n2vc_key_list,
6224 stage=stage,
6225 start_deploy=time(),
6226 timeout_ns_deploy=self.timeout_ns_deploy,
6227 )
6228 if vdu_scaling_info.get("vdu-delete"):
6229 self.scale_vnfr(
6230 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6231 )
6232
6233 async def add_prometheus_metrics(
6234 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6235 ):
6236 if not self.prometheus:
6237 return
6238 # look if exist a file called 'prometheus*.j2' and
6239 artifact_content = self.fs.dir_ls(artifact_path)
6240 job_file = next(
6241 (
6242 f
6243 for f in artifact_content
6244 if f.startswith("prometheus") and f.endswith(".j2")
6245 ),
6246 None,
6247 )
6248 if not job_file:
6249 return
6250 with self.fs.file_open((artifact_path, job_file), "r") as f:
6251 job_data = f.read()
6252
6253 # TODO get_service
6254 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6255 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6256 host_port = "80"
6257 vnfr_id = vnfr_id.replace("-", "")
6258 variables = {
6259 "JOB_NAME": vnfr_id,
6260 "TARGET_IP": target_ip,
6261 "EXPORTER_POD_IP": host_name,
6262 "EXPORTER_POD_PORT": host_port,
6263 }
6264 job_list = self.prometheus.parse_job(job_data, variables)
6265 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6266 for job in job_list:
6267 if (
6268 not isinstance(job.get("job_name"), str)
6269 or vnfr_id not in job["job_name"]
6270 ):
6271 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6272 job["nsr_id"] = nsr_id
6273 job_dict = {jl["job_name"]: jl for jl in job_list}
6274 if await self.prometheus.update(job_dict):
6275 return list(job_dict.keys())
6276
6277 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6278 """
6279 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6280
6281 :param: vim_account_id: VIM Account ID
6282
6283 :return: (cloud_name, cloud_credential)
6284 """
6285 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6286 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6287
6288 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6289 """
6290 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6291
6292 :param: vim_account_id: VIM Account ID
6293
6294 :return: (cloud_name, cloud_credential)
6295 """
6296 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6297 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")