from osm_lcm.ng_ro import NgRoClient, NgRoException
from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
from n2vc.k8s_helm_conn import K8sHelmConnector
+from n2vc.k8s_helm3_conn import K8sHelm3Connector
from n2vc.k8s_juju_conn import K8sJujuConnector
from osm_common.dbbase import DbException
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
-class N2VCJujuConnectorLCM(N2VCJujuConnector):
-
- async def create_execution_environment(self, namespace: str, db_dict: dict, reuse_ee_id: str = None,
- progress_timeout: float = None, total_timeout: float = None,
- config: dict = None, artifact_path: str = None,
- vca_type: str = None) -> (str, dict):
- # admit two new parameters, artifact_path and vca_type
- if vca_type == "k8s_proxy_charm":
- ee_id = await self.install_k8s_proxy_charm(
- charm_name=artifact_path[artifact_path.rfind("/") + 1:],
- namespace=namespace,
- artifact_path=artifact_path,
- db_dict=db_dict)
- return ee_id, None
- else:
- return await super().create_execution_environment(
- namespace=namespace, db_dict=db_dict, reuse_ee_id=reuse_ee_id,
- progress_timeout=progress_timeout, total_timeout=total_timeout)
-
- async def install_configuration_sw(self, ee_id: str, artifact_path: str, db_dict: dict,
- progress_timeout: float = None, total_timeout: float = None,
- config: dict = None, num_units: int = 1, vca_type: str = "lxc_proxy_charm"):
- if vca_type == "k8s_proxy_charm":
- return
- return await super().install_configuration_sw(
- ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict, progress_timeout=progress_timeout,
- total_timeout=total_timeout, config=config, num_units=num_units)
-
-
class NsLcm(LcmBase):
timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
self.vca_config = config["VCA"].copy()
# create N2VC connector
- self.n2vc = N2VCJujuConnectorLCM(
+ self.n2vc = N2VCJujuConnector(
db=self.db,
fs=self.fs,
log=self.logger,
on_update_db=self._on_update_n2vc_db
)
- self.k8sclusterhelm = K8sHelmConnector(
+ self.k8sclusterhelm2 = K8sHelmConnector(
kubectl_command=self.vca_config.get("kubectlpath"),
helm_command=self.vca_config.get("helmpath"),
fs=self.fs,
on_update_db=None,
)
+ self.k8sclusterhelm3 = K8sHelm3Connector(
+ kubectl_command=self.vca_config.get("kubectlpath"),
+ helm_command=self.vca_config.get("helm3path"),
+ fs=self.fs,
+ log=self.logger,
+ db=self.db,
+ on_update_db=None,
+ )
+
self.k8sclusterjuju = K8sJujuConnector(
kubectl_command=self.vca_config.get("kubectlpath"),
juju_command=self.vca_config.get("jujupath"),
fs=self.fs,
log=self.logger,
db=self.db,
+ loop=self.loop,
on_update_db=None,
+ vca_config=self.vca_config,
)
self.k8scluster_map = {
- "helm-chart": self.k8sclusterhelm,
- "chart": self.k8sclusterhelm,
+ "helm-chart": self.k8sclusterhelm2,
+ "helm-chart-v3": self.k8sclusterhelm3,
+ "chart": self.k8sclusterhelm3,
"juju-bundle": self.k8sclusterjuju,
"juju": self.k8sclusterjuju,
}
"lxc_proxy_charm": self.n2vc,
"native_charm": self.n2vc,
"k8s_proxy_charm": self.n2vc,
- "helm": self.conn_helm_ee
+ "helm": self.conn_helm_ee,
+ "helm-v3": self.conn_helm_ee
}
self.prometheus = prometheus
# find old ee_id if exists
ee_id = vca_deployed.get("ee_id")
+ vim_account_id = (
+ deep_get(db_vnfr, ("vim-account-id",)) or
+ deep_get(deploy_params, ("OSM", "vim_account_id"))
+ )
+ vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id)
+ vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id)
# create or register execution environment in VCA
- if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm"):
+ if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
self._write_configuration_status(
nsr_id=nsr_id,
step = "create execution environment"
self.logger.debug(logging_text + step)
- ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
- namespace=namespace,
- reuse_ee_id=ee_id,
- db_dict=db_dict,
- config=osm_config,
- artifact_path=artifact_path,
- vca_type=vca_type)
+
+ ee_id = None
+ credentials = None
+ if vca_type == "k8s_proxy_charm":
+ ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
+ charm_name=artifact_path[artifact_path.rfind("/") + 1:],
+ namespace=namespace,
+ artifact_path=artifact_path,
+ db_dict=db_dict,
+ cloud_name=vca_k8s_cloud,
+ credential_name=vca_k8s_cloud_credential,
+ )
+ else:
+ ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
+ namespace=namespace,
+ reuse_ee_id=ee_id,
+ db_dict=db_dict,
+ config=osm_config,
+ cloud_name=vca_cloud,
+ credential_name=vca_cloud_credential,
+ )
elif vca_type == "native_charm":
step = "Waiting to VM being up and getting IP address"
step = "register execution environment {}".format(credentials)
self.logger.debug(logging_text + step)
ee_id = await self.vca_map[vca_type].register_execution_environment(
- credentials=credentials, namespace=namespace, db_dict=db_dict)
+ credentials=credentials,
+ namespace=namespace,
+ db_dict=db_dict,
+ cloud_name=vca_cloud,
+ credential_name=vca_cloud_credential,
+ )
# for compatibility with MON/POL modules, the need model and application name at database
# TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
if vdu_id == v["vdu-id-ref"]:
num_units = v.get("config-units") or 1
break
-
- await self.vca_map[vca_type].install_configuration_sw(
- ee_id=ee_id,
- artifact_path=artifact_path,
- db_dict=db_dict,
- config=config,
- num_units=num_units,
- vca_type=vca_type
- )
+ if vca_type != "k8s_proxy_charm":
+ await self.vca_map[vca_type].install_configuration_sw(
+ ee_id=ee_id,
+ artifact_path=artifact_path,
+ db_dict=db_dict,
+ config=config,
+ num_units=num_units,
+ )
# write in db flag of configuration_sw already installed
self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
# if SSH access is required, then get execution environment SSH public
# if native charm we have waited already to VM be UP
- if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm"):
+ if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
pub_key = None
user = None
# self.logger.debug("get ssh key block")
# TODO register in database that primitive is done
# STEP 7 Configure metrics
- if vca_type == "helm":
+ if vca_type == "helm" or vca_type == "helm-v3":
prometheus_jobs = await self.add_prometheus_metrics(
ee_id=ee_id,
artifact_path=artifact_path,
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
# Launch kdus if present in the descriptor
- k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
+ k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
async def _get_cluster_id(cluster_id, cluster_type):
nonlocal k8scluster_id_2_uuic
k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
if not k8s_id:
- raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
- cluster_type))
+ if cluster_type == "helm-chart-v3":
+ try:
+ # backward compatibility for existing clusters that have not been initialized for helm v3
+ k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
+ k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
+ reuse_cluster_uuid=cluster_id)
+ db_k8scluster_update = {}
+ db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
+ db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
+ db_k8scluster_update["_admin.helm-chart-v3.created"] = uninstall_sw
+ db_k8scluster_update["_admin.helm-chart-v3.operationalState"] = "ENABLED"
+ self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
+ except Exception as e:
+ self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
+ raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
+ cluster_type))
+ else:
+ raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
+ format(cluster_id, cluster_type))
k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
return k8s_id
index = 0
updated_cluster_list = []
+ updated_v3_cluster_list = []
for vnfr_data in db_vnfrs.values():
for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
namespace = kdur.get("k8s-namespace")
if kdur.get("helm-chart"):
kdumodel = kdur["helm-chart"]
- k8sclustertype = "helm-chart"
+ # Default version: helm3, if helm-version is v2 assign v2
+ k8sclustertype = "helm-chart-v3"
+ self.logger.debug("kdur: {}".format(kdur))
+ if kdur.get("helm-version") and kdur.get("helm-version") == "v2":
+ k8sclustertype = "helm-chart"
elif kdur.get("juju-bundle"):
kdumodel = kdur["juju-bundle"]
k8sclustertype = "juju-bundle"
cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
# Synchronize repos
- if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
+ if (k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list)\
+ or (k8sclustertype == "helm-chart-v3" and cluster_uuid not in updated_v3_cluster_list):
del_repo_list, added_repo_dict = await asyncio.ensure_future(
- self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
+ self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
if del_repo_list or added_repo_dict:
- unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
- updated = {'_admin.helm_charts_added.' +
- item: name for item, name in added_repo_dict.items()}
- self.logger.debug(logging_text + "repos synchronized on k8s cluster '{}' to_delete: {}, "
- "to_add: {}".format(k8s_cluster_id, del_repo_list,
- added_repo_dict))
+ if k8sclustertype == "helm-chart":
+ unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
+ updated = {'_admin.helm_charts_added.' +
+ item: name for item, name in added_repo_dict.items()}
+ updated_cluster_list.append(cluster_uuid)
+ elif k8sclustertype == "helm-chart-v3":
+ unset = {'_admin.helm_charts_v3_added.' + item: None for item in del_repo_list}
+ updated = {'_admin.helm_charts_v3_added.' +
+ item: name for item, name in added_repo_dict.items()}
+ updated_v3_cluster_list.append(cluster_uuid)
+ self.logger.debug(logging_text + "repos synchronized on k8s cluster "
+ "'{}' to_delete: {}, to_add: {}".
+ format(k8s_cluster_id, del_repo_list, added_repo_dict))
self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
- updated_cluster_list.append(cluster_uuid)
# Instantiate kdu
step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
vca_type = "native_charm"
elif ee_item.get("helm-chart"):
vca_name = ee_item['helm-chart']
- vca_type = "helm"
+ if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
+ vca_type = "helm"
+ else:
+ vca_type = "helm-v3"
else:
self.logger.debug(logging_text + "skipping non juju neither charm configuration")
continue
vca.get("needed_terminate"))
# For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
# pending native charms
- destroy_ee = True if vca_type in ("helm", "native_charm") else False
+ destroy_ee = True if vca_type in ("helm", "helm-v3", "native_charm") else False
# self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
# vca_index, vca.get("ee_id"), vca_type, destroy_ee))
task = asyncio.ensure_future(
job_dict = {jl["job_name"]: jl for jl in job_list}
if await self.prometheus.update(job_dict):
return list(job_dict.keys())
+
+ def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
+ """
+ Get VCA Cloud and VCA Cloud Credentials for the VIM account
+
+ :param: vim_account_id: VIM Account ID
+
+ :return: (cloud_name, cloud_credential)
+ """
+ config = self.get_vim_account_config(vim_account_id)
+ return config.get("vca_cloud"), config.get("vca_cloud_credential")
+
+ def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
+ """
+ Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
+
+ :param: vim_account_id: VIM Account ID
+
+ :return: (cloud_name, cloud_credential)
+ """
+ config = self.get_vim_account_config(vim_account_id)
+ return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
+
+ def get_vim_account_config(self, vim_account_id: str) -> dict:
+ """
+ Get VIM Account config from the OSM Database
+
+ :param: vim_account_id: VIM Account ID
+
+ :return: Dictionary with the config of the vim account
+ """
+ vim_account = self.db.get_one(table="vim_accounts", q_filter={"_id": vim_account_id}, fail_on_empty=False)
+ return vim_account.get("config", {}) if vim_account else {}