# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
25 from n2vc
.k8s_helm_base_conn
import K8sHelmBaseConnector
26 from n2vc
.exceptions
import K8sException
29 class K8sHelm3Connector(K8sHelmBaseConnector
):
32 ####################################################################################
33 ################################### P U B L I C ####################################
34 ####################################################################################
41 kubectl_command
: str = "/usr/bin/kubectl",
42 helm_command
: str = "/usr/bin/helm3",
45 vca_config
: dict = None,
48 Initializes helm connector for helm v3
50 :param fs: file system for kubernetes and helm configuration
51 :param db: database object to write current operation status
52 :param kubectl_command: path to kubectl executable
53 :param helm_command: path to helm executable
55 :param on_update_db: callback called when k8s connector updates database
59 K8sHelmBaseConnector
.__init
__(self
,
63 kubectl_command
=kubectl_command
,
64 helm_command
=helm_command
,
65 on_update_db
=on_update_db
,
66 vca_config
=vca_config
)
68 self
.log
.info("K8S Helm3 connector initialized")
79 namespace
: str = None,
81 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
82 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_id
))
85 self
.fs
.sync(from_path
=cluster_id
)
88 paths
, env
= self
._init
_paths
_env
(
89 cluster_name
=cluster_id
, create_if_not_exist
=True
92 # for helm3 if namespace does not exist must create it
93 if namespace
and namespace
!= "kube-system":
94 namespaces
= await self
._get
_namespaces
(cluster_id
)
95 if namespace
not in namespaces
:
96 await self
._create
_namespace
(cluster_id
, namespace
)
98 kdu_instance
= await self
._install
_impl
(cluster_id
,
110 self
.fs
.reverse_sync(from_path
=cluster_id
)
112 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Run the "all" inspection for a KDU model.

    :param kdu_model: chart reference to inspect
    :param repo_url: optional repository URL, forwarded unchanged
    :return: the result of ``self._exec_inspect_comand`` for the ``"all"``
        inspect command
    """
    message = "inspect kdu_model {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    # NOTE(review): the logging call line is missing from this extract;
    # debug level is assumed to match the rest of the file - confirm.
    self.log.debug(message)

    inspection = await self._exec_inspect_comand(
        inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspection
126 ####################################################################################
127 ################################### P R I V A T E ##################################
128 ####################################################################################
131 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
        Creates the base cluster and kube dirs and returns them.
        Also creates the helm3 dirs according to the new directory specification; the paths
        are returned together with the environment variables that must be provided to
        execute commands
137 Helm 3 directory specification uses XDG categories for variable support:
138 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
139 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
140 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
        The variables assigned for these paths are:
        (In the documentation the variable names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
        $HELM_PATH_DATA, but looking at helm env the variable names are different)
145 - Cache: $HELM_CACHE_HOME
146 - Config: $HELM_CONFIG_HOME
147 - Data: $HELM_DATA_HOME
148 - helm kubeconfig: $KUBECONFIG
150 :param cluster_name: cluster_name
151 :return: Dictionary with config_paths and dictionary with helm environment variables
155 if base
.endswith("/") or base
.endswith("\\"):
158 # base dir for cluster
159 cluster_dir
= base
+ "/" + cluster_name
162 kube_dir
= cluster_dir
+ "/" + ".kube"
163 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
164 self
.log
.debug("Creating dir {}".format(kube_dir
))
165 os
.makedirs(kube_dir
)
167 helm_path_cache
= cluster_dir
+ "/.cache/helm"
168 if create_if_not_exist
and not os
.path
.exists(helm_path_cache
):
169 self
.log
.debug("Creating dir {}".format(helm_path_cache
))
170 os
.makedirs(helm_path_cache
)
172 helm_path_config
= cluster_dir
+ "/.config/helm"
173 if create_if_not_exist
and not os
.path
.exists(helm_path_config
):
174 self
.log
.debug("Creating dir {}".format(helm_path_config
))
175 os
.makedirs(helm_path_config
)
177 helm_path_data
= cluster_dir
+ "/.local/share/helm"
178 if create_if_not_exist
and not os
.path
.exists(helm_path_data
):
179 self
.log
.debug("Creating dir {}".format(helm_path_data
))
180 os
.makedirs(helm_path_data
)
182 config_filename
= kube_dir
+ "/config"
184 # 2 - Prepare dictionary with paths
186 "kube_dir": kube_dir
,
187 "kube_config": config_filename
,
188 "cluster_dir": cluster_dir
191 # 3 - Prepare environment variables
193 "HELM_CACHE_HOME": helm_path_cache
,
194 "HELM_CONFIG_HOME": helm_path_config
,
195 "HELM_DATA_HOME": helm_path_data
,
196 "KUBECONFIG": config_filename
199 for file_name
, file in paths
.items():
200 if "dir" in file_name
and not os
.path
.exists(file):
201 err_msg
= "{} dir does not exist".format(file)
202 self
.log
.error(err_msg
)
203 raise K8sException(err_msg
)
async def _get_namespaces(self, cluster_id: str):
    """
    List the namespace names reported by kubectl for a cluster.

    :param cluster_id: cluster identifier, used to locate the kubeconfig
    :return: list with the ``metadata.name`` of every item in the kubectl output
    """
    self.log.debug("get namespaces cluster_id {}".format(cluster_id))

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    kubeconfig = paths["kube_config"]
    command = "{} --kubeconfig={} get namespaces -o=yaml".format(
        self.kubectl_command, kubeconfig
    )
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)
    namespaces = []
    for item in data["items"]:
        namespaces.append(item["metadata"]["name"])
    self.log.debug(f"namespaces {namespaces}")

    # NOTE(review): the return line is missing from this extract; callers
    # test membership on the result, so the name list is returned.
    return namespaces
async def _create_namespace(self, cluster_id: str, namespace: str):
    """
    Create a namespace in the cluster through kubectl.

    :param cluster_id: cluster identifier, used to locate the kubeconfig
    :param namespace: name of the namespace to create
    :return: return code reported by ``self._local_async_exec``
    """
    # The original message had the two placeholders swapped, logging the
    # cluster id as the namespace and the namespace as the cluster id.
    self.log.debug(f"create namespace: {namespace} for cluster_id: {cluster_id}")

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} create namespace {}".format(
        self.kubectl_command, paths["kube_config"], namespace
    )
    # raise_exception_on_error=True: a failing kubectl call surfaces as an
    # exception instead of a silent non-zero return code.
    _, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )
    self.log.debug(f"namespace {namespace} created")

    # NOTE(review): the return line is missing from this extract; returning
    # the return code is assumed (visible callers ignore the result).
    return _rc
async def _get_services(self, cluster_id: str, kdu_instance: str, namespace: str):
    """
    Obtain the services of a KDU instance.

    The helm manifest of the release is piped into ``kubectl get`` and the
    combined output is handed to ``self._parse_services``.

    :param cluster_id: cluster identifier, used to build paths and environment
    :param kdu_instance: helm release name
    :param namespace: namespace used for both the helm and kubectl commands
    :return: result of ``self._parse_services`` on the piped command output
    """
    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    manifest_cmd = "{} get manifest {} --namespace={}".format(
        self._helm_command, kdu_instance, namespace
    )
    kubectl_cmd = "{} get --namespace={} -f -".format(
        self.kubectl_command, namespace
    )
    output, _rc = await self._local_async_exec_pipe(
        manifest_cmd, kubectl_cmd, env=env, raise_exception_on_error=True
    )

    # NOTE(review): the return line is missing from this extract; returning
    # the parsed services is assumed.
    return self._parse_services(output)
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Implements the helm version dependent cluster initialization:
    for helm3 it creates the namespace if it does not exist yet and makes
    sure a repo named "stable" is registered.

    :param cluster_id: cluster identifier
    :param namespace: namespace to initialize
    :param paths: not used by this implementation
    :param env: not used by this implementation
    :return: False, as no software needs to be uninstalled
    """
    if namespace != "kube-system":
        existing = await self._get_namespaces(cluster_id)
        if namespace not in existing:
            await self._create_namespace(cluster_id, namespace)

    # If default repo is not included add
    cluster_uuid = "{}:{}".format(namespace, cluster_id)
    repo_list = await self.repo_list(cluster_uuid)
    stable_present = False
    for repo in repo_list:
        self.log.debug("repo")
        if repo["name"] == "stable":
            self.log.debug("Default repo already present")
            stable_present = True
            break
    if not stable_present:
        # NOTE(review): the repo-name argument line is missing from this
        # extract; "stable" is inferred from the name check above - confirm.
        await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)

    # Returns False as no software needs to be uninstalled
    return False
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    No-op: there is nothing to do to uninstall software in this connector.

    :param cluster_id: unused
    :param namespace: unused
    :return: None
    """
    return None
301 async def _instances_list(self
, cluster_id
: str):
304 paths
, env
= self
._init
_paths
_env
(
305 cluster_name
=cluster_id
, create_if_not_exist
=True
308 command
= "{} list --all-namespaces --output yaml".format(
311 output
, _rc
= await self
._local
_async
_exec
(
312 command
=command
, raise_exception_on_error
=True, env
=env
315 if output
and len(output
) > 0:
316 self
.log
.debug("instances list output: {}".format(output
))
317 return yaml
.load(output
, Loader
=yaml
.SafeLoader
)
def _get_inspect_command(self, inspect_command: str, kdu_model: str, repo_str: str,
                         version: str):
    """
    Build the ``helm show`` command line for a chart.

    :param inspect_command: ``show`` sub-command (callers in this file pass "all")
    :param kdu_model: chart reference
    :param repo_str: pre-formatted repo option, appended right after the
        model with no separating space
    :param version: pre-formatted version option
    :return: the command line as a single string
    """
    helm = self._helm_command
    return f"{helm} show {inspect_command} {kdu_model}{repo_str} {version}"
328 async def _status_kdu(
332 namespace
: str = None,
333 show_error_log
: bool = False,
334 return_text
: bool = False,
337 self
.log
.debug("status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
))
340 namespace
= "kube-system"
343 paths
, env
= self
._init
_paths
_env
(
344 cluster_name
=cluster_id
, create_if_not_exist
=True
346 command
= "{} status {} --namespace={} --output yaml".format(
347 self
._helm
_command
, kdu_instance
, namespace
350 output
, rc
= await self
._local
_async
_exec
(
352 raise_exception_on_error
=True,
353 show_error_log
=show_error_log
,
363 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
365 # remove field 'notes' and manifest
367 del data
.get("info")["notes"]
372 # unable to parse 'resources' as currently it is not included in helm3
375 def _get_install_command(self
, kdu_model
: str, kdu_instance
: str, namespace
: str,
376 params_str
: str, version
: str, atomic
: bool, timeout
: float) -> str:
380 timeout_str
= "--timeout {}s".format(timeout
)
385 atomic_str
= "--atomic"
389 namespace_str
= "--namespace {}".format(namespace
)
394 version_str
= "--version {}".format(version
)
397 "{helm} install {name} {atomic} --output yaml "
398 "{params} {timeout} {ns} {model} {ver}".format(
399 helm
=self
._helm
_command
,
411 def _get_upgrade_command(self
, kdu_model
: str, kdu_instance
: str, namespace
: str,
412 params_str
: str, version
: str, atomic
: bool, timeout
: float) -> str:
416 timeout_str
= "--timeout {}s".format(timeout
)
421 atomic_str
= "--atomic"
426 version_str
= "--version {}".format(version
)
431 namespace_str
= "--namespace {}".format(namespace
)
434 "{helm} upgrade {name} {model} {namespace} {atomic} --output yaml {params} "
435 "{timeout} {ver}".format(
436 helm
=self
._helm
_command
,
438 namespace
=namespace_str
,
def _get_rollback_command(self, kdu_instance: str, namespace: str, revision: float) -> str:
    """
    Build the ``helm rollback`` command line for a release.

    :param kdu_instance: helm release name
    :param namespace: namespace of the release
    :param revision: revision to roll back to
    :return: the command line; it always ends with ``--wait``
    """
    helm = self._helm_command
    return f"{helm} rollback {kdu_instance} {revision} --namespace={namespace} --wait"
def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
    """
    Build the ``helm uninstall`` command line for a release.

    :param kdu_instance: helm release name
    :param namespace: namespace of the release
    :return: the command line as a single string
    """
    helm = self._helm_command
    return f"{helm} uninstall {kdu_instance} --namespace={namespace}"
458 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
460 cluster_filter
= {"_admin.helm-chart-v3.id": cluster_uuid
}
461 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
463 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
467 "k8cluster with helm-id : {} not found".format(cluster_uuid
)