# blob: 17e960f1a1814e6d92d000e0bc759950909ce40c [file] [log] [blame]
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import asyncio
from typing import Union
from shlex import quote
import os
import yaml
from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException
class K8sHelmConnector(K8sHelmBaseConnector):
"""
####################################################################################
################################### P U B L I C ####################################
####################################################################################
"""
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
):
    """
    Initializes helm connector for helm v2

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    """

    # parent class
    K8sHelmBaseConnector.__init__(
        self,
        db=db,
        log=log,
        fs=fs,
        kubectl_command=kubectl_command,
        helm_command=helm_command,
        on_update_db=on_update_db,
    )

    self.log.info("Initializing K8S Helm2 connector")

    # initialize helm client-only
    self.log.debug("Initializing helm client-only...")
    command = "{} init --client-only {} ".format(
        self._helm_command,
        "--stable-repo-url {}".format(quote(self._stable_repo_url))
        if self._stable_repo_url
        else "--skip-repos",
    )
    try:
        # Fire and forget: the init command is scheduled, not awaited.
        # asyncio.create_task() needs a running event loop and raises
        # RuntimeError otherwise, which is reported below instead of
        # aborting the construction of the connector.
        asyncio.create_task(
            self._local_async_exec(command=command, raise_exception_on_error=False)
        )
    except Exception as e:
        # Every other message of this class goes through self.log; the
        # previous `self.warning(...)` call did not.
        self.log.warning(
            "helm init failed (it was already initialized): {}".format(e)
        )

    self.log.info("K8S Helm2 connector initialized")
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
    **kwargs,
):
    """
    Deploy a new KDU instance (a Helm release) on a cluster.

    The cluster directory is synchronised from the shared file system before
    the installation and synchronised back once it has finished.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_model: chart reference, one of:
        - the name of a chart available in the repos known by OSM
          (e.g. stable/openldap, stable/openldap:1.2.4)
        - a path to a packaged chart (e.g. mychart.tgz)
        - a path to an unpacked chart directory or a URL (e.g. mychart)
    :param kdu_instance: Kdu instance name
    :param atomic: if set, the installation purges the chart/bundle on
        failure and waits until all the K8s objects are active
    :param timeout: time in seconds to wait for the installation
        (defaults to the Helm default timeout: 300s)
    :param params: key-value pairs with instantiation parameters
        (overriding default values)
    :param dict db_dict: where to write into the database when the status
        changes: {collection: <str>, filter: {}, path: <str>},
        e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
        path: "_admin.deployed.K8S.3"}
    :param kdu_name: Name of the KDU instance to be installed
    :param namespace: K8s namespace to use for the KDU instance
    :param kwargs: Additional parameters (none used yet)
    :return: True if successful
    """
    self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

    # refresh the local copy of the cluster directory
    self.fs.sync(from_path=cluster_uuid)

    # directories and environment variables used by the helm commands
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    install_options = {
        "atomic": atomic,
        "timeout": timeout,
        "params": params,
        "db_dict": db_dict,
        "kdu_name": kdu_name,
        "namespace": namespace,
    }
    await self._install_impl(
        cluster_uuid, kdu_model, paths, env, kdu_instance, **install_options
    )

    # publish whatever the installation changed in the local directory
    self.fs.reverse_sync(from_path=cluster_uuid)

    self.log.debug("Returning kdu_instance {}".format(kdu_instance))
    return True
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect a chart without selecting any inspect sub-command.

    :param kdu_model: chart reference to inspect
    :param repo_url: optional repository URL the chart is taken from
    :return: whatever `_exec_inspect_command` returns for an empty
        inspect sub-command
    """
    message = "inspect kdu_model {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(message)

    inspection = await self._exec_inspect_command(
        inspect_command="", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspection
"""
####################################################################################
################################### P R I V A T E ##################################
####################################################################################
"""
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Builds (and optionally creates) the per-cluster directories and returns
    them together with the environment variables needed to run commands.

    Helm 2 keeps its client state in a helm home directory, so the layout
    under `<fs.path>/<cluster_name>` is:
    - `.kube`  : holds the kubeconfig file (`.kube/config`)
    - `.helm`  : helm home

    The environment variables returned are:
    - Helm home: $HELM_HOME
    - helm kubeconfig: $KUBECONFIG

    :param cluster_name: cluster_name
    :param create_if_not_exist: create the `.kube` and `.helm` directories
        when they are missing
    :return: Dictionary with config_paths and dictionary with helm environment variables
    :raises K8sException: if any of the directories does not exist at the end
    """
    base = self.fs.path
    if base.endswith("/") or base.endswith("\\"):
        base = base[:-1]

    # base dir for cluster
    cluster_dir = base + "/" + cluster_name
    # kube dir
    kube_dir = cluster_dir + "/" + ".kube"
    # helm home dir
    helm_dir = cluster_dir + "/" + ".helm"

    if create_if_not_exist:
        for directory in (kube_dir, helm_dir):
            if not os.path.exists(directory):
                self.log.debug("Creating dir {}".format(directory))
                # exist_ok: the directory may be created by somebody else
                # between the check above and this call
                os.makedirs(directory, exist_ok=True)

    config_filename = kube_dir + "/config"

    # 2 - Prepare dictionary with paths
    paths = {
        "kube_dir": kube_dir,
        "kube_config": config_filename,
        "cluster_dir": cluster_dir,
        "helm_dir": helm_dir,
    }

    # only the "*_dir" entries are validated; the kubeconfig file itself may
    # legitimately not exist yet
    for path_name, path in paths.items():
        if "dir" in path_name and not os.path.exists(path):
            err_msg = "{} dir does not exist".format(path)
            self.log.error(err_msg)
            raise K8sException(err_msg)

    # 3 - Prepare environment variables
    env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

    return paths, env
async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
    """
    Obtain the services of a release by piping its manifest into kubectl.

    Runs `helm get manifest <kdu_instance>` and feeds the result to
    `kubectl get --namespace=<namespace> -f -`; the combined output is handed
    to `_parse_services`.

    :param cluster_id: cluster whose directories/environment are used
    :param kdu_instance: helm release name
    :param namespace: namespace passed to kubectl
    :param kubeconfig: path to the kubeconfig file used by helm
    :return: the result of `_parse_services` on the command output
    """
    # only the environment is needed here
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # kubeconfig is quoted like every other interpolated argument so that a
    # path with spaces or shell metacharacters stays a single token
    command1 = "env KUBECONFIG={} {} get manifest {} ".format(
        quote(kubeconfig), self._helm_command, quote(kdu_instance)
    )
    command2 = "{} get --namespace={} -f -".format(
        self.kubectl_command, quote(namespace)
    )
    output, _rc = await self._local_async_exec_pipe(
        command1, command2, env=env, raise_exception_on_error=True
    )

    return self._parse_services(output)
async def _cluster_init(
    self, cluster_id: str, namespace: str, paths: dict, env: dict
):
    """
    Implements the helm version dependent cluster initialization:
    For helm2 it initializes the tiller environment if needed

    - No `tiller-deploy` deployment in `namespace`: a service account and a
      cluster role binding are created (errors ignored) and `helm init` is
      run.
    - Tiller present but no local `repository/repositories.yaml` under the
      helm home: `helm init --client-only` is run.
    - Otherwise the "stable" repo is re-pointed to the configured stable
      repo URL when it differs.

    :param cluster_id: cluster identifier (used for logging and repo calls)
    :param namespace: namespace where tiller is looked up / installed
    :param paths: dictionary with at least "kube_config" and "helm_dir"
    :param env: environment variables for the executed commands
    :return: True if this call ran the full `helm init` (tiller was not
        found in the cluster), False otherwise
    """
    kube_config = quote(paths["kube_config"])
    stable_repo_option = (
        "--stable-repo-url {}".format(quote(self._stable_repo_url))
        if self._stable_repo_url
        else "--skip-repos"
    )

    # check if tiller pod is up in cluster
    command = "{} --kubeconfig={} --namespace={} get deployments".format(
        self.kubectl_command, kube_config, quote(namespace)
    )
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    output_table = self._output_to_table(output=output)

    # find 'tiller' deployment; empty rows are skipped instead of aborting
    # the whole scan
    rows = output_table or []
    already_initialized = any(
        row and str(row[0]).startswith("tiller-deploy") for row in rows
    )

    # helm init
    n2vc_installed_sw = False
    if not already_initialized:
        self.log.info(
            "Initializing helm in client and server: {}".format(cluster_id)
        )
        # service account and role binding may already exist: errors ignored
        command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
            self.kubectl_command, kube_config, quote(self.service_account)
        )
        _, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )

        command = (
            "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
            "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
        ).format(self.kubectl_command, kube_config, quote(self.service_account))
        _, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )

        command = (
            "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
            " {} init"
        ).format(
            self._helm_command,
            kube_config,
            quote(namespace),
            quote(paths["helm_dir"]),
            quote(self.service_account),
            stable_repo_option,
        )
        _, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )
        n2vc_installed_sw = True
    else:
        # check client helm installation
        check_file = paths["helm_dir"] + "/repository/repositories.yaml"
        if not self._check_file_exists(
            filename=check_file, exception_if_not_exists=False
        ):
            self.log.info("Initializing helm in client: {}".format(cluster_id))
            command = (
                "{} --kubeconfig={} --tiller-namespace={} "
                "--home={} init --client-only {} "
            ).format(
                self._helm_command,
                kube_config,
                quote(namespace),
                quote(paths["helm_dir"]),
                stable_repo_option,
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
        else:
            self.log.info("Helm client already initialized")

    repo_list = await self.repo_list(cluster_id)
    for repo in repo_list:
        if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
            self.log.debug(
                "Add new stable repo url: {}".format(self._stable_repo_url)
            )
            await self.repo_remove(cluster_id, "stable")
            # no URL configured means the stable repo is simply dropped
            if self._stable_repo_url:
                await self.repo_add(cluster_id, "stable", self._stable_repo_url)
            break

    return n2vc_installed_sw
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Remove tiller from the cluster (helm reset) plus the cluster role
    binding and service account created for it.

    When no namespace is given, the namespace is taken from the first
    deployment whose name contains "tiller-deploy" in the output of
    `kubectl get deployments --all-namespaces`. If no namespace is known
    in the end nothing is uninstalled.

    :param cluster_id: cluster identifier
    :param namespace: namespace of the tiller deployment, or a falsy value
        to look it up
    """
    # uninstall Tiller if necessary
    self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

    # init paths, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )
    kube_config = quote(paths["kube_config"])

    if not namespace:
        # find namespace for tiller pod
        lookup_command = "{} --kubeconfig={} get deployments --all-namespaces".format(
            self.kubectl_command, kube_config
        )
        output, _rc = await self._local_async_exec(
            command=lookup_command, raise_exception_on_error=False, env=env
        )
        namespace = None
        for row in self._output_to_table(output=output):
            # rows too short to carry a name column are ignored
            try:
                if "tiller-deploy" in row[1]:
                    namespace = row[0]
                    break
            except Exception:
                pass
        else:
            self.log.error(
                "Tiller deployment not found in cluster {}".format(cluster_id)
            )

        self.log.debug("namespace for tiller: {}".format(namespace))

    if not namespace:
        self.log.debug("namespace not found")
        return

    # uninstall tiller from cluster
    self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
    reset_command = "{} --kubeconfig={} --home={} reset".format(
        self._helm_command, kube_config, quote(paths["helm_dir"])
    )
    self.log.debug("resetting: {}".format(reset_command))
    output, _rc = await self._local_async_exec(
        command=reset_command, raise_exception_on_error=True, env=env
    )

    # Delete clusterrolebinding and serviceaccount.
    # Ignore if errors for backward compatibility
    binding_command = (
        "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
        "io/osm-tiller-cluster-rule"
    ).format(self.kubectl_command, kube_config)
    output, _rc = await self._local_async_exec(
        command=binding_command, raise_exception_on_error=False, env=env
    )

    account_command = "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
        self.kubectl_command,
        kube_config,
        quote(namespace),
        quote(self.service_account),
    )
    output, _rc = await self._local_async_exec(
        command=account_command, raise_exception_on_error=False, env=env
    )
async def _instances_list(self, cluster_id):
    """
    List the helm releases of a cluster.

    Runs `helm list --output yaml` and returns the entries found under the
    "Releases" key with their keys converted to lower case (to unify with
    helm3).

    :param cluster_id: cluster whose environment is used for the command
    :return: list of dictionaries, one per release; empty list when the
        command prints nothing or the output carries no releases
    """
    # init paths, env (only the environment is needed)
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} list --output yaml".format(self._helm_command)

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    if not output:
        return []

    # SafeLoader: the output is parsed as plain data only
    parsed = yaml.load(output, Loader=yaml.SafeLoader)
    # The document may be empty (None) or lack the "Releases" key; both are
    # treated as "no releases" instead of raising.
    releases = parsed.get("Releases") if isinstance(parsed, dict) else None

    # update keys to lower case to unify with helm3
    return [
        {key.lower(): value for key, value in release.items()}
        for release in releases or []
    ]
def _get_inspect_command(
    self, show_command: str, kdu_model: str, repo_str: str, version: str
):
    """
    Compose the `helm inspect` command line.

    :param show_command: inspect sub-command, inserted verbatim
    :param kdu_model: chart reference; the only argument that is shell-quoted
    :param repo_str: already formatted repo option, appended right after
        the chart reference without a separating space
    :param version: already formatted version option, inserted verbatim
    :return: the command string
    """
    chart = quote(kdu_model)
    return f"{self._helm_command} inspect {show_command} {chart}{repo_str} {version}"
def _get_get_command(
    self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """
    Compose the `helm get <get_command> <release> --output yaml` command.

    :param get_command: get sub-command, inserted verbatim
    :param kdu_instance: helm release name (shell-quoted)
    :param namespace: not used by the helm2 command
    :param kubeconfig: path to the kubeconfig file (shell-quoted so that
        a path with spaces stays a single token)
    :return: the command string
    """
    return "env KUBECONFIG={} {} get {} {} --output yaml".format(
        quote(kubeconfig), self._helm_command, get_command, quote(kdu_instance)
    )
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    yaml_format: bool = False,
    show_error_log: bool = False,
) -> Union[str, dict, None]:
    """
    Return the status of a release as reported by `helm status`.

    :param cluster_id: cluster whose paths/environment are used
    :param kdu_instance: helm release name
    :param namespace: only logged; the helm2 command does not use it
    :param yaml_format: if True the raw command output is returned as str
    :param show_error_log: forwarded to the command executor
    :return: raw output (str) when `yaml_format` is set; None when the
        command return code is not 0; otherwise the parsed status where
        - info.status.notes is removed,
        - "manifest" is a list with the parsed manifest documents,
        - info.status.resources is converted to a table,
        - info.Description is renamed to info.description
    """
    self.log.debug(
        "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
    )

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )
    command = ("env KUBECONFIG={} {} status {} --output yaml").format(
        quote(paths["kube_config"]), self._helm_command, quote(kdu_instance)
    )
    output, rc = await self._local_async_exec(
        command=command,
        raise_exception_on_error=True,
        show_error_log=show_error_log,
        env=env,
    )

    if yaml_format:
        return str(output)

    if rc != 0:
        return None

    data = yaml.load(output, Loader=yaml.SafeLoader)
    if not isinstance(data, dict):
        # empty or unexpected document: nothing to normalise
        return data

    # Sections are looked up defensively: a status document without
    # "info"/"status" is returned as is instead of raising.
    info = data.get("info")
    if not isinstance(info, dict):
        info = None
    status = info.get("status") if info else None
    if not isinstance(status, dict):
        status = None

    # remove field 'notes'
    if status is not None:
        status.pop("notes", None)

    # parse the manifest to a list of dictionaries
    if "manifest" in data:
        manifest_str = data.get("manifest")
        data["manifest"] = list(
            yaml.load_all(manifest_str, Loader=yaml.SafeLoader)
        )

    # parse field 'resources' (best effort; absent resources stay absent)
    if status is not None and status.get("resources") is not None:
        try:
            status["resources"] = self._output_to_table(str(status["resources"]))
        except Exception as e:
            self.log.debug("could not parse resources of status: {}".format(e))

    # set description to lowercase (unify with helm3)
    if info is not None and "Description" in info:
        info["description"] = info.pop("Description")

    return data
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Return the helm chart repo ids stored for a cluster.

    The cluster is looked up in the "k8sclusters" collection by its
    "_admin.helm-chart.id" field.

    :param cluster_uuid: helm id of the cluster
    :return: content of `_admin.helm_chart_repos`, or an empty list when
        that field is missing or empty
    :raises K8sException: if no cluster is found
    """
    cluster = self.db.get_one(
        "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
    )
    if not cluster:
        raise K8sException(
            "k8cluster with helm-id : {} not found".format(cluster_uuid)
        )

    admin = cluster.get("_admin")
    return admin.get("helm_chart_repos") or []
async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
    """
    Tell whether every resource of a release that reports a READY column
    has all its replicas ready.

    The resources of the release status have this layout (one block per
    resource kind, blocks separated by a blank line):

        ==> v1/Deployment
        NAME                    READY  UP-TO-DATE  AVAILABLE  AGE
        halting-horse-mongodb   0/1    1           0          0s
        halting-petit-mongodb   1/1    1           0          0s

    Only blocks whose second column header is "READY" are evaluated; a row
    is not ready when its "current/total" value has current < total. Rows
    whose READY value cannot be parsed are ignored.

    :param cluster_id: cluster identifier
    :param kdu_instance: helm release name
    :return: False if at least one evaluated row is not ready, True
        otherwise (also when there are no resources to evaluate)
    """
    # _status_kdu prepares paths and environment itself
    status = await self._status_kdu(
        cluster_id=cluster_id, kdu_instance=kdu_instance, yaml_format=False
    )

    # extract info.status.resources
    resources = status
    for key in ("info", "status", "resources"):
        resources = resources.get(key) if isinstance(resources, dict) else None

    # _status_kdu already stores the resources as a table; only a raw
    # string still needs the conversion
    if isinstance(resources, str):
        resources = self._output_to_table(resources)

    ready = True
    expect_columns = False  # previous row was a '==>' block header
    checking = False  # inside a block that has a READY column
    for row in resources or []:
        if not row:
            # blank line: end of the current block
            expect_columns = checking = False
            continue
        if row[0] == "==>":
            expect_columns, checking = True, False
            continue
        if expect_columns:
            expect_columns = False
            checking = len(row) > 1 and row[1] == "READY"
            continue
        if not checking:
            continue
        if len(row) < 2:
            checking = False
            continue

        current, _, total = str(row[1]).partition("/")
        try:
            not_ready = int(current) < int(total)
        except ValueError:
            # not a "current/total" value: nothing to evaluate in this row
            continue
        if not_ready:
            self.log.debug("NOT READY:\n {}".format(row))
            ready = False

    return ready
def _get_install_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Generates the command to install a Helm Chart release

    :param kdu_model: chart reference (shell-quoted)
    :param kdu_instance: release name (shell-quoted)
    :param namespace: namespace for the release; option omitted if empty
    :param params_str: already formatted parameters, inserted verbatim
    :param version: chart version (shell-quoted); option omitted if empty
    :param atomic: if set, `--atomic` is added
    :param timeout: value for `--timeout`; option omitted if empty
    :param kubeconfig: path to the kubeconfig file (shell-quoted)
    :return: the command string
    """
    timeout_str = "--timeout {}".format(timeout) if timeout else ""
    atomic_str = "--atomic" if atomic else ""
    namespace_str = "--namespace {}".format(quote(namespace)) if namespace else ""
    # quoted like in the upgrade command
    version_str = "--version {}".format(quote(version)) if version else ""

    command = (
        "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
        "{params} {timeout} --name={name} {ns} {model} {ver}".format(
            kubeconfig=quote(kubeconfig),
            helm=self._helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=quote(kdu_instance),
            ns=namespace_str,
            model=quote(kdu_model),
            ver=version_str,
        )
    )
    return command
def _get_upgrade_scale_command(
    self,
    kdu_model: str,
    kdu_instance: str,
    namespace: str,
    scale: int,
    version: str,
    atomic: bool,
    replica_str: str,
    timeout: float,
    resource_name: str,
    kubeconfig: str,
) -> str:
    """Generates the command to scale a Helm Chart release

    The scale count is expressed as a single chart value, either
    `<replica_str>` or `<resource_name>.<replica_str>`, and the command is
    the regular upgrade command carrying that value.

    Args:
        kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
        kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
        namespace (str): Namespace where this KDU instance is deployed
        scale (int): Scale count
        version (str): Constraint with specific version of the Chart to use
        atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
            The --wait flag will be set automatically if --atomic is used
        replica_str (str): The key under resource_name key where the scale count is stored
        timeout (float): The time, in seconds, to wait
        resource_name (str): The KDU's resource to scale
        kubeconfig (str): Kubeconfig file path

    Returns:
        str: command to scale a Helm Chart release
    """
    # the replica key is nested under the resource when one is given
    scale_key = (
        "{}.{}".format(resource_name, replica_str) if resource_name else replica_str
    )
    upgrade_options = {
        "kdu_model": kdu_model,
        "kdu_instance": kdu_instance,
        "namespace": namespace,
        "params_str": self._params_to_set_option({scale_key: scale}),
        "version": version,
        "atomic": atomic,
        "timeout": timeout,
        "kubeconfig": kubeconfig,
    }
    return self._get_upgrade_command(**upgrade_options)
def _get_upgrade_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
    force: bool = False,
) -> str:
    """Generates the command to upgrade a Helm Chart release

    Args:
        kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
        kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
        namespace (str): Namespace where this KDU instance is deployed
        params_str (str): Params used to upgrade the Helm Chart release
        version (str): Constraint with specific version of the Chart to use
        atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
            The --wait flag will be set automatically if --atomic is used
        timeout (float): The time, in seconds, to wait
        kubeconfig (str): Kubeconfig file path
        force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.

    Returns:
        str: command to upgrade a Helm Chart release
    """
    timeout_str = "--timeout {}".format(timeout) if timeout else ""
    atomic_str = "--atomic" if atomic else ""
    # the trailing space separates the flag from "--reuse-values", which
    # follows the placeholder directly in the template below
    force_str = "--force " if force else ""
    version_str = "--version {}".format(quote(version)) if version else ""
    namespace_str = "--namespace {}".format(quote(namespace)) if namespace else ""

    command = (
        "env KUBECONFIG={kubeconfig} {helm} upgrade {namespace} {atomic} --output yaml {params} {timeout} {force}"
        "--reuse-values {name} {model} {ver}"
    ).format(
        # quoted like the other interpolated arguments so that a path with
        # spaces stays a single token
        kubeconfig=quote(kubeconfig),
        helm=self._helm_command,
        namespace=namespace_str,
        atomic=atomic_str,
        force=force_str,
        params=params_str,
        timeout=timeout_str,
        name=quote(kdu_instance),
        model=quote(kdu_model),
        ver=version_str,
    )
    return command
def _get_rollback_command(
    self, kdu_instance, namespace, revision, kubeconfig
) -> str:
    """
    Generates the command to roll back a Helm Chart release

    :param kdu_instance: release name (shell-quoted)
    :param namespace: not used by the helm2 command
    :param revision: revision to roll back to
    :param kubeconfig: path to the kubeconfig file (shell-quoted)
    :return: the command string; `--wait` is always included
    """
    # kubeconfig and revision are quoted like the release name so that no
    # interpolated value can split into several tokens
    return "env KUBECONFIG={} {} rollback {} {} --wait".format(
        quote(kubeconfig),
        self._helm_command,
        quote(kdu_instance),
        quote(str(revision)),
    )
def _get_uninstall_command(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Generates the command to delete (and purge) a Helm Chart release

    :param kdu_instance: release name (shell-quoted)
    :param namespace: not used by the helm2 command
    :param kubeconfig: path to the kubeconfig file (shell-quoted so that
        a path with spaces stays a single token)
    :return: the command string
    """
    return "env KUBECONFIG={} {} delete --purge {}".format(
        quote(kubeconfig), self._helm_command, quote(kdu_instance)
    )