2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
22 from typing
import Union
26 from n2vc
.k8s_helm_base_conn
import K8sHelmBaseConnector
27 from n2vc
.exceptions
import K8sException
30 class K8sHelm3Connector(K8sHelmBaseConnector
):
33 ####################################################################################
34 ################################### P U B L I C ####################################
35 ####################################################################################
42 kubectl_command
: str = "/usr/bin/kubectl",
43 helm_command
: str = "/usr/bin/helm3",
48 Initializes helm connector for helm v3
50 :param fs: file system for kubernetes and helm configuration
51 :param db: database object to write current operation status
52 :param kubectl_command: path to kubectl executable
53 :param helm_command: path to helm executable
55 :param on_update_db: callback called when k8s connector updates database
59 K8sHelmBaseConnector
.__init
__(
64 kubectl_command
=kubectl_command
,
65 helm_command
=helm_command
,
66 on_update_db
=on_update_db
,
69 self
.log
.info("K8S Helm3 connector initialized")
81 namespace
: str = None,
84 """Install a helm chart
86 :param cluster_uuid str: The UUID of the cluster to install to
87 :param kdu_model str: chart/reference (string), which can be either
89 - a name of chart available via the repos known by OSM
90 (e.g. stable/openldap, stable/openldap:1.2.4)
91 - a path to a packaged chart (e.g. mychart.tgz)
92 - a path to an unpacked chart directory or a URL (e.g. mychart)
93 :param kdu_instance: Kdu instance name
94 :param atomic bool: If set, waits until the model is active and resets
95 the cluster on failure.
96 :param timeout int: The time, in seconds, to wait for the install
98 :param params dict: Key-value pairs of instantiation parameters
99 :param kdu_name: Name of the KDU instance to be installed
100 :param namespace: K8s namespace to use for the KDU instance
102 :param kwargs: Additional parameters (None yet)
104 :return: True if successful
107 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_uuid
))
110 self
.fs
.sync(from_path
=cluster_uuid
)
113 paths
, env
= self
._init
_paths
_env
(
114 cluster_name
=cluster_uuid
, create_if_not_exist
=True
117 # for helm3 if namespace does not exist must create it
118 if namespace
and namespace
!= "kube-system":
119 if not await self
._namespace
_exists
(cluster_uuid
, namespace
):
121 await self
._create
_namespace
(cluster_uuid
, namespace
)
122 except Exception as e
:
123 if not await self
._namespace
_exists
(cluster_uuid
, namespace
):
125 "namespace {} does not exist in cluster_id {} "
126 "error message: ".format(namespace
, e
)
128 self
.log
.error(err_msg
)
129 raise K8sException(err_msg
)
131 await self
._install
_impl
(
146 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
148 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
151 async def inspect_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
153 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model
, repo_url
)
156 return await self
._exec
_inspect
_command
(
157 inspect_command
="all", kdu_model
=kdu_model
, repo_url
=repo_url
161 ####################################################################################
162 ################################### P R I V A T E ##################################
163 ####################################################################################
166 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
168 Creates and returns base cluster and kube dirs and returns them.
169 Also created helm3 dirs according to new directory specification, paths are
170 returned and also environment variables that must be provided to execute commands
172 Helm 3 directory specification uses XDG categories for variable support:
173 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
174 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
175 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
177 The variables assigned for this paths are:
178 (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
179 $HELM_PATH_DATA but looking and helm env the variable names are different)
180 - Cache: $HELM_CACHE_HOME
181 - Config: $HELM_CONFIG_HOME
182 - Data: $HELM_DATA_HOME
183 - helm kubeconfig: $KUBECONFIG
185 :param cluster_name: cluster_name
186 :return: Dictionary with config_paths and dictionary with helm environment variables
190 if base
.endswith("/") or base
.endswith("\\"):
193 # base dir for cluster
194 cluster_dir
= base
+ "/" + cluster_name
197 kube_dir
= cluster_dir
+ "/" + ".kube"
198 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
199 self
.log
.debug("Creating dir {}".format(kube_dir
))
200 os
.makedirs(kube_dir
)
202 helm_path_cache
= cluster_dir
+ "/.cache/helm"
203 if create_if_not_exist
and not os
.path
.exists(helm_path_cache
):
204 self
.log
.debug("Creating dir {}".format(helm_path_cache
))
205 os
.makedirs(helm_path_cache
)
207 helm_path_config
= cluster_dir
+ "/.config/helm"
208 if create_if_not_exist
and not os
.path
.exists(helm_path_config
):
209 self
.log
.debug("Creating dir {}".format(helm_path_config
))
210 os
.makedirs(helm_path_config
)
212 helm_path_data
= cluster_dir
+ "/.local/share/helm"
213 if create_if_not_exist
and not os
.path
.exists(helm_path_data
):
214 self
.log
.debug("Creating dir {}".format(helm_path_data
))
215 os
.makedirs(helm_path_data
)
217 config_filename
= kube_dir
+ "/config"
219 # 2 - Prepare dictionary with paths
221 "kube_dir": kube_dir
,
222 "kube_config": config_filename
,
223 "cluster_dir": cluster_dir
,
226 # 3 - Prepare environment variables
228 "HELM_CACHE_HOME": helm_path_cache
,
229 "HELM_CONFIG_HOME": helm_path_config
,
230 "HELM_DATA_HOME": helm_path_data
,
231 "KUBECONFIG": config_filename
,
234 for file_name
, file in paths
.items():
235 if "dir" in file_name
and not os
.path
.exists(file):
236 err_msg
= "{} dir does not exist".format(file)
237 self
.log
.error(err_msg
)
238 raise K8sException(err_msg
)
242 async def _namespace_exists(self
, cluster_id
, namespace
) -> bool:
244 "checking if namespace {} exists cluster_id {}".format(
245 namespace
, cluster_id
248 namespaces
= await self
._get
_namespaces
(cluster_id
)
249 return namespace
in namespaces
if namespaces
else False
251 async def _get_namespaces(self
, cluster_id
: str):
252 self
.log
.debug("get namespaces cluster_id {}".format(cluster_id
))
255 paths
, env
= self
._init
_paths
_env
(
256 cluster_name
=cluster_id
, create_if_not_exist
=True
259 command
= "{} --kubeconfig={} get namespaces -o=yaml".format(
260 self
.kubectl_command
, paths
["kube_config"]
262 output
, _rc
= await self
._local
_async
_exec
(
263 command
=command
, raise_exception_on_error
=True, env
=env
266 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
267 namespaces
= [item
["metadata"]["name"] for item
in data
["items"]]
268 self
.log
.debug(f
"namespaces {namespaces}")
272 async def _create_namespace(self
, cluster_id
: str, namespace
: str):
273 self
.log
.debug(f
"create namespace: {cluster_id} for cluster_id: {namespace}")
276 paths
, env
= self
._init
_paths
_env
(
277 cluster_name
=cluster_id
, create_if_not_exist
=True
280 command
= "{} --kubeconfig={} create namespace {}".format(
281 self
.kubectl_command
, paths
["kube_config"], namespace
283 _
, _rc
= await self
._local
_async
_exec
(
284 command
=command
, raise_exception_on_error
=True, env
=env
286 self
.log
.debug(f
"namespace {namespace} created")
290 async def _get_services(
291 self
, cluster_id
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
294 paths
, env
= self
._init
_paths
_env
(
295 cluster_name
=cluster_id
, create_if_not_exist
=True
298 command1
= "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
299 kubeconfig
, self
._helm
_command
, kdu_instance
, namespace
301 command2
= "{} get --namespace={} -f -".format(self
.kubectl_command
, namespace
)
302 output
, _rc
= await self
._local
_async
_exec
_pipe
(
303 command1
, command2
, env
=env
, raise_exception_on_error
=True
305 services
= self
._parse
_services
(output
)
309 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
311 Implements the helm version dependent cluster initialization:
312 For helm3 it creates the namespace if it is not created
314 if namespace
!= "kube-system":
315 namespaces
= await self
._get
_namespaces
(cluster_id
)
316 if namespace
not in namespaces
:
317 await self
._create
_namespace
(cluster_id
, namespace
)
319 repo_list
= await self
.repo_list(cluster_id
)
320 stable_repo
= [repo
for repo
in repo_list
if repo
["name"] == "stable"]
321 if not stable_repo
and self
._stable
_repo
_url
:
322 await self
.repo_add(cluster_id
, "stable", self
._stable
_repo
_url
)
324 # Returns False as no software needs to be uninstalled
327 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
328 # nothing to do to uninstall sw
331 async def _instances_list(self
, cluster_id
: str):
333 paths
, env
= self
._init
_paths
_env
(
334 cluster_name
=cluster_id
, create_if_not_exist
=True
337 command
= "{} list --all-namespaces --output yaml".format(self
._helm
_command
)
338 output
, _rc
= await self
._local
_async
_exec
(
339 command
=command
, raise_exception_on_error
=True, env
=env
342 if output
and len(output
) > 0:
343 self
.log
.debug("instances list output: {}".format(output
))
344 return yaml
.load(output
, Loader
=yaml
.SafeLoader
)
348 def _get_inspect_command(
349 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
351 """Generates the command to obtain the information about an Helm Chart package
352 (´helm show ...´ command)
355 show_command: the second part of the command (`helm show <show_command>`)
356 kdu_model: The name or path of an Helm Chart
357 repo_url: Helm Chart repository url
358 version: constraint with specific version of the Chart to use
361 str: the generated Helm Chart command
364 inspect_command
= "{} show {} {}{} {}".format(
365 self
._helm
_command
, show_command
, kdu_model
, repo_str
, version
367 return inspect_command
369 def _get_get_command(
370 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
373 "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
374 kubeconfig
, self
._helm
_command
, get_command
, kdu_instance
, namespace
379 async def _status_kdu(
383 namespace
: str = None,
384 yaml_format
: bool = False,
385 show_error_log
: bool = False,
386 ) -> Union
[str, dict]:
388 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
392 namespace
= "kube-system"
395 paths
, env
= self
._init
_paths
_env
(
396 cluster_name
=cluster_id
, create_if_not_exist
=True
398 command
= "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
399 paths
["kube_config"], self
._helm
_command
, kdu_instance
, namespace
402 output
, rc
= await self
._local
_async
_exec
(
404 raise_exception_on_error
=True,
405 show_error_log
=show_error_log
,
415 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
417 # remove field 'notes' and manifest
419 del data
.get("info")["notes"]
423 # parse the manifest to a list of dictionaries
424 if "manifest" in data
:
425 manifest_str
= data
.get("manifest")
426 manifest_docs
= yaml
.load_all(manifest_str
, Loader
=yaml
.SafeLoader
)
428 data
["manifest"] = []
429 for doc
in manifest_docs
:
430 data
["manifest"].append(doc
)
434 def _get_install_command(
447 timeout_str
= "--timeout {}s".format(timeout
)
452 atomic_str
= "--atomic"
456 namespace_str
= "--namespace {}".format(namespace
)
461 version_str
= "--version {}".format(version
)
464 "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml "
465 "{params} {timeout} {ns} {model} {ver}".format(
466 kubeconfig
=kubeconfig
,
467 helm
=self
._helm
_command
,
479 def _get_upgrade_scale_command(
492 """Generates the command to scale a Helm Chart release
495 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
496 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
497 namespace (str): Namespace where this KDU instance is deployed
498 scale (int): Scale count
499 version (str): Constraint with specific version of the Chart to use
500 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
501 The --wait flag will be set automatically if --atomic is used
502 replica_str (str): The key under resource_name key where the scale count is stored
503 timeout (float): The time, in seconds, to wait
504 resource_name (str): The KDU's resource to scale
505 kubeconfig (str): Kubeconfig file path
508 str: command to scale a Helm Chart release
513 scale_dict
= {"{}.{}".format(resource_name
, replica_str
): scale
}
515 scale_dict
= {replica_str
: scale
}
517 scale_str
= self
._params
_to
_set
_option
(scale_dict
)
519 return self
._get
_upgrade
_command
(
521 kdu_instance
=kdu_instance
,
523 params_str
=scale_str
,
527 kubeconfig
=kubeconfig
,
530 def _get_upgrade_command(
542 """Generates the command to upgrade a Helm Chart release
545 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
546 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
547 namespace (str): Namespace where this KDU instance is deployed
548 params_str (str): Params used to upgrade the Helm Chart release
549 version (str): Constraint with specific version of the Chart to use
550 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
551 The --wait flag will be set automatically if --atomic is used
552 timeout (float): The time, in seconds, to wait
553 kubeconfig (str): Kubeconfig file path
554 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
556 str: command to upgrade a Helm Chart release
561 timeout_str
= "--timeout {}s".format(timeout
)
566 atomic_str
= "--atomic"
571 force_str
= "--force "
576 version_str
= "--version {}".format(version
)
581 namespace_str
= "--namespace {}".format(namespace
)
584 "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} {force}"
585 "--output yaml {params} {timeout} --reuse-values {ver}"
587 kubeconfig
=kubeconfig
,
588 helm
=self
._helm
_command
,
590 namespace
=namespace_str
,
600 def _get_rollback_command(
601 self
, kdu_instance
: str, namespace
: str, revision
: float, kubeconfig
: str
603 return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
604 kubeconfig
, self
._helm
_command
, kdu_instance
, revision
, namespace
607 def _get_uninstall_command(
608 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
610 return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
611 kubeconfig
, self
._helm
_command
, kdu_instance
, namespace
614 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
616 cluster_filter
= {"_admin.helm-chart-v3.id": cluster_uuid
}
617 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
619 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
623 "k8cluster with helm-id : {} not found".format(cluster_uuid
)