8bfd1738cc3f06b3f3e85d500f0fe630e1d4d98c
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
25 from n2vc
.k8s_helm_base_conn
import K8sHelmBaseConnector
26 from n2vc
.exceptions
import K8sException
29 class K8sHelm3Connector(K8sHelmBaseConnector
):
32 ####################################################################################
33 ################################### P U B L I C ####################################
34 ####################################################################################
41 kubectl_command
: str = "/usr/bin/kubectl",
42 helm_command
: str = "/usr/bin/helm3",
47 Initializes helm connector for helm v3
49 :param fs: file system for kubernetes and helm configuration
50 :param db: database object to write current operation status
51 :param kubectl_command: path to kubectl executable
52 :param helm_command: path to helm executable
54 :param on_update_db: callback called when k8s connector updates database
58 K8sHelmBaseConnector
.__init
__(
63 kubectl_command
=kubectl_command
,
64 helm_command
=helm_command
,
65 on_update_db
=on_update_db
,
68 self
.log
.info("K8S Helm3 connector initialized")
80 namespace
: str = None,
83 """Install a helm chart
85 :param cluster_uuid str: The UUID of the cluster to install to
86 :param kdu_model str: The name or path of a bundle to install
87 :param kdu_instance: Kdu instance name
88 :param atomic bool: If set, waits until the model is active and resets
89 the cluster on failure.
90 :param timeout int: The time, in seconds, to wait for the install
92 :param params dict: Key-value pairs of instantiation parameters
93 :param kdu_name: Name of the KDU instance to be installed
94 :param namespace: K8s namespace to use for the KDU instance
96 :param kwargs: Additional parameters (None yet)
98 :return: True if successful
101 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_uuid
))
104 self
.fs
.sync(from_path
=cluster_uuid
)
107 paths
, env
= self
._init
_paths
_env
(
108 cluster_name
=cluster_uuid
, create_if_not_exist
=True
111 # for helm3 if namespace does not exist must create it
112 if namespace
and namespace
!= "kube-system":
113 if not await self
._namespace
_exists
(cluster_uuid
, namespace
):
115 await self
._create
_namespace
(cluster_uuid
, namespace
)
116 except Exception as e
:
117 if not await self
._namespace
_exists
(cluster_uuid
, namespace
):
119 "namespace {} does not exist in cluster_id {} "
120 "error message: ".format(namespace
, e
)
122 self
.log
.error(err_msg
)
123 raise K8sException(err_msg
)
125 await self
._install
_impl
(
140 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
142 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
145 async def inspect_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
148 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model
, repo_url
)
151 return await self
._exec
_inspect
_command
(
152 inspect_command
="all", kdu_model
=kdu_model
, repo_url
=repo_url
156 ####################################################################################
157 ################################### P R I V A T E ##################################
158 ####################################################################################
161 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
163 Creates and returns base cluster and kube dirs and returns them.
164 Also created helm3 dirs according to new directory specification, paths are
165 returned and also environment variables that must be provided to execute commands
167 Helm 3 directory specification uses XDG categories for variable support:
168 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
169 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
170 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
172 The variables assigned for this paths are:
173 (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
174 $HELM_PATH_DATA but looking and helm env the variable names are different)
175 - Cache: $HELM_CACHE_HOME
176 - Config: $HELM_CONFIG_HOME
177 - Data: $HELM_DATA_HOME
178 - helm kubeconfig: $KUBECONFIG
180 :param cluster_name: cluster_name
181 :return: Dictionary with config_paths and dictionary with helm environment variables
185 if base
.endswith("/") or base
.endswith("\\"):
188 # base dir for cluster
189 cluster_dir
= base
+ "/" + cluster_name
192 kube_dir
= cluster_dir
+ "/" + ".kube"
193 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
194 self
.log
.debug("Creating dir {}".format(kube_dir
))
195 os
.makedirs(kube_dir
)
197 helm_path_cache
= cluster_dir
+ "/.cache/helm"
198 if create_if_not_exist
and not os
.path
.exists(helm_path_cache
):
199 self
.log
.debug("Creating dir {}".format(helm_path_cache
))
200 os
.makedirs(helm_path_cache
)
202 helm_path_config
= cluster_dir
+ "/.config/helm"
203 if create_if_not_exist
and not os
.path
.exists(helm_path_config
):
204 self
.log
.debug("Creating dir {}".format(helm_path_config
))
205 os
.makedirs(helm_path_config
)
207 helm_path_data
= cluster_dir
+ "/.local/share/helm"
208 if create_if_not_exist
and not os
.path
.exists(helm_path_data
):
209 self
.log
.debug("Creating dir {}".format(helm_path_data
))
210 os
.makedirs(helm_path_data
)
212 config_filename
= kube_dir
+ "/config"
214 # 2 - Prepare dictionary with paths
216 "kube_dir": kube_dir
,
217 "kube_config": config_filename
,
218 "cluster_dir": cluster_dir
,
221 # 3 - Prepare environment variables
223 "HELM_CACHE_HOME": helm_path_cache
,
224 "HELM_CONFIG_HOME": helm_path_config
,
225 "HELM_DATA_HOME": helm_path_data
,
226 "KUBECONFIG": config_filename
,
229 for file_name
, file in paths
.items():
230 if "dir" in file_name
and not os
.path
.exists(file):
231 err_msg
= "{} dir does not exist".format(file)
232 self
.log
.error(err_msg
)
233 raise K8sException(err_msg
)
237 async def _namespace_exists(self
, cluster_id
, namespace
) -> bool:
239 "checking if namespace {} exists cluster_id {}".format(
240 namespace
, cluster_id
243 namespaces
= await self
._get
_namespaces
(cluster_id
)
244 return namespace
in namespaces
if namespaces
else False
246 async def _get_namespaces(self
, cluster_id
: str):
248 self
.log
.debug("get namespaces cluster_id {}".format(cluster_id
))
251 paths
, env
= self
._init
_paths
_env
(
252 cluster_name
=cluster_id
, create_if_not_exist
=True
255 command
= "{} --kubeconfig={} get namespaces -o=yaml".format(
256 self
.kubectl_command
, paths
["kube_config"]
258 output
, _rc
= await self
._local
_async
_exec
(
259 command
=command
, raise_exception_on_error
=True, env
=env
262 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
263 namespaces
= [item
["metadata"]["name"] for item
in data
["items"]]
264 self
.log
.debug(f
"namespaces {namespaces}")
268 async def _create_namespace(self
, cluster_id
: str, namespace
: str):
270 self
.log
.debug(f
"create namespace: {cluster_id} for cluster_id: {namespace}")
273 paths
, env
= self
._init
_paths
_env
(
274 cluster_name
=cluster_id
, create_if_not_exist
=True
277 command
= "{} --kubeconfig={} create namespace {}".format(
278 self
.kubectl_command
, paths
["kube_config"], namespace
280 _
, _rc
= await self
._local
_async
_exec
(
281 command
=command
, raise_exception_on_error
=True, env
=env
283 self
.log
.debug(f
"namespace {namespace} created")
287 async def _get_services(
288 self
, cluster_id
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
292 paths
, env
= self
._init
_paths
_env
(
293 cluster_name
=cluster_id
, create_if_not_exist
=True
296 command1
= "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
297 kubeconfig
, self
._helm
_command
, kdu_instance
, namespace
299 command2
= "{} get --namespace={} -f -".format(self
.kubectl_command
, namespace
)
300 output
, _rc
= await self
._local
_async
_exec
_pipe
(
301 command1
, command2
, env
=env
, raise_exception_on_error
=True
303 services
= self
._parse
_services
(output
)
307 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
309 Implements the helm version dependent cluster initialization:
310 For helm3 it creates the namespace if it is not created
312 if namespace
!= "kube-system":
313 namespaces
= await self
._get
_namespaces
(cluster_id
)
314 if namespace
not in namespaces
:
315 await self
._create
_namespace
(cluster_id
, namespace
)
317 repo_list
= await self
.repo_list(cluster_id
)
318 stable_repo
= [repo
for repo
in repo_list
if repo
["name"] == "stable"]
319 if not stable_repo
and self
._stable
_repo
_url
:
320 await self
.repo_add(cluster_id
, "stable", self
._stable
_repo
_url
)
322 # Returns False as no software needs to be uninstalled
325 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
326 # nothing to do to uninstall sw
329 async def _instances_list(self
, cluster_id
: str):
332 paths
, env
= self
._init
_paths
_env
(
333 cluster_name
=cluster_id
, create_if_not_exist
=True
336 command
= "{} list --all-namespaces --output yaml".format(self
._helm
_command
)
337 output
, _rc
= await self
._local
_async
_exec
(
338 command
=command
, raise_exception_on_error
=True, env
=env
341 if output
and len(output
) > 0:
342 self
.log
.debug("instances list output: {}".format(output
))
343 return yaml
.load(output
, Loader
=yaml
.SafeLoader
)
347 def _get_inspect_command(
348 self
, inspect_command
: str, kdu_model
: str, repo_str
: str, version
: str
350 inspect_command
= "{} show {} {}{} {}".format(
351 self
._helm
_command
, inspect_command
, kdu_model
, repo_str
, version
353 return inspect_command
355 def _get_get_command(
356 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
359 "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
360 kubeconfig
, self
._helm
_command
, get_command
, kdu_instance
, namespace
365 async def _status_kdu(
369 namespace
: str = None,
370 show_error_log
: bool = False,
371 return_text
: bool = False,
375 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
379 namespace
= "kube-system"
382 paths
, env
= self
._init
_paths
_env
(
383 cluster_name
=cluster_id
, create_if_not_exist
=True
385 command
= "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
386 paths
["kube_config"], self
._helm
_command
, kdu_instance
, namespace
389 output
, rc
= await self
._local
_async
_exec
(
391 raise_exception_on_error
=True,
392 show_error_log
=show_error_log
,
402 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
404 # remove field 'notes' and manifest
406 del data
.get("info")["notes"]
410 # parse the manifest to a list of dictionaries
411 if "manifest" in data
:
412 manifest_str
= data
.get("manifest")
413 manifest_docs
= yaml
.load_all(manifest_str
, Loader
=yaml
.SafeLoader
)
415 data
["manifest"] = []
416 for doc
in manifest_docs
:
417 data
["manifest"].append(doc
)
421 def _get_install_command(
435 timeout_str
= "--timeout {}s".format(timeout
)
440 atomic_str
= "--atomic"
444 namespace_str
= "--namespace {}".format(namespace
)
449 version_str
= "--version {}".format(version
)
452 "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml "
453 "{params} {timeout} {ns} {model} {ver}".format(
454 kubeconfig
=kubeconfig
,
455 helm
=self
._helm
_command
,
467 def _get_upgrade_scale_command(
483 timeout_str
= "--timeout {}s".format(timeout
)
488 atomic_str
= "--atomic"
493 version_str
= "--version {}".format(version
)
498 namespace_str
= "--namespace {}".format(namespace
)
502 scale_dict
= {"{}.{}".format(resource_name
, replica_str
): scale
}
504 scale_dict
= {replica_str
: scale
}
506 scale_str
= self
._params
_to
_set
_option
(scale_dict
)
509 "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} --output yaml {scale} "
512 helm
=self
._helm
_command
,
514 namespace
=namespace_str
,
520 kubeconfig
=kubeconfig
,
524 def _get_upgrade_command(
538 timeout_str
= "--timeout {}s".format(timeout
)
543 atomic_str
= "--atomic"
548 version_str
= "--version {}".format(version
)
553 namespace_str
= "--namespace {}".format(namespace
)
556 "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} "
557 "--output yaml {params} {timeout} {ver}"
559 kubeconfig
=kubeconfig
,
560 helm
=self
._helm
_command
,
562 namespace
=namespace_str
,
571 def _get_rollback_command(
572 self
, kdu_instance
: str, namespace
: str, revision
: float, kubeconfig
: str
574 return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
575 kubeconfig
, self
._helm
_command
, kdu_instance
, revision
, namespace
578 def _get_uninstall_command(
579 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
582 return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
583 kubeconfig
, self
._helm
_command
, kdu_instance
, namespace
586 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
588 cluster_filter
= {"_admin.helm-chart-v3.id": cluster_uuid
}
589 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
591 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
595 "k8cluster with helm-id : {} not found".format(cluster_uuid
)