2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
25 from n2vc
.k8s_helm_base_conn
import K8sHelmBaseConnector
26 from n2vc
.exceptions
import K8sException
29 class K8sHelm3Connector(K8sHelmBaseConnector
):
32 ####################################################################################
33 ################################### P U B L I C ####################################
34 ####################################################################################
41 kubectl_command
: str = "/usr/bin/kubectl",
42 helm_command
: str = "/usr/bin/helm3",
47 Initializes helm connector for helm v3
49 :param fs: file system for kubernetes and helm configuration
50 :param db: database object to write current operation status
51 :param kubectl_command: path to kubectl executable
52 :param helm_command: path to helm executable
54 :param on_update_db: callback called when k8s connector updates database
58 K8sHelmBaseConnector
.__init
__(self
,
62 kubectl_command
=kubectl_command
,
63 helm_command
=helm_command
,
64 on_update_db
=on_update_db
)
66 self
.log
.info("K8S Helm3 connector initialized")
77 namespace
: str = None,
79 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
80 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_id
))
83 self
.fs
.sync(from_path
=cluster_id
)
86 paths
, env
= self
._init
_paths
_env
(
87 cluster_name
=cluster_id
, create_if_not_exist
=True
90 # for helm3 if namespace does not exist must create it
91 if namespace
and namespace
!= "kube-system":
92 namespaces
= await self
._get
_namespaces
(cluster_id
)
93 if namespace
not in namespaces
:
94 await self
._create
_namespace
(cluster_id
, namespace
)
96 kdu_instance
= await self
._install
_impl
(cluster_id
,
108 self
.fs
.reverse_sync(from_path
=cluster_id
)
110 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
113 async def inspect_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
116 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model
, repo_url
)
119 return await self
._exec
_inspect
_comand
(
120 inspect_command
="all", kdu_model
=kdu_model
, repo_url
=repo_url
124 ####################################################################################
125 ################################### P R I V A T E ##################################
126 ####################################################################################
129 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
131 Creates the base cluster and kube dirs and returns them.
132 Also creates helm3 dirs according to new directory specification, paths are
133 returned and also environment variables that must be provided to execute commands
135 Helm 3 directory specification uses XDG categories for variable support:
136 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
137 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
138 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
140 The variables assigned for this paths are:
141 (In the documentation the variable names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
142 $HELM_PATH_DATA but looking at helm env the variable names are different)
143 - Cache: $HELM_CACHE_HOME
144 - Config: $HELM_CONFIG_HOME
145 - Data: $HELM_DATA_HOME
146 - helm kubeconfig: $KUBECONFIG
148 :param cluster_name: cluster_name
149 :return: Dictionary with config_paths and dictionary with helm environment variables
153 if base
.endswith("/") or base
.endswith("\\"):
156 # base dir for cluster
157 cluster_dir
= base
+ "/" + cluster_name
160 kube_dir
= cluster_dir
+ "/" + ".kube"
161 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
162 self
.log
.debug("Creating dir {}".format(kube_dir
))
163 os
.makedirs(kube_dir
)
165 helm_path_cache
= cluster_dir
+ "/.cache/helm"
166 if create_if_not_exist
and not os
.path
.exists(helm_path_cache
):
167 self
.log
.debug("Creating dir {}".format(helm_path_cache
))
168 os
.makedirs(helm_path_cache
)
170 helm_path_config
= cluster_dir
+ "/.config/helm"
171 if create_if_not_exist
and not os
.path
.exists(helm_path_config
):
172 self
.log
.debug("Creating dir {}".format(helm_path_config
))
173 os
.makedirs(helm_path_config
)
175 helm_path_data
= cluster_dir
+ "/.local/share/helm"
176 if create_if_not_exist
and not os
.path
.exists(helm_path_data
):
177 self
.log
.debug("Creating dir {}".format(helm_path_data
))
178 os
.makedirs(helm_path_data
)
180 config_filename
= kube_dir
+ "/config"
182 # 2 - Prepare dictionary with paths
184 "kube_dir": kube_dir
,
185 "kube_config": config_filename
,
186 "cluster_dir": cluster_dir
189 # 3 - Prepare environment variables
191 "HELM_CACHE_HOME": helm_path_cache
,
192 "HELM_CONFIG_HOME": helm_path_config
,
193 "HELM_DATA_HOME": helm_path_data
,
194 "KUBECONFIG": config_filename
197 for file_name
, file in paths
.items():
198 if "dir" in file_name
and not os
.path
.exists(file):
199 err_msg
= "{} dir does not exist".format(file)
200 self
.log
.error(err_msg
)
201 raise K8sException(err_msg
)
async def _get_namespaces(self, cluster_id: str):
    """
    Obtains the names of the namespaces reported by kubectl for a cluster

    :param cluster_id: cluster id, passed to _init_paths_env to obtain the
        kubeconfig path and the environment used to run kubectl
    :return: list with the "metadata.name" of every item in the kubectl output
    """
    self.log.debug("get namespaces cluster_id {}".format(cluster_id))

    # init config, env
    cluster_paths, cluster_env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    kubectl_cmd = (
        f"{self.kubectl_command} --kubeconfig={cluster_paths['kube_config']}"
        " get namespaces -o=yaml"
    )
    stdout, _ = await self._local_async_exec(
        command=kubectl_cmd, raise_exception_on_error=True, env=cluster_env
    )

    # the yaml output is a list object: keep only the name of each entry
    parsed = yaml.load(stdout, Loader=yaml.SafeLoader)
    namespaces = []
    for entry in parsed["items"]:
        namespaces.append(entry["metadata"]["name"])
    self.log.debug(f"namespaces {namespaces}")

    return namespaces
228 async def _create_namespace(self
,
232 self
.log
.debug(f
"create namespace: {cluster_id} for cluster_id: {namespace}")
235 paths
, env
= self
._init
_paths
_env
(
236 cluster_name
=cluster_id
, create_if_not_exist
=True
239 command
= "{} --kubeconfig={} create namespace {}".format(
240 self
.kubectl_command
, paths
["kube_config"], namespace
242 _
, _rc
= await self
._local
_async
_exec
(
243 command
=command
, raise_exception_on_error
=True, env
=env
245 self
.log
.debug(f
"namespace {namespace} created")
249 async def _get_services(self
, cluster_id
: str, kdu_instance
: str, namespace
: str):
252 paths
, env
= self
._init
_paths
_env
(
253 cluster_name
=cluster_id
, create_if_not_exist
=True
256 command1
= "{} get manifest {} --namespace={}".format(
257 self
._helm
_command
, kdu_instance
, namespace
259 command2
= "{} get --namespace={} -f -".format(
260 self
.kubectl_command
, namespace
262 output
, _rc
= await self
._local
_async
_exec
_pipe
(
263 command1
, command2
, env
=env
, raise_exception_on_error
=True
265 services
= self
._parse
_services
(output
)
269 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
271 Implements the helm version dependent cluster initialization:
272 For helm3 it creates the namespace if it is not created
274 if namespace
!= "kube-system":
275 namespaces
= await self
._get
_namespaces
(cluster_id
)
276 if namespace
not in namespaces
:
277 await self
._create
_namespace
(cluster_id
, namespace
)
279 # If default repo is not included add
280 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
281 repo_list
= await self
.repo_list(cluster_uuid
)
282 for repo
in repo_list
:
283 self
.log
.debug("repo")
284 if repo
["name"] == "stable":
285 self
.log
.debug("Default repo already present")
288 await self
.repo_add(cluster_uuid
,
290 "https://kubernetes-charts.storage.googleapis.com/")
292 # Returns False as no software needs to be uninstalled
295 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
296 # nothing to do to uninstall sw
299 async def _instances_list(self
, cluster_id
: str):
302 paths
, env
= self
._init
_paths
_env
(
303 cluster_name
=cluster_id
, create_if_not_exist
=True
306 command
= "{} list --all-namespaces --output yaml".format(
309 output
, _rc
= await self
._local
_async
_exec
(
310 command
=command
, raise_exception_on_error
=True, env
=env
313 if output
and len(output
) > 0:
314 self
.log
.debug("instances list output: {}".format(output
))
315 return yaml
.load(output
, Loader
=yaml
.SafeLoader
)
319 def _get_inspect_command(self
, inspect_command
: str, kdu_model
: str, repo_str
: str,
321 inspect_command
= "{} show {} {}{} {}".format(
322 self
._helm
_command
, inspect_command
, kdu_model
, repo_str
, version
324 return inspect_command
326 async def _status_kdu(
330 namespace
: str = None,
331 show_error_log
: bool = False,
332 return_text
: bool = False,
335 self
.log
.debug("status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
))
338 namespace
= "kube-system"
341 paths
, env
= self
._init
_paths
_env
(
342 cluster_name
=cluster_id
, create_if_not_exist
=True
344 command
= "{} status {} --namespace={} --output yaml".format(
345 self
._helm
_command
, kdu_instance
, namespace
348 output
, rc
= await self
._local
_async
_exec
(
350 raise_exception_on_error
=True,
351 show_error_log
=show_error_log
,
361 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
363 # remove field 'notes' and manifest
365 del data
.get("info")["notes"]
370 # unable to parse 'resources' as currently it is not included in helm3
373 def _get_install_command(self
, kdu_model
: str, kdu_instance
: str, namespace
: str,
374 params_str
: str, version
: str, atomic
: bool, timeout
: float) -> str:
378 timeout_str
= "--timeout {}s".format(timeout
)
383 atomic_str
= "--atomic"
387 namespace_str
= "--namespace {}".format(namespace
)
392 version_str
= "--version {}".format(version
)
395 "{helm} install {name} {atomic} --output yaml "
396 "{params} {timeout} {ns} {model} {ver}".format(
397 helm
=self
._helm
_command
,
409 def _get_upgrade_command(self
, kdu_model
: str, kdu_instance
: str, namespace
: str,
410 params_str
: str, version
: str, atomic
: bool, timeout
: float) -> str:
414 timeout_str
= "--timeout {}s".format(timeout
)
419 atomic_str
= "--atomic"
424 version_str
= "--version {}".format(version
)
429 namespace_str
= "--namespace {}".format(namespace
)
432 "{helm} upgrade {name} {model} {namespace} {atomic} --output yaml {params} "
433 "{timeout} {ver}".format(
434 helm
=self
._helm
_command
,
436 namespace
=namespace_str
,
446 def _get_rollback_command(self
, kdu_instance
: str, namespace
: str, revision
: float) -> str:
447 return "{} rollback {} {} --namespace={} --wait".format(
448 self
._helm
_command
, kdu_instance
, revision
, namespace
451 def _get_uninstall_command(self
, kdu_instance
: str, namespace
: str) -> str:
453 return "{} uninstall {} --namespace={}".format(
454 self
._helm
_command
, kdu_instance
, namespace
)
456 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
458 cluster_filter
= {"_admin.helm-chart-v3.id": cluster_uuid
}
459 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
461 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
465 "k8cluster with helm-id : {} not found".format(cluster_uuid
)