2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
22 from typing
import Union
23 from shlex
import quote
27 from n2vc
.k8s_helm_base_conn
import K8sHelmBaseConnector
28 from n2vc
.exceptions
import K8sException
31 class K8sHelm3Connector(K8sHelmBaseConnector
):
34 ####################################################################################
35 ################################### P U B L I C ####################################
36 ####################################################################################
43 kubectl_command
: str = "/usr/bin/kubectl",
44 helm_command
: str = "/usr/bin/helm3",
49 Initializes helm connector for helm v3
51 :param fs: file system for kubernetes and helm configuration
52 :param db: database object to write current operation status
53 :param kubectl_command: path to kubectl executable
54 :param helm_command: path to helm executable
56 :param on_update_db: callback called when k8s connector updates database
60 K8sHelmBaseConnector
.__init
__(
65 kubectl_command
=kubectl_command
,
66 helm_command
=helm_command
,
67 on_update_db
=on_update_db
,
70 self
.log
.info("K8S Helm3 connector initialized")
82 namespace
: str = None,
85 """Install a helm chart
87 :param cluster_uuid str: The UUID of the cluster to install to
88 :param kdu_model str: chart/reference (string), which can be either
90 - a name of chart available via the repos known by OSM
91 (e.g. stable/openldap, stable/openldap:1.2.4)
92 - a path to a packaged chart (e.g. mychart.tgz)
93 - a path to an unpacked chart directory or a URL (e.g. mychart)
94 :param kdu_instance: Kdu instance name
95 :param atomic bool: If set, waits until the model is active and resets
96 the cluster on failure.
97 :param timeout int: The time, in seconds, to wait for the install
99 :param params dict: Key-value pairs of instantiation parameters
100 :param kdu_name: Name of the KDU instance to be installed
101 :param namespace: K8s namespace to use for the KDU instance
103 :param kwargs: Additional parameters (None yet)
105 :return: True if successful
108 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_uuid
))
111 self
.fs
.sync(from_path
=cluster_uuid
)
114 paths
, env
= self
._init
_paths
_env
(
115 cluster_name
=cluster_uuid
, create_if_not_exist
=True
118 # for helm3 if namespace does not exist must create it
119 if namespace
and namespace
!= "kube-system":
120 if not await self
._namespace
_exists
(cluster_uuid
, namespace
):
122 # TODO: refactor to use kubernetes API client
123 await self
._create
_namespace
(cluster_uuid
, namespace
)
124 except Exception as e
:
125 if not await self
._namespace
_exists
(cluster_uuid
, namespace
):
127 "namespace {} does not exist in cluster_id {} "
128 "error message: ".format(namespace
, e
)
130 self
.log
.error(err_msg
)
131 raise K8sException(err_msg
)
133 await self
._install
_impl
(
148 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
150 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Inspect a helm chart and return the complete inspection output.

    Logs the request at debug level, then delegates to
    ``self._exec_inspect_command`` asking for the ``"all"`` view.

    :param kdu_model: chart reference to inspect
    :param repo_url: optional chart repository URL, forwarded unchanged
    :return: the value produced by ``self._exec_inspect_command``
    """
    message = "inspect kdu_model {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(message)

    inspection = await self._exec_inspect_command(
        inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspection
163 ####################################################################################
164 ################################### P R I V A T E ##################################
165 ####################################################################################
168 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
170 Creates the base cluster and kube dirs and returns them.
171 Also creates helm3 dirs according to new directory specification, paths are
172 returned and also environment variables that must be provided to execute commands
174 Helm 3 directory specification uses XDG categories for variable support:
175 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
176 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
177 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
179 The variables assigned for these paths are:
180 (In the documentation the variable names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
181 $HELM_PATH_DATA but looking at helm env the variable names are different)
182 - Cache: $HELM_CACHE_HOME
183 - Config: $HELM_CONFIG_HOME
184 - Data: $HELM_DATA_HOME
185 - helm kubeconfig: $KUBECONFIG
187 :param cluster_name: cluster_name
188 :return: Dictionary with config_paths and dictionary with helm environment variables
192 if base
.endswith("/") or base
.endswith("\\"):
195 # base dir for cluster
196 cluster_dir
= base
+ "/" + cluster_name
199 kube_dir
= cluster_dir
+ "/" + ".kube"
200 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
201 self
.log
.debug("Creating dir {}".format(kube_dir
))
202 os
.makedirs(kube_dir
)
204 helm_path_cache
= cluster_dir
+ "/.cache/helm"
205 if create_if_not_exist
and not os
.path
.exists(helm_path_cache
):
206 self
.log
.debug("Creating dir {}".format(helm_path_cache
))
207 os
.makedirs(helm_path_cache
)
209 helm_path_config
= cluster_dir
+ "/.config/helm"
210 if create_if_not_exist
and not os
.path
.exists(helm_path_config
):
211 self
.log
.debug("Creating dir {}".format(helm_path_config
))
212 os
.makedirs(helm_path_config
)
214 helm_path_data
= cluster_dir
+ "/.local/share/helm"
215 if create_if_not_exist
and not os
.path
.exists(helm_path_data
):
216 self
.log
.debug("Creating dir {}".format(helm_path_data
))
217 os
.makedirs(helm_path_data
)
219 config_filename
= kube_dir
+ "/config"
221 # 2 - Prepare dictionary with paths
223 "kube_dir": kube_dir
,
224 "kube_config": config_filename
,
225 "cluster_dir": cluster_dir
,
228 # 3 - Prepare environment variables
230 "HELM_CACHE_HOME": helm_path_cache
,
231 "HELM_CONFIG_HOME": helm_path_config
,
232 "HELM_DATA_HOME": helm_path_data
,
233 "KUBECONFIG": config_filename
,
236 for file_name
, file in paths
.items():
237 if "dir" in file_name
and not os
.path
.exists(file):
238 err_msg
= "{} dir does not exist".format(file)
239 self
.log
.error(err_msg
)
240 raise K8sException(err_msg
)
244 async def _namespace_exists(self
, cluster_id
, namespace
) -> bool:
246 "checking if namespace {} exists cluster_id {}".format(
247 namespace
, cluster_id
250 namespaces
= await self
._get
_namespaces
(cluster_id
)
251 return namespace
in namespaces
if namespaces
else False
async def _get_namespaces(self, cluster_id: str):
    """List the namespace names of a cluster using kubectl.

    Runs ``<kubectl> --kubeconfig=<cluster kubeconfig> get namespaces -o=yaml``
    through ``self._local_async_exec`` and extracts ``metadata.name`` from
    every entry under ``items`` in the YAML output.

    :param cluster_id: cluster identifier, used to resolve the kubeconfig
        path and environment through ``self._init_paths_env``
    :return: list with the namespace names; an empty list when the command
        output is empty or carries no ``items``
    """
    self.log.debug("get namespaces cluster_id {}".format(cluster_id))

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} get namespaces -o=yaml".format(
        self.kubectl_command, quote(paths["kube_config"])
    )
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # yaml.load returns None for empty output; fall back to an empty document
    # so that callers get an empty list instead of a TypeError.
    data = yaml.load(output, Loader=yaml.SafeLoader) or {}
    namespaces = [item["metadata"]["name"] for item in data.get("items") or []]
    self.log.debug(f"namespaces {namespaces}")

    return namespaces
274 async def _create_namespace(self
, cluster_id
: str, namespace
: str):
275 self
.log
.debug(f
"create namespace: {cluster_id} for cluster_id: {namespace}")
278 paths
, env
= self
._init
_paths
_env
(
279 cluster_name
=cluster_id
, create_if_not_exist
=True
282 command
= "{} --kubeconfig={} create namespace {}".format(
283 self
.kubectl_command
, quote(paths
["kube_config"]), quote(namespace
)
285 _
, _rc
= await self
._local
_async
_exec
(
286 command
=command
, raise_exception_on_error
=True, env
=env
288 self
.log
.debug(f
"namespace {namespace} created")
292 async def _get_services(
293 self
, cluster_id
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
296 paths
, env
= self
._init
_paths
_env
(
297 cluster_name
=cluster_id
, create_if_not_exist
=True
300 command1
= "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
301 kubeconfig
, self
._helm
_command
, quote(kdu_instance
), quote(namespace
)
303 command2
= "{} get --namespace={} -f -".format(
304 self
.kubectl_command
, quote(namespace
)
306 output
, _rc
= await self
._local
_async
_exec
_pipe
(
307 command1
, command2
, env
=env
, raise_exception_on_error
=True
309 services
= self
._parse
_services
(output
)
313 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
315 Implements the helm version dependent cluster initialization:
316 For helm3 it creates the namespace if it is not created
318 if namespace
!= "kube-system":
319 namespaces
= await self
._get
_namespaces
(cluster_id
)
320 if namespace
not in namespaces
:
321 # TODO: refactor to use kubernetes API client
322 await self
._create
_namespace
(cluster_id
, namespace
)
324 repo_list
= await self
.repo_list(cluster_id
)
325 stable_repo
= [repo
for repo
in repo_list
if repo
["name"] == "stable"]
326 if not stable_repo
and self
._stable
_repo
_url
:
327 await self
.repo_add(cluster_id
, "stable", self
._stable
_repo
_url
)
329 # Returns False as no software needs to be uninstalled
332 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
333 # nothing to do to uninstall sw
336 async def _instances_list(self
, cluster_id
: str):
338 paths
, env
= self
._init
_paths
_env
(
339 cluster_name
=cluster_id
, create_if_not_exist
=True
342 command
= "{} list --all-namespaces --output yaml".format(self
._helm
_command
)
343 output
, _rc
= await self
._local
_async
_exec
(
344 command
=command
, raise_exception_on_error
=True, env
=env
347 if output
and len(output
) > 0:
348 self
.log
.debug("instances list output: {}".format(output
))
349 return yaml
.load(output
, Loader
=yaml
.SafeLoader
)
353 def _get_inspect_command(
354 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
356 """Generates the command to obtain the information about an Helm Chart package
357 (´helm show ...´ command)
360 show_command: the second part of the command (`helm show <show_command>`)
361 kdu_model: The name or path of an Helm Chart
362 repo_url: Helm Chart repository url
363 version: constraint with specific version of the Chart to use
366 str: the generated Helm Chart command
369 inspect_command
= "{} show {} {}{} {}".format(
370 self
._helm
_command
, show_command
, quote(kdu_model
), repo_str
, version
372 return inspect_command
374 def _get_get_command(
375 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
378 "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
388 async def _status_kdu(
392 namespace
: str = None,
393 yaml_format
: bool = False,
394 show_error_log
: bool = False,
395 ) -> Union
[str, dict]:
397 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
401 namespace
= "kube-system"
404 paths
, env
= self
._init
_paths
_env
(
405 cluster_name
=cluster_id
, create_if_not_exist
=True
407 command
= "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
408 paths
["kube_config"],
414 output
, rc
= await self
._local
_async
_exec
(
416 raise_exception_on_error
=True,
417 show_error_log
=show_error_log
,
427 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
429 # remove field 'notes' and manifest
431 del data
.get("info")["notes"]
435 # parse the manifest to a list of dictionaries
436 if "manifest" in data
:
437 manifest_str
= data
.get("manifest")
438 manifest_docs
= yaml
.load_all(manifest_str
, Loader
=yaml
.SafeLoader
)
440 data
["manifest"] = []
441 for doc
in manifest_docs
:
442 data
["manifest"].append(doc
)
446 def _get_install_command(
459 timeout_str
= "--timeout {}s".format(timeout
)
464 atomic_str
= "--atomic"
468 namespace_str
= "--namespace {}".format(quote(namespace
))
473 version_str
= "--version {}".format(version
)
476 "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml "
477 "{params} {timeout} {ns} {model} {ver}".format(
478 kubeconfig
=kubeconfig
,
479 helm
=self
._helm
_command
,
480 name
=quote(kdu_instance
),
485 model
=quote(kdu_model
),
491 def _get_upgrade_scale_command(
504 """Generates the command to scale a Helm Chart release
507 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
508 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
509 namespace (str): Namespace where this KDU instance is deployed
510 scale (int): Scale count
511 version (str): Constraint with specific version of the Chart to use
512 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
513 The --wait flag will be set automatically if --atomic is used
514 replica_str (str): The key under resource_name key where the scale count is stored
515 timeout (float): The time, in seconds, to wait
516 resource_name (str): The KDU's resource to scale
517 kubeconfig (str): Kubeconfig file path
520 str: command to scale a Helm Chart release
525 scale_dict
= {"{}.{}".format(resource_name
, replica_str
): scale
}
527 scale_dict
= {replica_str
: scale
}
529 scale_str
= self
._params
_to
_set
_option
(scale_dict
)
531 return self
._get
_upgrade
_command
(
533 kdu_instance
=kdu_instance
,
535 params_str
=scale_str
,
539 kubeconfig
=kubeconfig
,
542 def _get_upgrade_command(
554 """Generates the command to upgrade a Helm Chart release
557 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
558 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
559 namespace (str): Namespace where this KDU instance is deployed
560 params_str (str): Params used to upgrade the Helm Chart release
561 version (str): Constraint with specific version of the Chart to use
562 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
563 The --wait flag will be set automatically if --atomic is used
564 timeout (float): The time, in seconds, to wait
565 kubeconfig (str): Kubeconfig file path
566 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
568 str: command to upgrade a Helm Chart release
573 timeout_str
= "--timeout {}s".format(timeout
)
578 atomic_str
= "--atomic"
583 force_str
= "--force "
588 version_str
= "--version {}".format(quote(version
))
593 namespace_str
= "--namespace {}".format(quote(namespace
))
596 "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} {force}"
597 "--output yaml {params} {timeout} --reuse-values {ver}"
599 kubeconfig
=kubeconfig
,
600 helm
=self
._helm
_command
,
601 name
=quote(kdu_instance
),
602 namespace
=namespace_str
,
607 model
=quote(kdu_model
),
612 def _get_rollback_command(
613 self
, kdu_instance
: str, namespace
: str, revision
: float, kubeconfig
: str
615 return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
623 def _get_uninstall_command(
624 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
626 return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
627 kubeconfig
, self
._helm
_command
, quote(kdu_instance
), quote(namespace
)
630 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
632 cluster_filter
= {"_admin.helm-chart-v3.id": cluster_uuid
}
633 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
635 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
639 "k8cluster with helm-id : {} not found".format(cluster_uuid
)