Code Coverage

Cobertura Coverage Report > n2vc >

k8s_helm3_conn.py

Trend

Classes100%
 
Lines79%
   
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
k8s_helm3_conn.py
100%
1/1
79%
129/164
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
k8s_helm3_conn.py
79%
129/164
N/A

Source

n2vc/k8s_helm3_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #    http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 1 import os
23 1 import yaml
24
25 1 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
26 1 from n2vc.exceptions import K8sException
27
28
29 1 class K8sHelm3Connector(K8sHelmBaseConnector):
30
31     """
32     ####################################################################################
33     ################################### P U B L I C ####################################
34     ####################################################################################
35     """
36
37 1     def __init__(
38             self,
39             fs: object,
40             db: object,
41             kubectl_command: str = "/usr/bin/kubectl",
42             helm_command: str = "/usr/bin/helm3",
43             log: object = None,
44             on_update_db=None,
45             vca_config: dict = None,
46     ):
47         """
48         Initializes helm connector for helm v3
49
50         :param fs: file system for kubernetes and helm configuration
51         :param db: database object to write current operation status
52         :param kubectl_command: path to kubectl executable
53         :param helm_command: path to helm executable
54         :param log: logger
55         :param on_update_db: callback called when k8s connector updates database
56         """
57
58         # parent class
59 1         K8sHelmBaseConnector.__init__(self,
60                                       db=db,
61                                       log=log,
62                                       fs=fs,
63                                       kubectl_command=kubectl_command,
64                                       helm_command=helm_command,
65                                       on_update_db=on_update_db,
66                                       vca_config=vca_config)
67
68 1         self.log.info("K8S Helm3 connector initialized")
69
70 1     async def install(
71             self,
72             cluster_uuid: str,
73             kdu_model: str,
74             kdu_instance: str,
75             atomic: bool = True,
76             timeout: float = 300,
77             params: dict = None,
78             db_dict: dict = None,
79             kdu_name: str = None,
80             namespace: str = None,
81     ):
82 1         _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
83 1         self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
84
85         # sync local dir
86 1         self.fs.sync(from_path=cluster_id)
87
88         # init env, paths
89 1         paths, env = self._init_paths_env(
90             cluster_name=cluster_id, create_if_not_exist=True
91         )
92
93         # for helm3 if namespace does not exist must create it
94 1         if namespace and namespace != "kube-system":
95 1             namespaces = await self._get_namespaces(cluster_id)
96 1             if namespace not in namespaces:
97 1                 await self._create_namespace(cluster_id, namespace)
98
99 1         await self._install_impl(
100             cluster_id,
101             kdu_model,
102             paths,
103             env,
104             kdu_instance,
105             atomic=atomic,
106             timeout=timeout,
107             params=params,
108             db_dict=db_dict,
109             kdu_name=kdu_name,
110             namespace=namespace,
111         )
112
113         # sync fs
114 1         self.fs.reverse_sync(from_path=cluster_id)
115
116 1         self.log.debug("Returning kdu_instance {}".format(kdu_instance))
117 1         return True
118
119 1     async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
120
121 1         self.log.debug(
122             "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
123         )
124
125 1         return await self._exec_inspect_comand(
126             inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
127         )
128
129     """
130     ####################################################################################
131     ################################### P R I V A T E ##################################
132     ####################################################################################
133     """
134
135 1     def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
136         """
137         Creates and returns base cluster and kube dirs and returns them.
138         Also created helm3 dirs according to new directory specification, paths are
139         returned and also environment variables that must be provided to execute commands
140
141         Helm 3 directory specification uses XDG categories for variable support:
142         - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
143         - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
144         - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
145
146         The variables assigned for this paths are:
147         (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
148         $HELM_PATH_DATA but looking and helm env the variable names are different)
149         - Cache: $HELM_CACHE_HOME
150         - Config: $HELM_CONFIG_HOME
151         - Data: $HELM_DATA_HOME
152         - helm kubeconfig: $KUBECONFIG
153
154         :param cluster_name:  cluster_name
155         :return: Dictionary with config_paths and dictionary with helm environment variables
156         """
157
158 1         base = self.fs.path
159 1         if base.endswith("/") or base.endswith("\\"):
160 1             base = base[:-1]
161
162         # base dir for cluster
163 1         cluster_dir = base + "/" + cluster_name
164
165         # kube dir
166 1         kube_dir = cluster_dir + "/" + ".kube"
167 1         if create_if_not_exist and not os.path.exists(kube_dir):
168 1             self.log.debug("Creating dir {}".format(kube_dir))
169 1             os.makedirs(kube_dir)
170
171 1         helm_path_cache = cluster_dir + "/.cache/helm"
172 1         if create_if_not_exist and not os.path.exists(helm_path_cache):
173 1             self.log.debug("Creating dir {}".format(helm_path_cache))
174 1             os.makedirs(helm_path_cache)
175
176 1         helm_path_config = cluster_dir + "/.config/helm"
177 1         if create_if_not_exist and not os.path.exists(helm_path_config):
178 1             self.log.debug("Creating dir {}".format(helm_path_config))
179 1             os.makedirs(helm_path_config)
180
181 1         helm_path_data = cluster_dir + "/.local/share/helm"
182 1         if create_if_not_exist and not os.path.exists(helm_path_data):
183 1             self.log.debug("Creating dir {}".format(helm_path_data))
184 1             os.makedirs(helm_path_data)
185
186 1         config_filename = kube_dir + "/config"
187
188         # 2 - Prepare dictionary with paths
189 1         paths = {
190             "kube_dir": kube_dir,
191             "kube_config": config_filename,
192             "cluster_dir": cluster_dir
193         }
194
195         # 3 - Prepare environment variables
196 1         env = {
197             "HELM_CACHE_HOME": helm_path_cache,
198             "HELM_CONFIG_HOME": helm_path_config,
199             "HELM_DATA_HOME": helm_path_data,
200             "KUBECONFIG": config_filename
201         }
202
203 1         for file_name, file in paths.items():
204 1             if "dir" in file_name and not os.path.exists(file):
205 0                 err_msg = "{} dir does not exist".format(file)
206 0                 self.log.error(err_msg)
207 0                 raise K8sException(err_msg)
208
209 1         return paths, env
210
211 1     async def _get_namespaces(self,
212                               cluster_id: str):
213
214 0         self.log.debug("get namespaces cluster_id {}".format(cluster_id))
215
216         # init config, env
217 0         paths, env = self._init_paths_env(
218             cluster_name=cluster_id, create_if_not_exist=True
219         )
220
221 0         command = "{} --kubeconfig={} get namespaces -o=yaml".format(
222             self.kubectl_command, paths["kube_config"]
223         )
224 0         output, _rc = await self._local_async_exec(
225             command=command, raise_exception_on_error=True, env=env
226         )
227
228 0         data = yaml.load(output, Loader=yaml.SafeLoader)
229 0         namespaces = [item["metadata"]["name"] for item in data["items"]]
230 0         self.log.debug(f"namespaces {namespaces}")
231
232 0         return namespaces
233
234 1     async def _create_namespace(self,
235                                 cluster_id: str,
236                                 namespace: str):
237
238 0         self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
239
240         # init config, env
241 0         paths, env = self._init_paths_env(
242             cluster_name=cluster_id, create_if_not_exist=True
243         )
244
245 0         command = "{} --kubeconfig={} create namespace {}".format(
246             self.kubectl_command, paths["kube_config"], namespace
247         )
248 0         _, _rc = await self._local_async_exec(
249             command=command, raise_exception_on_error=True, env=env
250         )
251 0         self.log.debug(f"namespace {namespace} created")
252
253 0         return _rc
254
255 1     async def _get_services(self, cluster_id: str, kdu_instance: str, namespace: str):
256
257         # init config, env
258 1         paths, env = self._init_paths_env(
259             cluster_name=cluster_id, create_if_not_exist=True
260         )
261
262 1         command1 = "{} get manifest {} --namespace={}".format(
263             self._helm_command, kdu_instance, namespace
264         )
265 1         command2 = "{} get --namespace={} -f -".format(
266             self.kubectl_command, namespace
267         )
268 1         output, _rc = await self._local_async_exec_pipe(
269             command1, command2, env=env, raise_exception_on_error=True
270         )
271 1         services = self._parse_services(output)
272
273 1         return services
274
275 1     async def _cluster_init(self, cluster_id, namespace, paths, env):
276         """
277         Implements the helm version dependent cluster initialization:
278         For helm3 it creates the namespace if it is not created
279         """
280 1         if namespace != "kube-system":
281 1             namespaces = await self._get_namespaces(cluster_id)
282 1             if namespace not in namespaces:
283 1                 await self._create_namespace(cluster_id, namespace)
284
285         # If default repo is not included add
286 1         cluster_uuid = "{}:{}".format(namespace, cluster_id)
287 1         repo_list = await self.repo_list(cluster_uuid)
288 1         for repo in repo_list:
289 0             self.log.debug("repo")
290 0             if repo["name"] == "stable":
291 0                 self.log.debug("Default repo already present")
292 0                 break
293         else:
294 1             await self.repo_add(cluster_uuid,
295                                 "stable",
296                                 self._stable_repo_url)
297
298         # Returns False as no software needs to be uninstalled
299 1         return False
300
301 1     async def _uninstall_sw(self, cluster_id: str, namespace: str):
302         # nothing to do to uninstall sw
303 0         pass
304
305 1     async def _instances_list(self, cluster_id: str):
306
307         # init paths, env
308 1         paths, env = self._init_paths_env(
309             cluster_name=cluster_id, create_if_not_exist=True
310         )
311
312 1         command = "{} list --all-namespaces  --output yaml".format(
313             self._helm_command
314         )
315 1         output, _rc = await self._local_async_exec(
316             command=command, raise_exception_on_error=True, env=env
317         )
318
319 1         if output and len(output) > 0:
320 0             self.log.debug("instances list output: {}".format(output))
321 0             return yaml.load(output, Loader=yaml.SafeLoader)
322         else:
323 1             return []
324
325 1     def _get_inspect_command(self, inspect_command: str, kdu_model: str, repo_str: str,
326                              version: str):
327 1         inspect_command = "{} show {} {}{} {}".format(
328             self._helm_command, inspect_command, kdu_model, repo_str, version
329         )
330 1         return inspect_command
331
332 1     async def _status_kdu(
333         self,
334         cluster_id: str,
335         kdu_instance: str,
336         namespace: str = None,
337         show_error_log: bool = False,
338         return_text: bool = False,
339     ):
340
341 1         self.log.debug("status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace))
342
343 1         if not namespace:
344 0             namespace = "kube-system"
345
346         # init config, env
347 1         paths, env = self._init_paths_env(
348             cluster_name=cluster_id, create_if_not_exist=True
349         )
350 1         command = "{} status {} --namespace={} --output yaml".format(
351             self._helm_command, kdu_instance, namespace
352         )
353
354 1         output, rc = await self._local_async_exec(
355             command=command,
356             raise_exception_on_error=True,
357             show_error_log=show_error_log,
358             env=env
359         )
360
361 1         if return_text:
362 1             return str(output)
363
364 0         if rc != 0:
365 0             return None
366
367 0         data = yaml.load(output, Loader=yaml.SafeLoader)
368
369         # remove field 'notes' and manifest
370 0         try:
371 0             del data.get("info")["notes"]
372 0             del data["manifest"]
373 0         except KeyError:
374 0             pass
375
376         # unable to parse 'resources' as currently it is not included in helm3
377 0         return data
378
379 1     def _get_install_command(self, kdu_model: str, kdu_instance: str, namespace: str,
380                              params_str: str, version: str, atomic: bool, timeout: float) -> str:
381
382 1         timeout_str = ""
383 1         if timeout:
384 1             timeout_str = "--timeout {}s".format(timeout)
385
386         # atomic
387 1         atomic_str = ""
388 1         if atomic:
389 1             atomic_str = "--atomic"
390         # namespace
391 1         namespace_str = ""
392 1         if namespace:
393 1             namespace_str = "--namespace {}".format(namespace)
394
395         # version
396 1         version_str = ""
397 1         if version:
398 1             version_str = "--version {}".format(version)
399
400 1         command = (
401             "{helm} install {name} {atomic} --output yaml  "
402             "{params} {timeout} {ns} {model} {ver}".format(
403                 helm=self._helm_command,
404                 name=kdu_instance,
405                 atomic=atomic_str,
406                 params=params_str,
407                 timeout=timeout_str,
408                 ns=namespace_str,
409                 model=kdu_model,
410                 ver=version_str,
411             )
412         )
413 1         return command
414
415 1     def _get_upgrade_command(self, kdu_model: str, kdu_instance: str, namespace: str,
416                              params_str: str, version: str, atomic: bool, timeout: float) -> str:
417
418 1         timeout_str = ""
419 1         if timeout:
420 1             timeout_str = "--timeout {}s".format(timeout)
421
422         # atomic
423 1         atomic_str = ""
424 1         if atomic:
425 1             atomic_str = "--atomic"
426
427         # version
428 1         version_str = ""
429 1         if version:
430 1             version_str = "--version {}".format(version)
431
432         # namespace
433 1         namespace_str = ""
434 1         if namespace:
435 1             namespace_str = "--namespace {}".format(namespace)
436
437 1         command = (
438             "{helm} upgrade {name} {model} {namespace} {atomic} --output yaml {params} "
439             "{timeout}  {ver}".format(
440                 helm=self._helm_command,
441                 name=kdu_instance,
442                 namespace=namespace_str,
443                 atomic=atomic_str,
444                 params=params_str,
445                 timeout=timeout_str,
446                 model=kdu_model,
447                 ver=version_str,
448             )
449         )
450 1         return command
451
452 1     def _get_rollback_command(self, kdu_instance: str, namespace: str, revision: float) -> str:
453 1         return "{} rollback {} {} --namespace={} --wait".format(
454             self._helm_command, kdu_instance, revision, namespace
455         )
456
457 1     def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
458
459 1         return "{} uninstall {} --namespace={}".format(
460             self._helm_command, kdu_instance, namespace)
461
462 1     def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
463 1         repo_ids = []
464 1         cluster_filter = {"_admin.helm-chart-v3.id": cluster_uuid}
465 1         cluster = self.db.get_one("k8sclusters", cluster_filter)
466 1         if cluster:
467 1             repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
468 1             return repo_ids
469         else:
470 0             raise K8sException(
471                 "k8cluster with helm-id : {} not found".format(cluster_uuid)
472             )