2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
22 from typing
import Union
23 from shlex
import quote
27 from n2vc
.k8s_helm_base_conn
import K8sHelmBaseConnector
28 from n2vc
.exceptions
import K8sException
31 class K8sHelm3Connector(K8sHelmBaseConnector
):
34 ####################################################################################
35 ################################### P U B L I C ####################################
36 ####################################################################################
43 kubectl_command
: str = "/usr/bin/kubectl",
44 helm_command
: str = "/usr/bin/helm3",
49 Initializes helm connector for helm v3
51 :param fs: file system for kubernetes and helm configuration
52 :param db: database object to write current operation status
53 :param kubectl_command: path to kubectl executable
54 :param helm_command: path to helm executable
56 :param on_update_db: callback called when k8s connector updates database
60 K8sHelmBaseConnector
.__init
__(
65 kubectl_command
=kubectl_command
,
66 helm_command
=helm_command
,
67 on_update_db
=on_update_db
,
70 self
.log
.info("K8S Helm3 connector initialized")
82 namespace
: str = None,
85 """Install a helm chart
87 :param cluster_uuid str: The UUID of the cluster to install to
88 :param kdu_model str: chart/reference (string), which can be either
90 - a name of chart available via the repos known by OSM
91 (e.g. stable/openldap, stable/openldap:1.2.4)
92 - a path to a packaged chart (e.g. mychart.tgz)
93 - a path to an unpacked chart directory or a URL (e.g. mychart)
94 :param kdu_instance: Kdu instance name
95 :param atomic bool: If set, waits until the model is active and resets
96 the cluster on failure.
97 :param timeout int: The time, in seconds, to wait for the install
99 :param params dict: Key-value pairs of instantiation parameters
100 :param kdu_name: Name of the KDU instance to be installed
101 :param namespace: K8s namespace to use for the KDU instance
103 :param kwargs: Additional parameters (None yet)
105 :return: True if successful
108 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_uuid
))
111 self
.fs
.sync(from_path
=cluster_uuid
)
114 paths
, env
= self
._init
_paths
_env
(
115 cluster_name
=cluster_uuid
, create_if_not_exist
=True
118 # for helm3 if namespace does not exist must create it
119 if namespace
and namespace
!= "kube-system":
120 if not await self
._namespace
_exists
(cluster_uuid
, namespace
):
122 await self
._create
_namespace
(cluster_uuid
, namespace
)
123 except Exception as e
:
124 if not await self
._namespace
_exists
(cluster_uuid
, namespace
):
126 "namespace {} does not exist in cluster_id {} "
127 "error message: ".format(namespace
, e
)
129 self
.log
.error(err_msg
)
130 raise K8sException(err_msg
)
132 await self
._install
_impl
(
147 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
149 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
152 async def inspect_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
154 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model
, repo_url
)
157 return await self
._exec
_inspect
_command
(
158 inspect_command
="all", kdu_model
=kdu_model
, repo_url
=repo_url
162 ####################################################################################
163 ################################### P R I V A T E ##################################
164 ####################################################################################
167 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
169 Creates and returns base cluster and kube dirs and returns them.
170 Also created helm3 dirs according to new directory specification, paths are
171 returned and also environment variables that must be provided to execute commands
173 Helm 3 directory specification uses XDG categories for variable support:
174 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
175 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
176 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
178 The variables assigned for this paths are:
179 (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
180 $HELM_PATH_DATA but looking and helm env the variable names are different)
181 - Cache: $HELM_CACHE_HOME
182 - Config: $HELM_CONFIG_HOME
183 - Data: $HELM_DATA_HOME
184 - helm kubeconfig: $KUBECONFIG
186 :param cluster_name: cluster_name
187 :return: Dictionary with config_paths and dictionary with helm environment variables
191 if base
.endswith("/") or base
.endswith("\\"):
194 # base dir for cluster
195 cluster_dir
= base
+ "/" + cluster_name
198 kube_dir
= cluster_dir
+ "/" + ".kube"
199 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
200 self
.log
.debug("Creating dir {}".format(kube_dir
))
201 os
.makedirs(kube_dir
)
203 helm_path_cache
= cluster_dir
+ "/.cache/helm"
204 if create_if_not_exist
and not os
.path
.exists(helm_path_cache
):
205 self
.log
.debug("Creating dir {}".format(helm_path_cache
))
206 os
.makedirs(helm_path_cache
)
208 helm_path_config
= cluster_dir
+ "/.config/helm"
209 if create_if_not_exist
and not os
.path
.exists(helm_path_config
):
210 self
.log
.debug("Creating dir {}".format(helm_path_config
))
211 os
.makedirs(helm_path_config
)
213 helm_path_data
= cluster_dir
+ "/.local/share/helm"
214 if create_if_not_exist
and not os
.path
.exists(helm_path_data
):
215 self
.log
.debug("Creating dir {}".format(helm_path_data
))
216 os
.makedirs(helm_path_data
)
218 config_filename
= kube_dir
+ "/config"
220 # 2 - Prepare dictionary with paths
222 "kube_dir": kube_dir
,
223 "kube_config": config_filename
,
224 "cluster_dir": cluster_dir
,
227 # 3 - Prepare environment variables
229 "HELM_CACHE_HOME": helm_path_cache
,
230 "HELM_CONFIG_HOME": helm_path_config
,
231 "HELM_DATA_HOME": helm_path_data
,
232 "KUBECONFIG": config_filename
,
235 for file_name
, file in paths
.items():
236 if "dir" in file_name
and not os
.path
.exists(file):
237 err_msg
= "{} dir does not exist".format(file)
238 self
.log
.error(err_msg
)
239 raise K8sException(err_msg
)
243 async def _namespace_exists(self
, cluster_id
, namespace
) -> bool:
245 "checking if namespace {} exists cluster_id {}".format(
246 namespace
, cluster_id
249 namespaces
= await self
._get
_namespaces
(cluster_id
)
250 return namespace
in namespaces
if namespaces
else False
252 async def _get_namespaces(self
, cluster_id
: str):
253 self
.log
.debug("get namespaces cluster_id {}".format(cluster_id
))
256 paths
, env
= self
._init
_paths
_env
(
257 cluster_name
=cluster_id
, create_if_not_exist
=True
260 command
= "{} --kubeconfig={} get namespaces -o=yaml".format(
261 self
.kubectl_command
, quote(paths
["kube_config"])
263 output
, _rc
= await self
._local
_async
_exec
(
264 command
=command
, raise_exception_on_error
=True, env
=env
267 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
268 namespaces
= [item
["metadata"]["name"] for item
in data
["items"]]
269 self
.log
.debug(f
"namespaces {namespaces}")
273 async def _create_namespace(self
, cluster_id
: str, namespace
: str):
274 self
.log
.debug(f
"create namespace: {cluster_id} for cluster_id: {namespace}")
277 paths
, env
= self
._init
_paths
_env
(
278 cluster_name
=cluster_id
, create_if_not_exist
=True
281 command
= "{} --kubeconfig={} create namespace {}".format(
282 self
.kubectl_command
, quote(paths
["kube_config"]), quote(namespace
)
284 _
, _rc
= await self
._local
_async
_exec
(
285 command
=command
, raise_exception_on_error
=True, env
=env
287 self
.log
.debug(f
"namespace {namespace} created")
291 async def _get_services(
292 self
, cluster_id
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
295 paths
, env
= self
._init
_paths
_env
(
296 cluster_name
=cluster_id
, create_if_not_exist
=True
299 command1
= "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
300 kubeconfig
, self
._helm
_command
, quote(kdu_instance
), quote(namespace
)
302 command2
= "{} get --namespace={} -f -".format(
303 self
.kubectl_command
, quote(namespace
)
305 output
, _rc
= await self
._local
_async
_exec
_pipe
(
306 command1
, command2
, env
=env
, raise_exception_on_error
=True
308 services
= self
._parse
_services
(output
)
312 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
314 Implements the helm version dependent cluster initialization:
315 For helm3 it creates the namespace if it is not created
317 if namespace
!= "kube-system":
318 namespaces
= await self
._get
_namespaces
(cluster_id
)
319 if namespace
not in namespaces
:
320 await self
._create
_namespace
(cluster_id
, namespace
)
322 repo_list
= await self
.repo_list(cluster_id
)
323 stable_repo
= [repo
for repo
in repo_list
if repo
["name"] == "stable"]
324 if not stable_repo
and self
._stable
_repo
_url
:
325 await self
.repo_add(cluster_id
, "stable", self
._stable
_repo
_url
)
327 # Returns False as no software needs to be uninstalled
330 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
331 # nothing to do to uninstall sw
334 async def _instances_list(self
, cluster_id
: str):
336 paths
, env
= self
._init
_paths
_env
(
337 cluster_name
=cluster_id
, create_if_not_exist
=True
340 command
= "{} list --all-namespaces --output yaml".format(self
._helm
_command
)
341 output
, _rc
= await self
._local
_async
_exec
(
342 command
=command
, raise_exception_on_error
=True, env
=env
345 if output
and len(output
) > 0:
346 self
.log
.debug("instances list output: {}".format(output
))
347 return yaml
.load(output
, Loader
=yaml
.SafeLoader
)
351 def _get_inspect_command(
352 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
354 """Generates the command to obtain the information about an Helm Chart package
355 (´helm show ...´ command)
358 show_command: the second part of the command (`helm show <show_command>`)
359 kdu_model: The name or path of an Helm Chart
360 repo_url: Helm Chart repository url
361 version: constraint with specific version of the Chart to use
364 str: the generated Helm Chart command
367 inspect_command
= "{} show {} {}{} {}".format(
368 self
._helm
_command
, show_command
, quote(kdu_model
), repo_str
, version
370 return inspect_command
372 def _get_get_command(
373 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
376 "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
386 async def _status_kdu(
390 namespace
: str = None,
391 yaml_format
: bool = False,
392 show_error_log
: bool = False,
393 ) -> Union
[str, dict]:
395 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
399 namespace
= "kube-system"
402 paths
, env
= self
._init
_paths
_env
(
403 cluster_name
=cluster_id
, create_if_not_exist
=True
405 command
= "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
406 paths
["kube_config"],
412 output
, rc
= await self
._local
_async
_exec
(
414 raise_exception_on_error
=True,
415 show_error_log
=show_error_log
,
425 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
427 # remove field 'notes' and manifest
429 del data
.get("info")["notes"]
433 # parse the manifest to a list of dictionaries
434 if "manifest" in data
:
435 manifest_str
= data
.get("manifest")
436 manifest_docs
= yaml
.load_all(manifest_str
, Loader
=yaml
.SafeLoader
)
438 data
["manifest"] = []
439 for doc
in manifest_docs
:
440 data
["manifest"].append(doc
)
444 def _get_install_command(
457 timeout_str
= "--timeout {}s".format(timeout
)
462 atomic_str
= "--atomic"
466 namespace_str
= "--namespace {}".format(quote(namespace
))
471 version_str
= "--version {}".format(version
)
474 "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml "
475 "{params} {timeout} {ns} {model} {ver}".format(
476 kubeconfig
=kubeconfig
,
477 helm
=self
._helm
_command
,
478 name
=quote(kdu_instance
),
483 model
=quote(kdu_model
),
489 def _get_upgrade_scale_command(
502 """Generates the command to scale a Helm Chart release
505 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
506 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
507 namespace (str): Namespace where this KDU instance is deployed
508 scale (int): Scale count
509 version (str): Constraint with specific version of the Chart to use
510 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
511 The --wait flag will be set automatically if --atomic is used
512 replica_str (str): The key under resource_name key where the scale count is stored
513 timeout (float): The time, in seconds, to wait
514 resource_name (str): The KDU's resource to scale
515 kubeconfig (str): Kubeconfig file path
518 str: command to scale a Helm Chart release
523 scale_dict
= {"{}.{}".format(resource_name
, replica_str
): scale
}
525 scale_dict
= {replica_str
: scale
}
527 scale_str
= self
._params
_to
_set
_option
(scale_dict
)
529 return self
._get
_upgrade
_command
(
531 kdu_instance
=kdu_instance
,
533 params_str
=scale_str
,
537 kubeconfig
=kubeconfig
,
540 def _get_upgrade_command(
551 """Generates the command to upgrade a Helm Chart release
554 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
555 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
556 namespace (str): Namespace where this KDU instance is deployed
557 params_str (str): Params used to upgrade the Helm Chart release
558 version (str): Constraint with specific version of the Chart to use
559 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
560 The --wait flag will be set automatically if --atomic is used
561 timeout (float): The time, in seconds, to wait
562 kubeconfig (str): Kubeconfig file path
565 str: command to upgrade a Helm Chart release
570 timeout_str
= "--timeout {}s".format(timeout
)
575 atomic_str
= "--atomic"
580 version_str
= "--version {}".format(quote(version
))
585 namespace_str
= "--namespace {}".format(quote(namespace
))
588 "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} "
589 "--output yaml {params} {timeout} --reuse-values {ver}"
591 kubeconfig
=kubeconfig
,
592 helm
=self
._helm
_command
,
593 name
=quote(kdu_instance
),
594 namespace
=namespace_str
,
598 model
=quote(kdu_model
),
603 def _get_rollback_command(
604 self
, kdu_instance
: str, namespace
: str, revision
: float, kubeconfig
: str
606 return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
614 def _get_uninstall_command(
615 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
617 return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
618 kubeconfig
, self
._helm
_command
, quote(kdu_instance
), quote(namespace
)
621 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
623 cluster_filter
= {"_admin.helm-chart-v3.id": cluster_uuid
}
624 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
626 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
630 "k8cluster with helm-id : {} not found".format(cluster_uuid
)