2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
25 from n2vc
.k8s_helm_base_conn
import K8sHelmBaseConnector
26 from n2vc
.exceptions
import K8sException
29 class K8sHelm3Connector(K8sHelmBaseConnector
):
32 ####################################################################################
33 ################################### P U B L I C ####################################
34 ####################################################################################
41 kubectl_command
: str = "/usr/bin/kubectl",
42 helm_command
: str = "/usr/bin/helm3",
47 Initializes helm connector for helm v3
49 :param fs: file system for kubernetes and helm configuration
50 :param db: database object to write current operation status
51 :param kubectl_command: path to kubectl executable
52 :param helm_command: path to helm executable
54 :param on_update_db: callback called when k8s connector updates database
58 K8sHelmBaseConnector
.__init
__(
63 kubectl_command
=kubectl_command
,
64 helm_command
=helm_command
,
65 on_update_db
=on_update_db
,
68 self
.log
.info("K8S Helm3 connector initialized")
80 namespace
: str = None,
83 """Install a helm chart
85 :param cluster_uuid str: The UUID of the cluster to install to
86 :param kdu_model str: The name or path of a bundle to install
87 :param kdu_instance: Kdu instance name
88 :param atomic bool: If set, waits until the model is active and resets
89 the cluster on failure.
90 :param timeout int: The time, in seconds, to wait for the install
92 :param params dict: Key-value pairs of instantiation parameters
93 :param kdu_name: Name of the KDU instance to be installed
94 :param namespace: K8s namespace to use for the KDU instance
96 :param kwargs: Additional parameters (None yet)
98 :return: True if successful
100 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
101 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_id
))
104 self
.fs
.sync(from_path
=cluster_id
)
107 paths
, env
= self
._init
_paths
_env
(
108 cluster_name
=cluster_id
, create_if_not_exist
=True
111 # for helm3 if namespace does not exist must create it
112 if namespace
and namespace
!= "kube-system":
113 if not await self
._namespace
_exists
(cluster_id
, namespace
):
115 await self
._create
_namespace
(cluster_id
, namespace
)
116 except Exception as e
:
117 if not await self
._namespace
_exists
(cluster_id
, namespace
):
119 "namespace {} does not exist in cluster_id {} "
120 "error message: ".format(
124 self
.log
.error(err_msg
)
125 raise K8sException(err_msg
)
127 await self
._install
_impl
(
142 self
.fs
.reverse_sync(from_path
=cluster_id
)
144 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
147 async def inspect_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
150 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model
, repo_url
)
153 return await self
._exec
_inspect
_comand
(
154 inspect_command
="all", kdu_model
=kdu_model
, repo_url
=repo_url
158 ####################################################################################
159 ################################### P R I V A T E ##################################
160 ####################################################################################
163 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
165 Creates and returns base cluster and kube dirs and returns them.
166 Also created helm3 dirs according to new directory specification, paths are
167 returned and also environment variables that must be provided to execute commands
169 Helm 3 directory specification uses XDG categories for variable support:
170 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
171 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
172 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
174 The variables assigned for this paths are:
175 (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
176 $HELM_PATH_DATA but looking and helm env the variable names are different)
177 - Cache: $HELM_CACHE_HOME
178 - Config: $HELM_CONFIG_HOME
179 - Data: $HELM_DATA_HOME
180 - helm kubeconfig: $KUBECONFIG
182 :param cluster_name: cluster_name
183 :return: Dictionary with config_paths and dictionary with helm environment variables
187 if base
.endswith("/") or base
.endswith("\\"):
190 # base dir for cluster
191 cluster_dir
= base
+ "/" + cluster_name
194 kube_dir
= cluster_dir
+ "/" + ".kube"
195 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
196 self
.log
.debug("Creating dir {}".format(kube_dir
))
197 os
.makedirs(kube_dir
)
199 helm_path_cache
= cluster_dir
+ "/.cache/helm"
200 if create_if_not_exist
and not os
.path
.exists(helm_path_cache
):
201 self
.log
.debug("Creating dir {}".format(helm_path_cache
))
202 os
.makedirs(helm_path_cache
)
204 helm_path_config
= cluster_dir
+ "/.config/helm"
205 if create_if_not_exist
and not os
.path
.exists(helm_path_config
):
206 self
.log
.debug("Creating dir {}".format(helm_path_config
))
207 os
.makedirs(helm_path_config
)
209 helm_path_data
= cluster_dir
+ "/.local/share/helm"
210 if create_if_not_exist
and not os
.path
.exists(helm_path_data
):
211 self
.log
.debug("Creating dir {}".format(helm_path_data
))
212 os
.makedirs(helm_path_data
)
214 config_filename
= kube_dir
+ "/config"
216 # 2 - Prepare dictionary with paths
218 "kube_dir": kube_dir
,
219 "kube_config": config_filename
,
220 "cluster_dir": cluster_dir
,
223 # 3 - Prepare environment variables
225 "HELM_CACHE_HOME": helm_path_cache
,
226 "HELM_CONFIG_HOME": helm_path_config
,
227 "HELM_DATA_HOME": helm_path_data
,
228 "KUBECONFIG": config_filename
,
231 for file_name
, file in paths
.items():
232 if "dir" in file_name
and not os
.path
.exists(file):
233 err_msg
= "{} dir does not exist".format(file)
234 self
.log
.error(err_msg
)
235 raise K8sException(err_msg
)
239 async def _namespace_exists(self
, cluster_id
, namespace
) -> bool:
241 "checking if namespace {} exists cluster_id {}".format(
242 namespace
, cluster_id
245 namespaces
= await self
._get
_namespaces
(cluster_id
)
246 return namespace
in namespaces
if namespaces
else False
async def _get_namespaces(self, cluster_id: str):
    """
    List the names of the namespaces reported by kubectl for a cluster.

    Runs "kubectl get namespaces -o=yaml" against the kubeconfig resolved for
    the cluster and collects metadata.name from every entry under "items".

    :param cluster_id: id of the cluster, used to resolve its paths and env
    :return: list with the namespace names
    """
    self.log.debug("get namespaces cluster_id {}".format(cluster_id))

    # resolve the kubeconfig file and the environment variables of the cluster
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    kubectl_template = "{} --kubeconfig={} get namespaces -o=yaml"
    command = kubectl_template.format(self.kubectl_command, paths["kube_config"])

    # raise_exception_on_error=True: the executor is asked to raise on failure
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # safe_load is the shorthand for load(..., Loader=SafeLoader)
    data = yaml.safe_load(output)
    namespaces = []
    for item in data["items"]:
        namespaces.append(item["metadata"]["name"])
    self.log.debug(f"namespaces {namespaces}")

    return namespaces
async def _create_namespace(self, cluster_id: str, namespace: str):
    """
    Create a namespace in a cluster by running "kubectl create namespace".

    :param cluster_id: id of the cluster, used to resolve its kubeconfig and env
    :param namespace: name of the namespace to create
    :return: second element of the tuple returned by _local_async_exec
        (presumably the return code of the command - TODO confirm)
    """
    # each placeholder logs the value its label announces
    self.log.debug(f"create namespace: {namespace} for cluster_id: {cluster_id}")

    # resolve the kubeconfig file and the environment variables of the cluster
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} create namespace {}".format(
        self.kubectl_command, paths["kube_config"], namespace
    )
    # raise_exception_on_error=True: the executor is asked to raise on failure
    _, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )
    self.log.debug(f"namespace {namespace} created")

    return _rc
async def _get_services(self, cluster_id: str, kdu_instance: str, namespace: str):
    """
    Obtain the services of a kdu instance from its helm manifest.

    Builds a "helm get manifest" command and a "kubectl get -f -" command,
    hands both to _local_async_exec_pipe (presumably piping the first into the
    second) and parses the combined output with _parse_services.

    :param cluster_id: id of the cluster, used to resolve its paths and env
    :param kdu_instance: name of the helm release to query
    :param namespace: namespace passed to both helm and kubectl
    :return: whatever _parse_services produces for the command output
    """
    # resolve the environment variables of the cluster
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    manifest_template = "{} get manifest {} --namespace={}"
    command1 = manifest_template.format(self._helm_command, kdu_instance, namespace)

    kubectl_template = "{} get --namespace={} -f -"
    command2 = kubectl_template.format(self.kubectl_command, namespace)

    output, _rc = await self._local_async_exec_pipe(
        command1, command2, env=env, raise_exception_on_error=True
    )

    return self._parse_services(output)
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Helm3 flavour of the version dependent cluster initialization.

    Creates the namespace when it is not listed in the cluster ("kube-system"
    is never created here) and registers the "stable" repo when it is missing
    and a stable repo url is configured.

    :param cluster_id: id of the cluster
    :param namespace: namespace to initialize
    :param paths: not used by this implementation
    :param env: not used by this implementation
    :return: False, as no software needs to be uninstalled afterwards
    """
    if namespace != "kube-system" and namespace not in await self._get_namespaces(
        cluster_id
    ):
        await self._create_namespace(cluster_id, namespace)

    # If default repo is not included add
    cluster_uuid = "{}:{}".format(namespace, cluster_id)
    repo_list = await self.repo_list(cluster_uuid)
    # count without short-circuiting so every repo entry is still inspected
    stable_count = 0
    for repo in repo_list:
        if repo["name"] == "stable":
            stable_count += 1
    if stable_count == 0 and self._stable_repo_url:
        await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)

    # Returns False as no software needs to be uninstalled
    return False
327 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
328 # nothing to do to uninstall sw
331 async def _instances_list(self
, cluster_id
: str):
334 paths
, env
= self
._init
_paths
_env
(
335 cluster_name
=cluster_id
, create_if_not_exist
=True
338 command
= "{} list --all-namespaces --output yaml".format(self
._helm
_command
)
339 output
, _rc
= await self
._local
_async
_exec
(
340 command
=command
, raise_exception_on_error
=True, env
=env
343 if output
and len(output
) > 0:
344 self
.log
.debug("instances list output: {}".format(output
))
345 return yaml
.load(output
, Loader
=yaml
.SafeLoader
)
def _get_inspect_command(
    self, inspect_command: str, kdu_model: str, repo_str: str, version: str
) -> str:
    """
    Build the "helm show" command line used to inspect a chart.

    :param inspect_command: subcommand placed after "show" (for example "all")
    :param kdu_model: chart reference placed after the subcommand
    :param repo_str: text appended directly after kdu_model with no separating
        space, so it has to bring its own leading space when not empty
    :param version: text appended at the end, after a single space
    :return: the command line as a string
    """
    pieces = (self._helm_command, inspect_command, kdu_model, repo_str, version)
    return "{} show {} {}{} {}".format(*pieces)
357 async def _status_kdu(
361 namespace
: str = None,
362 show_error_log
: bool = False,
363 return_text
: bool = False,
367 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
371 namespace
= "kube-system"
374 paths
, env
= self
._init
_paths
_env
(
375 cluster_name
=cluster_id
, create_if_not_exist
=True
377 command
= "{} status {} --namespace={} --output yaml".format(
378 self
._helm
_command
, kdu_instance
, namespace
381 output
, rc
= await self
._local
_async
_exec
(
383 raise_exception_on_error
=True,
384 show_error_log
=show_error_log
,
394 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
396 # remove field 'notes' and manifest
398 del data
.get("info")["notes"]
403 # unable to parse 'resources' as currently it is not included in helm3
406 def _get_install_command(
419 timeout_str
= "--timeout {}s".format(timeout
)
424 atomic_str
= "--atomic"
428 namespace_str
= "--namespace {}".format(namespace
)
433 version_str
= "--version {}".format(version
)
436 "{helm} install {name} {atomic} --output yaml "
437 "{params} {timeout} {ns} {model} {ver}".format(
438 helm
=self
._helm
_command
,
450 def _get_upgrade_command(
463 timeout_str
= "--timeout {}s".format(timeout
)
468 atomic_str
= "--atomic"
473 version_str
= "--version {}".format(version
)
478 namespace_str
= "--namespace {}".format(namespace
)
481 "{helm} upgrade {name} {model} {namespace} {atomic} --output yaml {params} "
482 "{timeout} {ver}".format(
483 helm
=self
._helm
_command
,
485 namespace
=namespace_str
,
def _get_rollback_command(
    self, kdu_instance: str, namespace: str, revision: float
) -> str:
    """
    Build the "helm rollback" command line for a release.

    :param kdu_instance: name of the release to roll back
    :param namespace: namespace given through --namespace
    :param revision: revision placed right after the release name
    :return: the command line as a string; it always ends with --wait
    """
    template = "{} rollback {} {} --namespace={} --wait"
    return template.format(self._helm_command, kdu_instance, revision, namespace)
def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
    """
    Build the "helm uninstall" command line for a release.

    :param kdu_instance: name of the release to uninstall
    :param namespace: namespace given through --namespace
    :return: the command line as a string
    """
    template = "{} uninstall {} --namespace={}"
    return template.format(self._helm_command, kdu_instance, namespace)
508 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
510 cluster_filter
= {"_admin.helm-chart-v3.id": cluster_uuid
}
511 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
513 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
517 "k8cluster with helm-id : {} not found".format(cluster_uuid
)