d828da27e15963c29cee21eb15c74d8c9947c7fa
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
22 from typing
import Union
26 from n2vc
.k8s_helm_base_conn
import K8sHelmBaseConnector
27 from n2vc
.exceptions
import K8sException
30 class K8sHelm3Connector(K8sHelmBaseConnector
):
33 ####################################################################################
34 ################################### P U B L I C ####################################
35 ####################################################################################
42 kubectl_command
: str = "/usr/bin/kubectl",
43 helm_command
: str = "/usr/bin/helm3",
48 Initializes helm connector for helm v3
50 :param fs: file system for kubernetes and helm configuration
51 :param db: database object to write current operation status
52 :param kubectl_command: path to kubectl executable
53 :param helm_command: path to helm executable
55 :param on_update_db: callback called when k8s connector updates database
59 K8sHelmBaseConnector
.__init
__(
64 kubectl_command
=kubectl_command
,
65 helm_command
=helm_command
,
66 on_update_db
=on_update_db
,
69 self
.log
.info("K8S Helm3 connector initialized")
81 namespace
: str = None,
84 """Install a helm chart
86 :param cluster_uuid str: The UUID of the cluster to install to
87 :param kdu_model str: chart/reference (string), which can be either
89 - a name of chart available via the repos known by OSM
90 (e.g. stable/openldap, stable/openldap:1.2.4)
91 - a path to a packaged chart (e.g. mychart.tgz)
92 - a path to an unpacked chart directory or a URL (e.g. mychart)
93 :param kdu_instance: Kdu instance name
94 :param atomic bool: If set, waits until the model is active and resets
95 the cluster on failure.
96 :param timeout int: The time, in seconds, to wait for the install
98 :param params dict: Key-value pairs of instantiation parameters
99 :param kdu_name: Name of the KDU instance to be installed
100 :param namespace: K8s namespace to use for the KDU instance
102 :param kwargs: Additional parameters (None yet)
104 :return: True if successful
107 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_uuid
))
110 self
.fs
.sync(from_path
=cluster_uuid
)
113 paths
, env
= self
._init
_paths
_env
(
114 cluster_name
=cluster_uuid
, create_if_not_exist
=True
117 # for helm3 if namespace does not exist must create it
118 if namespace
and namespace
!= "kube-system":
119 if not await self
._namespace
_exists
(cluster_uuid
, namespace
):
121 await self
._create
_namespace
(cluster_uuid
, namespace
)
122 except Exception as e
:
123 if not await self
._namespace
_exists
(cluster_uuid
, namespace
):
125 "namespace {} does not exist in cluster_id {} "
126 "error message: ".format(namespace
, e
)
128 self
.log
.error(err_msg
)
129 raise K8sException(err_msg
)
131 await self
._install
_impl
(
146 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
148 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Inspect a KDU model by running the "all" inspect command on it.

    :param kdu_model: chart reference to inspect
    :param repo_url: optional repository URL forwarded with the chart
    :return: the result of ``self._exec_inspect_command`` for this chart
    """
    message = "inspect kdu_model {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(message)

    # Delegates entirely to the shared inspect helper; "all" selects the
    # complete inspection rather than a single section.
    inspection = await self._exec_inspect_command(
        inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspection
162 ####################################################################################
163 ################################### P R I V A T E ##################################
164 ####################################################################################
167 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
169 Creates the base cluster and kube dirs and returns their paths.
170 Also creates the helm3 dirs according to the new directory specification; the paths are
171 returned together with the environment variables that must be provided to execute commands
173 Helm 3 directory specification uses XDG categories for variable support:
174 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
175 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
176 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
178 The variables assigned for these paths are:
179 (In the documentation the variable names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
180 $HELM_PATH_DATA, but looking at `helm env` the variable names are different)
181 - Cache: $HELM_CACHE_HOME
182 - Config: $HELM_CONFIG_HOME
183 - Data: $HELM_DATA_HOME
184 - helm kubeconfig: $KUBECONFIG
186 :param cluster_name: cluster_name
187 :return: Dictionary with config_paths and dictionary with helm environment variables
191 if base
.endswith("/") or base
.endswith("\\"):
194 # base dir for cluster
195 cluster_dir
= base
+ "/" + cluster_name
198 kube_dir
= cluster_dir
+ "/" + ".kube"
199 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
200 self
.log
.debug("Creating dir {}".format(kube_dir
))
201 os
.makedirs(kube_dir
)
203 helm_path_cache
= cluster_dir
+ "/.cache/helm"
204 if create_if_not_exist
and not os
.path
.exists(helm_path_cache
):
205 self
.log
.debug("Creating dir {}".format(helm_path_cache
))
206 os
.makedirs(helm_path_cache
)
208 helm_path_config
= cluster_dir
+ "/.config/helm"
209 if create_if_not_exist
and not os
.path
.exists(helm_path_config
):
210 self
.log
.debug("Creating dir {}".format(helm_path_config
))
211 os
.makedirs(helm_path_config
)
213 helm_path_data
= cluster_dir
+ "/.local/share/helm"
214 if create_if_not_exist
and not os
.path
.exists(helm_path_data
):
215 self
.log
.debug("Creating dir {}".format(helm_path_data
))
216 os
.makedirs(helm_path_data
)
218 config_filename
= kube_dir
+ "/config"
220 # 2 - Prepare dictionary with paths
222 "kube_dir": kube_dir
,
223 "kube_config": config_filename
,
224 "cluster_dir": cluster_dir
,
227 # 3 - Prepare environment variables
229 "HELM_CACHE_HOME": helm_path_cache
,
230 "HELM_CONFIG_HOME": helm_path_config
,
231 "HELM_DATA_HOME": helm_path_data
,
232 "KUBECONFIG": config_filename
,
235 for file_name
, file in paths
.items():
236 if "dir" in file_name
and not os
.path
.exists(file):
237 err_msg
= "{} dir does not exist".format(file)
238 self
.log
.error(err_msg
)
239 raise K8sException(err_msg
)
async def _namespace_exists(self, cluster_id, namespace) -> bool:
    """Tell whether ``namespace`` is listed among the cluster's namespaces.

    :param cluster_id: cluster whose namespaces are listed
    :param namespace: namespace name to look for
    :return: True when the namespace is in the listing; False when it is
        absent or when the listing is empty/None
    """
    self.log.debug(
        "checking if namespace {} exists cluster_id {}".format(namespace, cluster_id)
    )

    known = await self._get_namespaces(cluster_id)
    # Guard first: an empty or None listing can never contain the namespace.
    if not known:
        return False
    return namespace in known
async def _get_namespaces(self, cluster_id: str):
    """List the namespace names reported by ``kubectl get namespaces``.

    :param cluster_id: cluster whose kubeconfig and environment are used
    :return: list with the ``metadata.name`` of every item in the kubectl output
    """
    self.log.debug("get namespaces cluster_id {}".format(cluster_id))

    # Resolve the kubeconfig path and the helm/kubectl environment.
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    kube_config = paths["kube_config"]
    command = "{} --kubeconfig={} get namespaces -o=yaml".format(
        self.kubectl_command, kube_config
    )
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # safe_load is the SafeLoader shorthand, so only plain YAML types are built.
    parsed = yaml.safe_load(output)
    namespaces = []
    for item in parsed["items"]:
        namespaces.append(item["metadata"]["name"])
    self.log.debug(f"namespaces {namespaces}")

    return namespaces
async def _create_namespace(self, cluster_id: str, namespace: str):
    """Create ``namespace`` in the cluster by running ``kubectl create namespace``.

    :param cluster_id: cluster whose kubeconfig and environment are used
    :param namespace: name of the namespace to create
    :return: the return code reported by ``self._local_async_exec``
    """
    # Fix: the original message interpolated cluster_id where the namespace
    # belongs and vice versa, producing misleading debug logs.
    self.log.debug(f"create namespace: {namespace} for cluster_id: {cluster_id}")

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} create namespace {}".format(
        self.kubectl_command, paths["kube_config"], namespace
    )
    _, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )
    self.log.debug(f"namespace {namespace} created")

    # NOTE(review): the tail of this method is not visible in the reviewed
    # listing; handing back the exit code is assumed. Callers in this file
    # ignore the result, so this is harmless either way - confirm.
    return _rc
async def _get_services(
    self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """Fetch the services of a release by piping its manifest into kubectl.

    The output of ``helm get manifest`` is piped into ``kubectl get -f -``
    and the combined output is handed to ``self._parse_services``.

    :param cluster_id: cluster whose paths/environment are initialised
    :param kdu_instance: helm release name
    :param namespace: namespace of the release
    :param kubeconfig: kubeconfig path exported to the helm command
    :return: the result of ``self._parse_services`` on the piped output
    """
    # Only the environment is needed here; the paths dict is unused.
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    manifest_cmd = (
        f"env KUBECONFIG={kubeconfig} {self._helm_command} "
        f"get manifest {kdu_instance} --namespace={namespace}"
    )
    kubectl_cmd = f"{self.kubectl_command} get --namespace={namespace} -f -"

    output, _rc = await self._local_async_exec_pipe(
        manifest_cmd, kubectl_cmd, env=env, raise_exception_on_error=True
    )
    return self._parse_services(output)
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """Helm-version-specific cluster initialisation (helm3 flavour).

    Creates ``namespace`` when it is not "kube-system" and is not already
    listed, then registers the "stable" repo if no repo with that name is
    present and ``self._stable_repo_url`` is set.

    :param cluster_id: cluster to initialise
    :param namespace: namespace that must exist after initialisation
    :param paths: unused in this implementation
    :param env: unused in this implementation
    :return: False, as no software needs to be uninstalled
    """
    # The namespace listing is only queried for non "kube-system" namespaces.
    if namespace != "kube-system" and namespace not in await self._get_namespaces(
        cluster_id
    ):
        await self._create_namespace(cluster_id, namespace)

    configured_repos = await self.repo_list(cluster_id)
    repo_names = [entry["name"] for entry in configured_repos]
    if "stable" not in repo_names and self._stable_repo_url:
        await self.repo_add(cluster_id, "stable", self._stable_repo_url)

    # Returns False as no software needs to be uninstalled
    return False
331 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
332 # nothing to do to uninstall sw
335 async def _instances_list(self
, cluster_id
: str):
338 paths
, env
= self
._init
_paths
_env
(
339 cluster_name
=cluster_id
, create_if_not_exist
=True
342 command
= "{} list --all-namespaces --output yaml".format(self
._helm
_command
)
343 output
, _rc
= await self
._local
_async
_exec
(
344 command
=command
, raise_exception_on_error
=True, env
=env
347 if output
and len(output
) > 0:
348 self
.log
.debug("instances list output: {}".format(output
))
349 return yaml
.load(output
, Loader
=yaml
.SafeLoader
)
353 def _get_inspect_command(
354 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
356 """Generates the command to obtain the information about an Helm Chart package
357 (´helm show ...´ command)
360 show_command: the second part of the command (`helm show <show_command>`)
361 kdu_model: The name or path of an Helm Chart
362 repo_url: Helm Chart repository url
363 version: constraint with specific version of the Chart to use
366 str: the generated Helm Chart command
369 inspect_command
= "{} show {} {}{} {}".format(
370 self
._helm
_command
, show_command
, kdu_model
, repo_str
, version
372 return inspect_command
374 def _get_get_command(
375 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
378 "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
379 kubeconfig
, self
._helm
_command
, get_command
, kdu_instance
, namespace
384 async def _status_kdu(
388 namespace
: str = None,
389 yaml_format
: bool = False,
390 show_error_log
: bool = False,
391 ) -> Union
[str, dict]:
394 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
398 namespace
= "kube-system"
401 paths
, env
= self
._init
_paths
_env
(
402 cluster_name
=cluster_id
, create_if_not_exist
=True
404 command
= "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
405 paths
["kube_config"], self
._helm
_command
, kdu_instance
, namespace
408 output
, rc
= await self
._local
_async
_exec
(
410 raise_exception_on_error
=True,
411 show_error_log
=show_error_log
,
421 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
423 # remove field 'notes' and manifest
425 del data
.get("info")["notes"]
429 # parse the manifest to a list of dictionaries
430 if "manifest" in data
:
431 manifest_str
= data
.get("manifest")
432 manifest_docs
= yaml
.load_all(manifest_str
, Loader
=yaml
.SafeLoader
)
434 data
["manifest"] = []
435 for doc
in manifest_docs
:
436 data
["manifest"].append(doc
)
440 def _get_install_command(
454 timeout_str
= "--timeout {}s".format(timeout
)
459 atomic_str
= "--atomic"
463 namespace_str
= "--namespace {}".format(namespace
)
468 version_str
= "--version {}".format(version
)
471 "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml "
472 "{params} {timeout} {ns} {model} {ver}".format(
473 kubeconfig
=kubeconfig
,
474 helm
=self
._helm
_command
,
486 def _get_upgrade_scale_command(
499 """Generates the command to scale a Helm Chart release
502 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
503 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
504 namespace (str): Namespace where this KDU instance is deployed
505 scale (int): Scale count
506 version (str): Constraint with specific version of the Chart to use
507 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
508 The --wait flag will be set automatically if --atomic is used
509 replica_str (str): The key under resource_name key where the scale count is stored
510 timeout (float): The time, in seconds, to wait
511 resource_name (str): The KDU's resource to scale
512 kubeconfig (str): Kubeconfig file path
515 str: command to scale a Helm Chart release
520 scale_dict
= {"{}.{}".format(resource_name
, replica_str
): scale
}
522 scale_dict
= {replica_str
: scale
}
524 scale_str
= self
._params
_to
_set
_option
(scale_dict
)
526 return self
._get
_upgrade
_command
(
528 kdu_instance
=kdu_instance
,
530 params_str
=scale_str
,
534 kubeconfig
=kubeconfig
,
537 def _get_upgrade_command(
548 """Generates the command to upgrade a Helm Chart release
551 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
552 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
553 namespace (str): Namespace where this KDU instance is deployed
554 params_str (str): Params used to upgrade the Helm Chart release
555 version (str): Constraint with specific version of the Chart to use
556 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
557 The --wait flag will be set automatically if --atomic is used
558 timeout (float): The time, in seconds, to wait
559 kubeconfig (str): Kubeconfig file path
562 str: command to upgrade a Helm Chart release
567 timeout_str
= "--timeout {}s".format(timeout
)
572 atomic_str
= "--atomic"
577 version_str
= "--version {}".format(version
)
582 namespace_str
= "--namespace {}".format(namespace
)
585 "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} "
586 "--output yaml {params} {timeout} --reuse-values {ver}"
588 kubeconfig
=kubeconfig
,
589 helm
=self
._helm
_command
,
591 namespace
=namespace_str
,
600 def _get_rollback_command(
601 self
, kdu_instance
: str, namespace
: str, revision
: float, kubeconfig
: str
603 return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
604 kubeconfig
, self
._helm
_command
, kdu_instance
, revision
, namespace
607 def _get_uninstall_command(
608 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
611 return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
612 kubeconfig
, self
._helm
_command
, kdu_instance
, namespace
615 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
617 cluster_filter
= {"_admin.helm-chart-v3.id": cluster_uuid
}
618 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
620 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
624 "k8cluster with helm-id : {} not found".format(cluster_uuid
)