2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
22 from typing
import Union
26 from n2vc
.k8s_helm_base_conn
import K8sHelmBaseConnector
27 from n2vc
.exceptions
import K8sException
30 class K8sHelm3Connector(K8sHelmBaseConnector
):
33 ####################################################################################
34 ################################### P U B L I C ####################################
35 ####################################################################################
42 kubectl_command
: str = "/usr/bin/kubectl",
43 helm_command
: str = "/usr/bin/helm3",
48 Initializes helm connector for helm v3
50 :param fs: file system for kubernetes and helm configuration
51 :param db: database object to write current operation status
52 :param kubectl_command: path to kubectl executable
53 :param helm_command: path to helm executable
55 :param on_update_db: callback called when k8s connector updates database
59 K8sHelmBaseConnector
.__init
__(
64 kubectl_command
=kubectl_command
,
65 helm_command
=helm_command
,
66 on_update_db
=on_update_db
,
69 self
.log
.info("K8S Helm3 connector initialized")
81 namespace
: str = None,
84 """Install a helm chart
86 :param cluster_uuid str: The UUID of the cluster to install to
87 :param kdu_model str: The name or path of a bundle to install
88 :param kdu_instance: Kdu instance name
89 :param atomic bool: If set, waits until the model is active and resets
90 the cluster on failure.
91 :param timeout int: The time, in seconds, to wait for the install
93 :param params dict: Key-value pairs of instantiation parameters
94 :param kdu_name: Name of the KDU instance to be installed
95 :param namespace: K8s namespace to use for the KDU instance
97 :param kwargs: Additional parameters (None yet)
99 :return: True if successful
102 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_uuid
))
105 self
.fs
.sync(from_path
=cluster_uuid
)
108 paths
, env
= self
._init
_paths
_env
(
109 cluster_name
=cluster_uuid
, create_if_not_exist
=True
112 # for helm3 if namespace does not exist must create it
113 if namespace
and namespace
!= "kube-system":
114 if not await self
._namespace
_exists
(cluster_uuid
, namespace
):
116 await self
._create
_namespace
(cluster_uuid
, namespace
)
117 except Exception as e
:
118 if not await self
._namespace
_exists
(cluster_uuid
, namespace
):
120 "namespace {} does not exist in cluster_id {} "
121 "error message: ".format(namespace
, e
)
123 self
.log
.error(err_msg
)
124 raise K8sException(err_msg
)
126 await self
._install
_impl
(
141 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
143 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Run the "all" inspect sub-command for a chart.

    :param kdu_model: chart reference forwarded to the inspect command
    :param repo_url: optional repository URL forwarded to the inspect command
    :return: the value produced by ``self._exec_inspect_command``
    """
    # NOTE(review): the call wrapping this message is not visible in the
    # extracted source; a debug log is assumed, as in the sibling methods.
    self.log.debug(
        "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
    )

    return await self._exec_inspect_command(
        inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
    )
157 ####################################################################################
158 ################################### P R I V A T E ##################################
159 ####################################################################################
162 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
164 Creates the base cluster and kube dirs and returns them.
165 Also creates helm3 dirs according to the new directory specification; paths are
166 returned and also environment variables that must be provided to execute commands
168 Helm 3 directory specification uses XDG categories for variable support:
169 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
170 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
171 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
173 The variables assigned for these paths are:
174 (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
175 $HELM_PATH_DATA but looking at helm env the variable names are different)
176 - Cache: $HELM_CACHE_HOME
177 - Config: $HELM_CONFIG_HOME
178 - Data: $HELM_DATA_HOME
179 - helm kubeconfig: $KUBECONFIG
181 :param cluster_name: cluster_name
182 :return: Dictionary with config_paths and dictionary with helm environment variables
186 if base
.endswith("/") or base
.endswith("\\"):
189 # base dir for cluster
190 cluster_dir
= base
+ "/" + cluster_name
193 kube_dir
= cluster_dir
+ "/" + ".kube"
194 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
195 self
.log
.debug("Creating dir {}".format(kube_dir
))
196 os
.makedirs(kube_dir
)
198 helm_path_cache
= cluster_dir
+ "/.cache/helm"
199 if create_if_not_exist
and not os
.path
.exists(helm_path_cache
):
200 self
.log
.debug("Creating dir {}".format(helm_path_cache
))
201 os
.makedirs(helm_path_cache
)
203 helm_path_config
= cluster_dir
+ "/.config/helm"
204 if create_if_not_exist
and not os
.path
.exists(helm_path_config
):
205 self
.log
.debug("Creating dir {}".format(helm_path_config
))
206 os
.makedirs(helm_path_config
)
208 helm_path_data
= cluster_dir
+ "/.local/share/helm"
209 if create_if_not_exist
and not os
.path
.exists(helm_path_data
):
210 self
.log
.debug("Creating dir {}".format(helm_path_data
))
211 os
.makedirs(helm_path_data
)
213 config_filename
= kube_dir
+ "/config"
215 # 2 - Prepare dictionary with paths
217 "kube_dir": kube_dir
,
218 "kube_config": config_filename
,
219 "cluster_dir": cluster_dir
,
222 # 3 - Prepare environment variables
224 "HELM_CACHE_HOME": helm_path_cache
,
225 "HELM_CONFIG_HOME": helm_path_config
,
226 "HELM_DATA_HOME": helm_path_data
,
227 "KUBECONFIG": config_filename
,
230 for file_name
, file in paths
.items():
231 if "dir" in file_name
and not os
.path
.exists(file):
232 err_msg
= "{} dir does not exist".format(file)
233 self
.log
.error(err_msg
)
234 raise K8sException(err_msg
)
238 async def _namespace_exists(self
, cluster_id
, namespace
) -> bool:
240 "checking if namespace {} exists cluster_id {}".format(
241 namespace
, cluster_id
244 namespaces
= await self
._get
_namespaces
(cluster_id
)
245 return namespace
in namespaces
if namespaces
else False
async def _get_namespaces(self, cluster_id: str):
    """List the namespace names reported by kubectl for a cluster.

    Runs ``<kubectl> --kubeconfig=<kube_config> get namespaces -o=yaml`` and
    collects ``metadata.name`` of every entry under ``items``.

    :param cluster_id: cluster whose paths/environment are used for the call
    :return: list of namespace names (empty when the output holds no items)
    """
    self.log.debug("get namespaces cluster_id {}".format(cluster_id))

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} get namespaces -o=yaml".format(
        self.kubectl_command, paths["kube_config"]
    )
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # Empty output parses to None; treat it (and a missing "items") as "no namespaces"
    data = yaml.load(output, Loader=yaml.SafeLoader) or {}
    namespaces = [item["metadata"]["name"] for item in data.get("items") or []]
    self.log.debug(f"namespaces {namespaces}")

    # NOTE(review): the trailing return is not visible in the extracted
    # source; callers test membership on the result, so the list is returned.
    return namespaces
269 async def _create_namespace(self
, cluster_id
: str, namespace
: str):
271 self
.log
.debug(f
"create namespace: {cluster_id} for cluster_id: {namespace}")
274 paths
, env
= self
._init
_paths
_env
(
275 cluster_name
=cluster_id
, create_if_not_exist
=True
278 command
= "{} --kubeconfig={} create namespace {}".format(
279 self
.kubectl_command
, paths
["kube_config"], namespace
281 _
, _rc
= await self
._local
_async
_exec
(
282 command
=command
, raise_exception_on_error
=True, env
=env
284 self
.log
.debug(f
"namespace {namespace} created")
288 async def _get_services(
289 self
, cluster_id
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
293 paths
, env
= self
._init
_paths
_env
(
294 cluster_name
=cluster_id
, create_if_not_exist
=True
297 command1
= "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
298 kubeconfig
, self
._helm
_command
, kdu_instance
, namespace
300 command2
= "{} get --namespace={} -f -".format(self
.kubectl_command
, namespace
)
301 output
, _rc
= await self
._local
_async
_exec
_pipe
(
302 command1
, command2
, env
=env
, raise_exception_on_error
=True
304 services
= self
._parse
_services
(output
)
308 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
310 Implements the helm version dependent cluster initialization:
311 For helm3 it creates the namespace if it is not created
313 if namespace
!= "kube-system":
314 namespaces
= await self
._get
_namespaces
(cluster_id
)
315 if namespace
not in namespaces
:
316 await self
._create
_namespace
(cluster_id
, namespace
)
318 repo_list
= await self
.repo_list(cluster_id
)
319 stable_repo
= [repo
for repo
in repo_list
if repo
["name"] == "stable"]
320 if not stable_repo
and self
._stable
_repo
_url
:
321 await self
.repo_add(cluster_id
, "stable", self
._stable
_repo
_url
)
323 # Returns False as no software needs to be uninstalled
326 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
327 # nothing to do to uninstall sw
330 async def _instances_list(self
, cluster_id
: str):
333 paths
, env
= self
._init
_paths
_env
(
334 cluster_name
=cluster_id
, create_if_not_exist
=True
337 command
= "{} list --all-namespaces --output yaml".format(self
._helm
_command
)
338 output
, _rc
= await self
._local
_async
_exec
(
339 command
=command
, raise_exception_on_error
=True, env
=env
342 if output
and len(output
) > 0:
343 self
.log
.debug("instances list output: {}".format(output
))
344 return yaml
.load(output
, Loader
=yaml
.SafeLoader
)
348 def _get_inspect_command(
349 self
, inspect_command
: str, kdu_model
: str, repo_str
: str, version
: str
351 inspect_command
= "{} show {} {}{} {}".format(
352 self
._helm
_command
, inspect_command
, kdu_model
, repo_str
, version
354 return inspect_command
356 def _get_get_command(
357 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
360 "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
361 kubeconfig
, self
._helm
_command
, get_command
, kdu_instance
, namespace
366 async def _status_kdu(
370 namespace
: str = None,
371 yaml_format
: bool = False,
372 show_error_log
: bool = False,
373 ) -> Union
[str, dict]:
376 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
380 namespace
= "kube-system"
383 paths
, env
= self
._init
_paths
_env
(
384 cluster_name
=cluster_id
, create_if_not_exist
=True
386 command
= "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
387 paths
["kube_config"], self
._helm
_command
, kdu_instance
, namespace
390 output
, rc
= await self
._local
_async
_exec
(
392 raise_exception_on_error
=True,
393 show_error_log
=show_error_log
,
403 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
405 # remove field 'notes' and manifest
407 del data
.get("info")["notes"]
411 # parse the manifest to a list of dictionaries
412 if "manifest" in data
:
413 manifest_str
= data
.get("manifest")
414 manifest_docs
= yaml
.load_all(manifest_str
, Loader
=yaml
.SafeLoader
)
416 data
["manifest"] = []
417 for doc
in manifest_docs
:
418 data
["manifest"].append(doc
)
422 def _get_install_command(
436 timeout_str
= "--timeout {}s".format(timeout
)
441 atomic_str
= "--atomic"
445 namespace_str
= "--namespace {}".format(namespace
)
450 version_str
= "--version {}".format(version
)
453 "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml "
454 "{params} {timeout} {ns} {model} {ver}".format(
455 kubeconfig
=kubeconfig
,
456 helm
=self
._helm
_command
,
468 def _get_upgrade_scale_command(
484 timeout_str
= "--timeout {}s".format(timeout
)
489 atomic_str
= "--atomic"
494 version_str
= "--version {}".format(version
)
499 namespace_str
= "--namespace {}".format(namespace
)
503 scale_dict
= {"{}.{}".format(resource_name
, replica_str
): scale
}
505 scale_dict
= {replica_str
: scale
}
507 scale_str
= self
._params
_to
_set
_option
(scale_dict
)
510 "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} --output yaml {scale} "
513 helm
=self
._helm
_command
,
515 namespace
=namespace_str
,
521 kubeconfig
=kubeconfig
,
525 def _get_upgrade_command(
539 timeout_str
= "--timeout {}s".format(timeout
)
544 atomic_str
= "--atomic"
549 version_str
= "--version {}".format(version
)
554 namespace_str
= "--namespace {}".format(namespace
)
557 "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} "
558 "--output yaml {params} {timeout} {ver}"
560 kubeconfig
=kubeconfig
,
561 helm
=self
._helm
_command
,
563 namespace
=namespace_str
,
572 def _get_rollback_command(
573 self
, kdu_instance
: str, namespace
: str, revision
: float, kubeconfig
: str
575 return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
576 kubeconfig
, self
._helm
_command
, kdu_instance
, revision
, namespace
579 def _get_uninstall_command(
580 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
583 return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
584 kubeconfig
, self
._helm
_command
, kdu_instance
, namespace
587 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
589 cluster_filter
= {"_admin.helm-chart-v3.id": cluster_uuid
}
590 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
592 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
596 "k8cluster with helm-id : {} not found".format(cluster_uuid
)