Code Coverage

Cobertura Coverage Report > n2vc >

k8s_helm3_conn.py

Trend

File Coverage summary

NameClassesLinesConditionals
k8s_helm3_conn.py
100%
1/1
81%
150/186
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
k8s_helm3_conn.py
81%
150/186
N/A

Source

n2vc/k8s_helm3_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #    http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 1 from typing import Union
23 1 import os
24 1 import yaml
25
26 1 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
27 1 from n2vc.exceptions import K8sException
28
29
30 1 class K8sHelm3Connector(K8sHelmBaseConnector):
31
32     """
33     ####################################################################################
34     ################################### P U B L I C ####################################
35     ####################################################################################
36     """
37
38 1     def __init__(
39         self,
40         fs: object,
41         db: object,
42         kubectl_command: str = "/usr/bin/kubectl",
43         helm_command: str = "/usr/bin/helm3",
44         log: object = None,
45         on_update_db=None,
46     ):
47         """
48         Initializes helm connector for helm v3
49
50         :param fs: file system for kubernetes and helm configuration
51         :param db: database object to write current operation status
52         :param kubectl_command: path to kubectl executable
53         :param helm_command: path to helm executable
54         :param log: logger
55         :param on_update_db: callback called when k8s connector updates database
56         """
57
58         # parent class
59 1         K8sHelmBaseConnector.__init__(
60             self,
61             db=db,
62             log=log,
63             fs=fs,
64             kubectl_command=kubectl_command,
65             helm_command=helm_command,
66             on_update_db=on_update_db,
67         )
68
69 1         self.log.info("K8S Helm3 connector initialized")
70
71 1     async def install(
72         self,
73         cluster_uuid: str,
74         kdu_model: str,
75         kdu_instance: str,
76         atomic: bool = True,
77         timeout: float = 300,
78         params: dict = None,
79         db_dict: dict = None,
80         kdu_name: str = None,
81         namespace: str = None,
82         **kwargs,
83     ):
84         """Install a helm chart
85
86         :param cluster_uuid str: The UUID of the cluster to install to
87         :param kdu_model str: chart/reference (string), which can be either
88             of these options:
89             - a name of chart available via the repos known by OSM
90               (e.g. stable/openldap, stable/openldap:1.2.4)
91             - a path to a packaged chart (e.g. mychart.tgz)
92             - a path to an unpacked chart directory or a URL (e.g. mychart)
93         :param kdu_instance: Kdu instance name
94         :param atomic bool: If set, waits until the model is active and resets
95                             the cluster on failure.
96         :param timeout int: The time, in seconds, to wait for the install
97                             to finish
98         :param params dict: Key-value pairs of instantiation parameters
99         :param kdu_name: Name of the KDU instance to be installed
100         :param namespace: K8s namespace to use for the KDU instance
101
102         :param kwargs: Additional parameters (None yet)
103
104         :return: True if successful
105         """
106
107 1         self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))
108
109         # sync local dir
110 1         self.fs.sync(from_path=cluster_uuid)
111
112         # init env, paths
113 1         paths, env = self._init_paths_env(
114             cluster_name=cluster_uuid, create_if_not_exist=True
115         )
116
117         # for helm3 if namespace does not exist must create it
118 1         if namespace and namespace != "kube-system":
119 1             if not await self._namespace_exists(cluster_uuid, namespace):
120 1                 try:
121 1                     await self._create_namespace(cluster_uuid, namespace)
122 1                 except Exception as e:
123 1                     if not await self._namespace_exists(cluster_uuid, namespace):
124 1                         err_msg = (
125                             "namespace {} does not exist in cluster_id {} "
126                             "error message: ".format(namespace, e)
127                         )
128 1                         self.log.error(err_msg)
129 1                         raise K8sException(err_msg)
130
131 1         await self._install_impl(
132             cluster_uuid,
133             kdu_model,
134             paths,
135             env,
136             kdu_instance,
137             atomic=atomic,
138             timeout=timeout,
139             params=params,
140             db_dict=db_dict,
141             kdu_name=kdu_name,
142             namespace=namespace,
143         )
144
145         # sync fs
146 1         self.fs.reverse_sync(from_path=cluster_uuid)
147
148 1         self.log.debug("Returning kdu_instance {}".format(kdu_instance))
149 1         return True
150
151 1     async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
152 1         self.log.debug(
153             "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
154         )
155
156 1         return await self._exec_inspect_command(
157             inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
158         )
159
160     """
161     ####################################################################################
162     ################################### P R I V A T E ##################################
163     ####################################################################################
164     """
165
166 1     def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
167         """
168         Creates and returns base cluster and kube dirs and returns them.
169         Also created helm3 dirs according to new directory specification, paths are
170         returned and also environment variables that must be provided to execute commands
171
172         Helm 3 directory specification uses XDG categories for variable support:
173         - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
174         - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
175         - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
176
177         The variables assigned for this paths are:
178         (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
179         $HELM_PATH_DATA but looking and helm env the variable names are different)
180         - Cache: $HELM_CACHE_HOME
181         - Config: $HELM_CONFIG_HOME
182         - Data: $HELM_DATA_HOME
183         - helm kubeconfig: $KUBECONFIG
184
185         :param cluster_name:  cluster_name
186         :return: Dictionary with config_paths and dictionary with helm environment variables
187         """
188
189 1         base = self.fs.path
190 1         if base.endswith("/") or base.endswith("\\"):
191 1             base = base[:-1]
192
193         # base dir for cluster
194 1         cluster_dir = base + "/" + cluster_name
195
196         # kube dir
197 1         kube_dir = cluster_dir + "/" + ".kube"
198 1         if create_if_not_exist and not os.path.exists(kube_dir):
199 1             self.log.debug("Creating dir {}".format(kube_dir))
200 1             os.makedirs(kube_dir)
201
202 1         helm_path_cache = cluster_dir + "/.cache/helm"
203 1         if create_if_not_exist and not os.path.exists(helm_path_cache):
204 1             self.log.debug("Creating dir {}".format(helm_path_cache))
205 1             os.makedirs(helm_path_cache)
206
207 1         helm_path_config = cluster_dir + "/.config/helm"
208 1         if create_if_not_exist and not os.path.exists(helm_path_config):
209 1             self.log.debug("Creating dir {}".format(helm_path_config))
210 1             os.makedirs(helm_path_config)
211
212 1         helm_path_data = cluster_dir + "/.local/share/helm"
213 1         if create_if_not_exist and not os.path.exists(helm_path_data):
214 1             self.log.debug("Creating dir {}".format(helm_path_data))
215 1             os.makedirs(helm_path_data)
216
217 1         config_filename = kube_dir + "/config"
218
219         # 2 - Prepare dictionary with paths
220 1         paths = {
221             "kube_dir": kube_dir,
222             "kube_config": config_filename,
223             "cluster_dir": cluster_dir,
224         }
225
226         # 3 - Prepare environment variables
227 1         env = {
228             "HELM_CACHE_HOME": helm_path_cache,
229             "HELM_CONFIG_HOME": helm_path_config,
230             "HELM_DATA_HOME": helm_path_data,
231             "KUBECONFIG": config_filename,
232         }
233
234 1         for file_name, file in paths.items():
235 1             if "dir" in file_name and not os.path.exists(file):
236 0                 err_msg = "{} dir does not exist".format(file)
237 0                 self.log.error(err_msg)
238 0                 raise K8sException(err_msg)
239
240 1         return paths, env
241
242 1     async def _namespace_exists(self, cluster_id, namespace) -> bool:
243 1         self.log.debug(
244             "checking if namespace {} exists cluster_id {}".format(
245                 namespace, cluster_id
246             )
247         )
248 1         namespaces = await self._get_namespaces(cluster_id)
249 1         return namespace in namespaces if namespaces else False
250
251 1     async def _get_namespaces(self, cluster_id: str):
252 0         self.log.debug("get namespaces cluster_id {}".format(cluster_id))
253
254         # init config, env
255 0         paths, env = self._init_paths_env(
256             cluster_name=cluster_id, create_if_not_exist=True
257         )
258
259 0         command = "{} --kubeconfig={} get namespaces -o=yaml".format(
260             self.kubectl_command, paths["kube_config"]
261         )
262 0         output, _rc = await self._local_async_exec(
263             command=command, raise_exception_on_error=True, env=env
264         )
265
266 0         data = yaml.load(output, Loader=yaml.SafeLoader)
267 0         namespaces = [item["metadata"]["name"] for item in data["items"]]
268 0         self.log.debug(f"namespaces {namespaces}")
269
270 0         return namespaces
271
272 1     async def _create_namespace(self, cluster_id: str, namespace: str):
273 0         self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
274
275         # init config, env
276 0         paths, env = self._init_paths_env(
277             cluster_name=cluster_id, create_if_not_exist=True
278         )
279
280 0         command = "{} --kubeconfig={} create namespace {}".format(
281             self.kubectl_command, paths["kube_config"], namespace
282         )
283 0         _, _rc = await self._local_async_exec(
284             command=command, raise_exception_on_error=True, env=env
285         )
286 0         self.log.debug(f"namespace {namespace} created")
287
288 0         return _rc
289
290 1     async def _get_services(
291         self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str
292     ):
293         # init config, env
294 1         paths, env = self._init_paths_env(
295             cluster_name=cluster_id, create_if_not_exist=True
296         )
297
298 1         command1 = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
299             kubeconfig, self._helm_command, kdu_instance, namespace
300         )
301 1         command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
302 1         output, _rc = await self._local_async_exec_pipe(
303             command1, command2, env=env, raise_exception_on_error=True
304         )
305 1         services = self._parse_services(output)
306
307 1         return services
308
309 1     async def _cluster_init(self, cluster_id, namespace, paths, env):
310         """
311         Implements the helm version dependent cluster initialization:
312         For helm3 it creates the namespace if it is not created
313         """
314 1         if namespace != "kube-system":
315 1             namespaces = await self._get_namespaces(cluster_id)
316 1             if namespace not in namespaces:
317 1                 await self._create_namespace(cluster_id, namespace)
318
319 1         repo_list = await self.repo_list(cluster_id)
320 1         stable_repo = [repo for repo in repo_list if repo["name"] == "stable"]
321 1         if not stable_repo and self._stable_repo_url:
322 1             await self.repo_add(cluster_id, "stable", self._stable_repo_url)
323
324         # Returns False as no software needs to be uninstalled
325 1         return False
326
327 1     async def _uninstall_sw(self, cluster_id: str, namespace: str):
328         # nothing to do to uninstall sw
329 0         pass
330
331 1     async def _instances_list(self, cluster_id: str):
332         # init paths, env
333 1         paths, env = self._init_paths_env(
334             cluster_name=cluster_id, create_if_not_exist=True
335         )
336
337 1         command = "{} list --all-namespaces  --output yaml".format(self._helm_command)
338 1         output, _rc = await self._local_async_exec(
339             command=command, raise_exception_on_error=True, env=env
340         )
341
342 1         if output and len(output) > 0:
343 0             self.log.debug("instances list output: {}".format(output))
344 0             return yaml.load(output, Loader=yaml.SafeLoader)
345         else:
346 1             return []
347
348 1     def _get_inspect_command(
349         self, show_command: str, kdu_model: str, repo_str: str, version: str
350     ):
351         """Generates the command to obtain the information about an Helm Chart package
352             (´helm show ...´ command)
353
354         Args:
355             show_command: the second part of the command (`helm show <show_command>`)
356             kdu_model: The name or path of an Helm Chart
357             repo_url: Helm Chart repository url
358             version: constraint with specific version of the Chart to use
359
360         Returns:
361             str: the generated Helm Chart command
362         """
363
364 1         inspect_command = "{} show {} {}{} {}".format(
365             self._helm_command, show_command, kdu_model, repo_str, version
366         )
367 1         return inspect_command
368
369 1     def _get_get_command(
370         self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
371     ):
372 1         get_command = (
373             "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
374                 kubeconfig, self._helm_command, get_command, kdu_instance, namespace
375             )
376         )
377 1         return get_command
378
379 1     async def _status_kdu(
380         self,
381         cluster_id: str,
382         kdu_instance: str,
383         namespace: str = None,
384         yaml_format: bool = False,
385         show_error_log: bool = False,
386     ) -> Union[str, dict]:
387 1         self.log.debug(
388             "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
389         )
390
391 1         if not namespace:
392 0             namespace = "kube-system"
393
394         # init config, env
395 1         paths, env = self._init_paths_env(
396             cluster_name=cluster_id, create_if_not_exist=True
397         )
398 1         command = "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
399             paths["kube_config"], self._helm_command, kdu_instance, namespace
400         )
401
402 1         output, rc = await self._local_async_exec(
403             command=command,
404             raise_exception_on_error=True,
405             show_error_log=show_error_log,
406             env=env,
407         )
408
409 1         if yaml_format:
410 1             return str(output)
411
412 0         if rc != 0:
413 0             return None
414
415 0         data = yaml.load(output, Loader=yaml.SafeLoader)
416
417         # remove field 'notes' and manifest
418 0         try:
419 0             del data.get("info")["notes"]
420 0         except KeyError:
421 0             pass
422
423         # parse the manifest to a list of dictionaries
424 0         if "manifest" in data:
425 0             manifest_str = data.get("manifest")
426 0             manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)
427
428 0             data["manifest"] = []
429 0             for doc in manifest_docs:
430 0                 data["manifest"].append(doc)
431
432 0         return data
433
434 1     def _get_install_command(
435         self,
436         kdu_model: str,
437         kdu_instance: str,
438         namespace: str,
439         params_str: str,
440         version: str,
441         atomic: bool,
442         timeout: float,
443         kubeconfig: str,
444     ) -> str:
445 1         timeout_str = ""
446 1         if timeout:
447 1             timeout_str = "--timeout {}s".format(timeout)
448
449         # atomic
450 1         atomic_str = ""
451 1         if atomic:
452 1             atomic_str = "--atomic"
453         # namespace
454 1         namespace_str = ""
455 1         if namespace:
456 1             namespace_str = "--namespace {}".format(namespace)
457
458         # version
459 1         version_str = ""
460 1         if version:
461 1             version_str = "--version {}".format(version)
462
463 1         command = (
464             "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml  "
465             "{params} {timeout} {ns} {model} {ver}".format(
466                 kubeconfig=kubeconfig,
467                 helm=self._helm_command,
468                 name=kdu_instance,
469                 atomic=atomic_str,
470                 params=params_str,
471                 timeout=timeout_str,
472                 ns=namespace_str,
473                 model=kdu_model,
474                 ver=version_str,
475             )
476         )
477 1         return command
478
479 1     def _get_upgrade_scale_command(
480         self,
481         kdu_model: str,
482         kdu_instance: str,
483         namespace: str,
484         scale: int,
485         version: str,
486         atomic: bool,
487         replica_str: str,
488         timeout: float,
489         resource_name: str,
490         kubeconfig: str,
491     ) -> str:
492         """Generates the command to scale a Helm Chart release
493
494         Args:
495             kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
496             kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
497             namespace (str): Namespace where this KDU instance is deployed
498             scale (int): Scale count
499             version (str): Constraint with specific version of the Chart to use
500             atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
501                 The --wait flag will be set automatically if --atomic is used
502             replica_str (str): The key under resource_name key where the scale count is stored
503             timeout (float): The time, in seconds, to wait
504             resource_name (str): The KDU's resource to scale
505             kubeconfig (str): Kubeconfig file path
506
507         Returns:
508             str: command to scale a Helm Chart release
509         """
510
511         # scale
512 1         if resource_name:
513 1             scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
514         else:
515 1             scale_dict = {replica_str: scale}
516
517 1         scale_str = self._params_to_set_option(scale_dict)
518
519 1         return self._get_upgrade_command(
520             kdu_model=kdu_model,
521             kdu_instance=kdu_instance,
522             namespace=namespace,
523             params_str=scale_str,
524             version=version,
525             atomic=atomic,
526             timeout=timeout,
527             kubeconfig=kubeconfig,
528         )
529
530 1     def _get_upgrade_command(
531         self,
532         kdu_model: str,
533         kdu_instance: str,
534         namespace: str,
535         params_str: str,
536         version: str,
537         atomic: bool,
538         timeout: float,
539         kubeconfig: str,
540         force: bool = False,
541     ) -> str:
542         """Generates the command to upgrade a Helm Chart release
543
544         Args:
545             kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
546             kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
547             namespace (str): Namespace where this KDU instance is deployed
548             params_str (str): Params used to upgrade the Helm Chart release
549             version (str): Constraint with specific version of the Chart to use
550             atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
551                 The --wait flag will be set automatically if --atomic is used
552             timeout (float): The time, in seconds, to wait
553             kubeconfig (str): Kubeconfig file path
554             force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
555         Returns:
556             str: command to upgrade a Helm Chart release
557         """
558
559 1         timeout_str = ""
560 1         if timeout:
561 1             timeout_str = "--timeout {}s".format(timeout)
562
563         # atomic
564 1         atomic_str = ""
565 1         if atomic:
566 1             atomic_str = "--atomic"
567
568         # force
569 1         force_str = ""
570 1         if force:
571 1             force_str = "--force "
572
573         # version
574 1         version_str = ""
575 1         if version:
576 1             version_str = "--version {}".format(version)
577
578         # namespace
579 1         namespace_str = ""
580 1         if namespace:
581 1             namespace_str = "--namespace {}".format(namespace)
582
583 1         command = (
584             "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} {force}"
585             "--output yaml {params} {timeout} --reuse-values {ver}"
586         ).format(
587             kubeconfig=kubeconfig,
588             helm=self._helm_command,
589             name=kdu_instance,
590             namespace=namespace_str,
591             atomic=atomic_str,
592             force=force_str,
593             params=params_str,
594             timeout=timeout_str,
595             model=kdu_model,
596             ver=version_str,
597         )
598 1         return command
599
600 1     def _get_rollback_command(
601         self, kdu_instance: str, namespace: str, revision: float, kubeconfig: str
602     ) -> str:
603 1         return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
604             kubeconfig, self._helm_command, kdu_instance, revision, namespace
605         )
606
607 1     def _get_uninstall_command(
608         self, kdu_instance: str, namespace: str, kubeconfig: str
609     ) -> str:
610 1         return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
611             kubeconfig, self._helm_command, kdu_instance, namespace
612         )
613
614 1     def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
615 1         repo_ids = []
616 1         cluster_filter = {"_admin.helm-chart-v3.id": cluster_uuid}
617 1         cluster = self.db.get_one("k8sclusters", cluster_filter)
618 1         if cluster:
619 1             repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
620 1             return repo_ids
621         else:
622 0             raise K8sException(
623                 "k8cluster with helm-id : {} not found".format(cluster_uuid)
624             )