blob: ad230b5f36a7ee859dd8d7e8c77c41664766fd2b [file] [log] [blame]
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import asyncio
import os
import yaml
from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException
class K8sHelmConnector(K8sHelmBaseConnector):
    """
    K8s connector for helm v2, built on top of K8sHelmBaseConnector.

    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
    vca_config: dict = None,
):
    """
    Initializes helm connector for helm v2

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    :param vca_config: configuration dictionary, forwarded unchanged to the base
        connector
    """

    # parent class
    K8sHelmBaseConnector.__init__(
        self,
        db=db,
        log=log,
        fs=fs,
        kubectl_command=kubectl_command,
        helm_command=helm_command,
        on_update_db=on_update_db,
        vca_config=vca_config,
    )

    self.log.info("Initializing K8S Helm2 connector")

    # initialize helm client-only
    self.log.debug("Initializing helm client-only...")
    command = "{} init --client-only --stable-repo-url {} ".format(
        self._helm_command, self._stable_repo_url
    )
    try:
        # The command is only scheduled here, not awaited: a failure of the helm
        # command itself ends up in the task, so this handler only sees errors
        # raised while scheduling it.
        asyncio.ensure_future(
            self._local_async_exec(command=command, raise_exception_on_error=False)
        )
    except Exception as e:
        # was self.warning(...), which is not a logger method of this object
        self.log.warning(
            "helm init failed (it was already initialized): {}".format(e)
        )

    self.log.info("K8S Helm2 connector initialized")
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
):
    """
    Installs a kdu in the cluster identified by cluster_uuid.

    The local cluster directory is synchronized from the file system before the
    installation and synchronized back afterwards.

    :param cluster_uuid: cluster identifier; the cluster id is extracted from it
    :param kdu_model: model to install
    :param kdu_instance: name given to the installed instance
    :param atomic: forwarded to the installation implementation
    :param timeout: forwarded to the installation implementation
    :param params: forwarded to the installation implementation
    :param db_dict: forwarded to the installation implementation
    :param kdu_name: forwarded to the installation implementation
    :param namespace: forwarded to the installation implementation
    :return: True
    """
    _namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

    # bring the local copy of the cluster directory up to date
    self.fs.sync(from_path=cluster_id)

    # paths and environment for this cluster
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    install_options = {
        "atomic": atomic,
        "timeout": timeout,
        "params": params,
        "db_dict": db_dict,
        "kdu_name": kdu_name,
        "namespace": namespace,
    }
    await self._install_impl(
        cluster_id, kdu_model, paths, env, kdu_instance, **install_options
    )

    # push local changes back to the file system
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.debug("Returning kdu_instance {}".format(kdu_instance))
    return True
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Runs the inspect command for a kdu model with an empty inspect sub-command.

    :param kdu_model: model to inspect
    :param repo_url: optional repository url
    :return: whatever the inspect execution returns
    """
    debug_msg = "inspect kdu_model {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_msg)

    inspect_result = await self._exec_inspect_comand(
        inspect_command="", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspect_result
"""
####################################################################################
################################### P R I V A T E ##################################
####################################################################################
"""
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Builds (and optionally creates) the cluster, kube and helm home dirs and
    returns their paths together with the environment variables that must be
    provided to execute helm v2 / kubectl commands.

    Directory layout, rooted at self.fs.path:
        - <base>/<cluster_name>          cluster dir
        - <base>/<cluster_name>/.kube    kube dir, holding the "config" file
        - <base>/<cluster_name>/.helm    helm home dir

    The variables assigned for these paths are:
        - Helm home: $HELM_HOME
        - helm kubeconfig: $KUBECONFIG

    :param cluster_name: cluster_name
    :param create_if_not_exist: create the kube and helm dirs when missing
    :return: Dictionary with config_paths and dictionary with helm environment variables
    :raises K8sException: if any of the directories does not exist
    """
    base = self.fs.path
    if base.endswith(("/", "\\")):
        base = base[:-1]

    # base dir for cluster
    cluster_dir = base + "/" + cluster_name
    # kube dir
    kube_dir = cluster_dir + "/" + ".kube"
    # helm home dir
    helm_dir = cluster_dir + "/" + ".helm"

    if create_if_not_exist:
        for directory in (kube_dir, helm_dir):
            if not os.path.exists(directory):
                self.log.debug("Creating dir {}".format(directory))
                # exist_ok: do not fail if the dir appears between check and create
                os.makedirs(directory, exist_ok=True)

    config_filename = kube_dir + "/config"

    # 2 - Prepare dictionary with paths
    paths = {
        "kube_dir": kube_dir,
        "kube_config": config_filename,
        "cluster_dir": cluster_dir,
        "helm_dir": helm_dir,
    }

    # only the "*dir*" entries are checked; the kubeconfig file may not exist yet
    for path_name, path in paths.items():
        if "dir" in path_name and not os.path.exists(path):
            err_msg = "{} dir does not exist".format(path)
            self.log.error(err_msg)
            raise K8sException(err_msg)

    # 3 - Prepare environment variables
    env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

    return paths, env
async def _get_services(self, cluster_id, kdu_instance, namespace):
    """
    Pipes the helm manifest of a kdu instance into kubectl and parses the result.

    :param cluster_id: cluster whose paths and environment are used
    :param kdu_instance: instance whose manifest is requested
    :param namespace: namespace passed to the kubectl get command
    :return: result of parsing the piped command output with _parse_services
    """
    # only the environment is needed here
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    manifest_cmd = "{} get manifest {} ".format(self._helm_command, kdu_instance)
    kubectl_cmd = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)

    piped_output, _ = await self._local_async_exec_pipe(
        manifest_cmd, kubectl_cmd, env=env, raise_exception_on_error=True
    )
    return self._parse_services(piped_output)
async def _cluster_init(self, cluster_id: str, namespace: str,
                        paths: dict, env: dict):
    """
    Implements the helm version dependent cluster initialization:
    For helm2 it initializes the tiller environment if needed.

    A "tiller-deploy" deployment is looked up in the given namespace:
      - not found: a service account and a cluster role binding are created
        (errors ignored) and "helm init" is run;
      - found: the helm client alone is initialized, and only when
        repository/repositories.yaml is missing from the helm home dir.
    Afterwards a "stable" repo whose url differs from the configured stable repo
    url is removed and added again with the configured url.

    :param cluster_id: cluster id, used in logs and to build the cluster uuid
    :param namespace: namespace where tiller is looked up and initialized
    :param paths: paths dictionary; "kube_config" and "helm_dir" are read
    :param env: environment variables for the executed commands
    :return: True if "helm init" (client and server) was run by this call
    """
    kube_config = paths["kube_config"]

    # check if tiller pod is up in cluster
    command = "{} --kubeconfig={} --namespace={} get deployments".format(
        self.kubectl_command, kube_config, namespace
    )
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    output_table = self._output_to_table(output=output)

    # find 'tiller' pod in all pods; empty rows are skipped instead of
    # aborting the whole scan
    already_initialized = any(
        row and row[0].startswith("tiller-deploy") for row in (output_table or [])
    )

    # helm init
    n2vc_installed_sw = False
    if not already_initialized:
        self.log.info(
            "Initializing helm in client and server: {}".format(cluster_id)
        )
        # the service account / role binding may already exist: errors ignored
        command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
            self.kubectl_command, kube_config, self.service_account
        )
        _, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )

        command = (
            "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
            "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
        ).format(self.kubectl_command, kube_config, self.service_account)
        _, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )

        command = (
            "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
            " --stable-repo-url {} init"
        ).format(
            self._helm_command,
            kube_config,
            namespace,
            paths["helm_dir"],
            self.service_account,
            self._stable_repo_url
        )
        _, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )
        n2vc_installed_sw = True
    else:
        # check client helm installation
        check_file = paths["helm_dir"] + "/repository/repositories.yaml"
        if not self._check_file_exists(
            filename=check_file, exception_if_not_exists=False
        ):
            self.log.info("Initializing helm in client: {}".format(cluster_id))
            command = (
                "{} --kubeconfig={} --tiller-namespace={} "
                "--home={} init --client-only --stable-repo-url {} "
            ).format(
                self._helm_command,
                kube_config,
                namespace,
                paths["helm_dir"],
                self._stable_repo_url,
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
        else:
            self.log.info("Helm client already initialized")

    # remove old stable repo and add new one
    cluster_uuid = "{}:{}".format(namespace, cluster_id)
    repo_list = await self.repo_list(cluster_uuid)
    for repo in repo_list:
        if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
            # the url was missing from this message (placeholder never formatted)
            self.log.debug(
                "Add new stable repo url: {}".format(self._stable_repo_url)
            )
            await self.repo_remove(cluster_uuid, "stable")
            await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)
            break

    return n2vc_installed_sw
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Uninstalls tiller from the cluster and deletes the cluster role binding and
    the service account.

    When no namespace is given, the namespace of the "tiller-deploy" deployment
    is looked up across all namespaces. If no namespace is available at the end,
    nothing is uninstalled.

    :param cluster_id: cluster id, used for paths, environment and logs
    :param namespace: tiller namespace, or a falsy value to look it up
    """
    self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

    # paths and environment for this cluster
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )
    kube_config = paths["kube_config"]

    if not namespace:
        # find namespace for tiller pod
        lookup_cmd = "{} --kubeconfig={} get deployments --all-namespaces".format(
            self.kubectl_command, kube_config
        )
        listing, _rc = await self._local_async_exec(
            command=lookup_cmd, raise_exception_on_error=False, env=env
        )
        namespace = None
        tiller_found = False
        for row in self._output_to_table(output=listing):
            try:
                tiller_found = "tiller-deploy" in row[1]
            except Exception:
                # rows without a second column are ignored
                continue
            if tiller_found:
                namespace = row[0]
                break
        if not tiller_found:
            not_found_msg = "Tiller deployment not found in cluster {}".format(
                cluster_id
            )
            self.log.error(not_found_msg)

        self.log.debug("namespace for tiller: {}".format(namespace))

    if not namespace:
        self.log.debug("namespace not found")
        return

    # uninstall tiller from cluster
    self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
    reset_cmd = "{} --kubeconfig={} --home={} reset".format(
        self._helm_command, kube_config, paths["helm_dir"]
    )
    self.log.debug("resetting: {}".format(reset_cmd))
    _out, _rc = await self._local_async_exec(
        command=reset_cmd, raise_exception_on_error=True, env=env
    )

    # Delete clusterrolebinding and serviceaccount.
    # Ignore if errors for backward compatibility
    binding_cmd = (
        "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
        "io/osm-tiller-cluster-rule"
    ).format(self.kubectl_command, kube_config)
    _out, _rc = await self._local_async_exec(
        command=binding_cmd, raise_exception_on_error=False, env=env
    )

    account_cmd = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
        self.kubectl_command, kube_config, self.service_account
    )
    _out, _rc = await self._local_async_exec(
        command=account_cmd, raise_exception_on_error=False, env=env
    )
async def _instances_list(self, cluster_id):
    """
    Lists the helm releases of a cluster.

    Runs "helm list --output yaml" and returns the entries found under the
    "Releases" key, with their keys converted to lower case to unify with helm3.

    :param cluster_id: cluster whose environment is used to run the command
    :return: list of dictionaries, one per release; empty list when the command
        prints nothing or the output holds no releases
    """
    # only the environment is needed here
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} list --output yaml".format(self._helm_command)

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    if not output:
        return []

    # An output that parses to nothing, or that has no "Releases" entry, is
    # treated as "no releases" instead of failing.
    parsed = yaml.load(output, Loader=yaml.SafeLoader) or {}
    releases = parsed.get("Releases") or []

    # update keys to lower case to unify with helm3
    return [
        {key.lower(): value for key, value in release.items()}
        for release in releases
    ]
def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
                         version: str):
    """
    Builds the helm v2 "inspect" command line.

    :param show_command: inspect sub-command
    :param kdu_model: model to inspect
    :param repo_str: repository option text, appended to kdu_model with no separator
    :param version: version option text
    :return: command line string
    """
    template = "{helm} inspect {show} {model}{repo} {ver}"
    return template.format(
        helm=self._helm_command,
        show=show_command,
        model=kdu_model,
        repo=repo_str,
        ver=version,
    )
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    show_error_log: bool = False,
    return_text: bool = False,
):
    """
    Gets the status of a kdu instance with "helm status <instance> --output yaml".

    :param cluster_id: cluster whose environment is used to run the command
    :param kdu_instance: instance whose status is requested
    :param namespace: only used in the debug log
    :param show_error_log: forwarded to the command execution
    :param return_text: return the raw command output as text
    :return: the output as str if return_text is set; None if the command return
        code is not 0; otherwise the parsed YAML, where info.status.notes is
        removed, info.status.resources is converted to a table and
        info.Description is renamed to info.description
    """
    self.log.debug(
        "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
    )

    # only the environment is needed here
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )
    command = "{} status {} --output yaml".format(self._helm_command, kdu_instance)

    output, rc = await self._local_async_exec(
        command=command,
        raise_exception_on_error=True,
        show_error_log=show_error_log,
        env=env,
    )

    if return_text:
        return str(output)

    if rc != 0:
        return None

    data = yaml.load(output, Loader=yaml.SafeLoader)

    # Nothing to normalize when the document is empty or has no "info" mapping;
    # previously this failed with AttributeError / TypeError.
    info = data.get("info") if isinstance(data, dict) else None
    if not isinstance(info, dict):
        return data

    status = info.get("status")
    if isinstance(status, dict):
        # remove field 'notes'
        status.pop("notes", None)

        # parse field 'resources' (best effort: left untouched on failure)
        try:
            resources = str(status.get("resources"))
            status["resources"] = self._output_to_table(resources)
        except Exception as e:
            self.log.debug(
                "resources of {} could not be parsed: {}".format(kdu_instance, e)
            )

    # set description to lowercase (unify with helm3)
    if "Description" in info:
        info["description"] = info.pop("Description")

    return data
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Reads the helm chart repo ids stored for a cluster in the "k8sclusters"
    collection.

    :param cluster_uuid: value matched against "_admin.helm-chart.id"
    :return: content of _admin.helm_chart_repos, or an empty list if it is unset
    :raises K8sException: if no cluster matches
    """
    cluster = self.db.get_one(
        "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
    )
    if not cluster:
        raise K8sException(
            "k8cluster with helm-id : {} not found".format(cluster_uuid)
        )

    admin_section = cluster.get("_admin")
    return admin_section.get("helm_chart_repos") or []
async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
    """
    Checks whether every resource listed in the status of a kdu instance is ready.

    Only the sections of info.status.resources whose header row has READY in
    column 1 are inspected; a row is not ready when its READY value
    "<current>/<total>" has current < total.

    :param cluster_id: cluster where the instance lives
    :param kdu_instance: instance to check
    :return: False if at least one inspected row is not ready, True otherwise
    """
    status = await self._status_kdu(
        cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
    )

    # extract info.status.resources-> str
    # format:
    #       ==> v1/Deployment
    #       NAME                    READY  UP-TO-DATE  AVAILABLE  AGE
    #       halting-horse-mongodb   0/1    1           0          0s
    #       halting-petit-mongodb   1/1    1           0          0s
    # blank line
    resources = K8sHelmBaseConnector._get_deep(
        status, ("info", "status", "resources")
    )

    # convert to table
    # NOTE(review): _status_kdu already stores "resources" as a table; confirm
    # that _output_to_table accepts that input.
    resources = K8sHelmBaseConnector._output_to_table(resources)

    num_lines = len(resources)
    index = 0
    ready = True
    while index < num_lines:
        section = resources[index]
        index += 1
        # a section starts with '==>' in column 0
        if not section or section[0] != "==>":
            continue
        if index >= num_lines:
            break
        # the header row is left in place when it has no READY column, so the
        # outer loop examines it again (it may be the next '==>' line)
        header = resources[index]
        if len(header) < 2 or header[1] != "READY":
            continue
        index += 1

        # Data rows run until a blank/short row, the next section or the end of
        # the table. The last row of the table is checked too (it was skipped
        # before when no blank line followed it).
        while index < num_lines:
            row = resources[index]
            if len(row) < 2 or row[0] == "==>":
                break
            index += 1
            parts = row[1].split(sep="/")
            try:
                current = int(parts[0])
                total = int(parts[1])
            except (IndexError, ValueError):
                # READY value is not "<current>/<total>": row is ignored
                continue
            if current < total:
                self.log.debug("NOT READY:\n    {}".format(row))
                ready = False

    return ready
def _get_install_command(
    self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
) -> str:
    """
    Builds the helm v2 "install" command line.

    Options whose value is falsy are left out; their position in the command
    stays empty, so the result may contain consecutive spaces.

    :param kdu_model: model to install
    :param kdu_instance: release name, passed as --name
    :param namespace: value for --namespace
    :param params_str: parameters text, inserted as is
    :param version: value for --version
    :param atomic: add --atomic
    :param timeout: value for --timeout
    :return: command line string
    """
    timeout_str = "--timeout {}".format(timeout) if timeout else ""
    atomic_str = "--atomic" if atomic else ""
    namespace_str = "--namespace {}".format(namespace) if namespace else ""
    version_str = "--version {}".format(version) if version else ""

    command = (
        "{helm} install {atomic} --output yaml  "
        "{params} {timeout} --name={name} {ns} {model} {ver}".format(
            helm=self._helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=kdu_instance,
            ns=namespace_str,
            model=kdu_model,
            ver=version_str,
        )
    )
    return command
def _get_upgrade_command(
    self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
) -> str:
    """
    Builds the helm v2 "upgrade" command line.

    Options whose value is falsy are left out; their position in the command
    stays empty, so the result may contain consecutive spaces.

    :param kdu_model: model to upgrade to
    :param kdu_instance: release name
    :param namespace: not used when building the command
    :param params_str: parameters text, inserted as is
    :param version: value for --version
    :param atomic: add --atomic
    :param timeout: value for --timeout
    :return: command line string
    """
    timeout_opt = "--timeout {}".format(timeout) if timeout else ""
    atomic_opt = "--atomic" if atomic else ""
    version_opt = "--version {}".format(version) if version else ""

    return "{} upgrade {} --output yaml {} {} {} {} {}".format(
        self._helm_command,
        atomic_opt,
        params_str,
        timeout_opt,
        kdu_instance,
        kdu_model,
        version_opt,
    )
def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
    """
    Builds the helm v2 "rollback" command line, with --wait.

    :param kdu_instance: release name
    :param namespace: not used when building the command
    :param revision: revision to roll back to
    :return: command line string
    """
    template = "{helm} rollback {release} {rev} --wait"
    return template.format(
        helm=self._helm_command, release=kdu_instance, rev=revision
    )
def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
    """
    Builds the helm v2 "delete --purge" command line.

    :param kdu_instance: release name
    :param namespace: not used when building the command
    :return: command line string
    """
    template = "{helm} delete --purge {release}"
    return template.format(helm=self._helm_command, release=kdu_instance)