Changed url stable repository for helm3
[osm/N2VC.git] / n2vc / k8s_helm3_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import os
23 import yaml
24
25 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
26 from n2vc.exceptions import K8sException
27
28
29 class K8sHelm3Connector(K8sHelmBaseConnector):
30
31 """
32 ####################################################################################
33 ################################### P U B L I C ####################################
34 ####################################################################################
35 """
36
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm3",
    log: object = None,
    on_update_db=None,
):
    """
    Set up the connector used to operate clusters with the helm v3 client.

    This method stores nothing itself: every argument is forwarded to the
    K8sHelmBaseConnector initializer.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable (helm v3 binary by default)
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    """

    # Gather the base-class arguments once so the delegation reads as a
    # single call.
    base_kwargs = {
        "db": db,
        "log": log,
        "fs": fs,
        "kubectl_command": kubectl_command,
        "helm_command": helm_command,
        "on_update_db": on_update_db,
    }
    K8sHelmBaseConnector.__init__(self, **base_kwargs)

    self.log.info("K8S Helm3 connector initialized")
67
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
):
    """
    Install a KDU on a cluster using helm v3.

    The cluster working directory is synced from the shared file system
    before the operation and synced back afterwards. Because helm v3 does
    not create the target namespace by itself, it is created here when it
    is missing (never for "kube-system").

    :param cluster_uuid: cluster identifier; only the cluster id part
        returned by _get_namespace_cluster_id is used
    :param kdu_model: chart reference to install
    :param atomic: forwarded to _install_impl
    :param timeout: forwarded to _install_impl
    :param params: forwarded to _install_impl
    :param db_dict: forwarded to _install_impl
    :param kdu_name: forwarded to _install_impl
    :param namespace: namespace to install into
    :return: the kdu_instance value produced by _install_impl
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

    # bring the local copy of the cluster directory up to date
    self.fs.sync(from_path=cluster_id)

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # helm3 requires the namespace to exist before installing into it
    must_ensure_namespace = bool(namespace) and namespace != "kube-system"
    if must_ensure_namespace:
        existing_namespaces = await self._get_namespaces(cluster_id)
        if namespace not in existing_namespaces:
            await self._create_namespace(cluster_id, namespace)

    kdu_instance = await self._install_impl(
        cluster_id,
        kdu_model,
        paths,
        env,
        atomic=atomic,
        timeout=timeout,
        params=params,
        db_dict=db_dict,
        kdu_name=kdu_name,
        namespace=namespace,
    )

    # publish whatever helm wrote locally back to the shared file system
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.debug("Returning kdu_instance {}".format(kdu_instance))
    return kdu_instance
112
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Return the complete chart information of a KDU model.

    Delegates to _exec_inspect_comand with the "all" inspect sub-command.

    :param kdu_model: chart reference to inspect
    :param repo_url: optional repository URL passed through to the helper
    :return: whatever _exec_inspect_comand returns for the "all" command
    """
    debug_msg = "inspect kdu_model {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_msg)

    inspection = await self._exec_inspect_comand(
        inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspection
122
123 """
124 ####################################################################################
125 ################################### P R I V A T E ##################################
126 ####################################################################################
127 """
128
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Creates (optionally) and returns the base cluster and kube dirs.
    Also creates the helm3 dirs according to the helm3 directory
    specification; the paths are returned together with the environment
    variables that must be provided to execute commands.

    Helm 3 directory specification uses XDG categories for variable support:
    - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
    - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
    - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm

    The variables assigned for these paths are:
    (In the documentation the variable names are $HELM_PATH_CACHE,
    $HELM_PATH_CONFIG, $HELM_PATH_DATA but looking at `helm env` the variable
    names are different)
    - Cache: $HELM_CACHE_HOME
    - Config: $HELM_CONFIG_HOME
    - Data: $HELM_DATA_HOME
    - helm kubeconfig: $KUBECONFIG

    :param cluster_name: cluster_name, used as the directory name under the
        file system base path
    :param create_if_not_exist: when True, missing directories are created
    :return: Dictionary with config_paths and dictionary with helm
        environment variables
    :raises K8sException: if the kube or cluster directory does not exist
        after the (optional) creation step
    """

    base = self.fs.path
    # drop a single trailing separator so the joins below do not double it
    if base.endswith(("/", "\\")):
        base = base[:-1]

    # base dir for cluster
    cluster_dir = base + "/" + cluster_name

    kube_dir = cluster_dir + "/" + ".kube"
    helm_path_cache = cluster_dir + "/.cache/helm"
    helm_path_config = cluster_dir + "/.config/helm"
    helm_path_data = cluster_dir + "/.local/share/helm"

    if create_if_not_exist:
        for directory in (
            kube_dir,
            helm_path_cache,
            helm_path_config,
            helm_path_data,
        ):
            if not os.path.exists(directory):
                self.log.debug("Creating dir {}".format(directory))
                # exist_ok: another worker may create the directory between
                # the check above and this call; that must not be an error
                os.makedirs(directory, exist_ok=True)

    config_filename = kube_dir + "/config"

    # 2 - Prepare dictionary with paths
    paths = {
        "kube_dir": kube_dir,
        "kube_config": config_filename,
        "cluster_dir": cluster_dir,
    }

    # 3 - Prepare environment variables
    env = {
        "HELM_CACHE_HOME": helm_path_cache,
        "HELM_CONFIG_HOME": helm_path_config,
        "HELM_DATA_HOME": helm_path_data,
        "KUBECONFIG": config_filename,
    }

    # only the "*dir*" entries are validated; the kube config file itself
    # is allowed to be absent at this point
    for path_key, path_value in paths.items():
        if "dir" in path_key and not os.path.exists(path_value):
            err_msg = "{} dir does not exist".format(path_value)
            self.log.error(err_msg)
            raise K8sException(err_msg)

    return paths, env
204
async def _get_namespaces(self, cluster_id: str):
    """
    List the names of the namespaces present in a cluster.

    Runs `kubectl get namespaces -o=yaml` against the cluster kubeconfig
    and extracts metadata.name from every returned item.

    :param cluster_id: cluster identifier used to locate its kubeconfig
    :return: list with the namespace names
    """
    self.log.debug("get namespaces cluster_id {}".format(cluster_id))

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    kubectl_cmd = "{} --kubeconfig={} get namespaces -o=yaml".format(
        self.kubectl_command, paths["kube_config"]
    )
    raw_listing, _rc = await self._local_async_exec(
        command=kubectl_cmd, raise_exception_on_error=True, env=env
    )

    listing = yaml.load(raw_listing, Loader=yaml.SafeLoader)
    namespaces = [entry["metadata"]["name"] for entry in listing["items"]]
    self.log.debug(f"namespaces {namespaces}")

    return namespaces
227
async def _create_namespace(self, cluster_id: str, namespace: str):
    """
    Create a namespace in a cluster with `kubectl create namespace`.

    :param cluster_id: cluster identifier used to locate its kubeconfig
    :param namespace: name of the namespace to create
    :return: return code reported by _local_async_exec for the command
    """
    # the two placeholders used to be swapped, which logged the cluster id
    # as the namespace and the namespace as the cluster id
    self.log.debug(f"create namespace: {namespace} for cluster_id: {cluster_id}")

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} create namespace {}".format(
        self.kubectl_command, paths["kube_config"], namespace
    )
    _, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )
    self.log.debug(f"namespace {namespace} created")

    return _rc
248
async def _get_services(self, cluster_id: str, kdu_instance: str, namespace: str):
    """
    Obtain the services that belong to a helm release.

    The release manifest (`helm get manifest`) is piped into
    `kubectl get -f -` and the combined output is handed to
    _parse_services.

    :param cluster_id: cluster identifier used to build paths and env
    :param kdu_instance: helm release name
    :param namespace: namespace of the release
    :return: result of _parse_services over the command output
    """
    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    manifest_cmd = "{} get manifest {} --namespace={}".format(
        self._helm_command, kdu_instance, namespace
    )
    kubectl_get_cmd = "{} get --namespace={} -f -".format(
        self.kubectl_command, namespace
    )
    piped_output, _rc = await self._local_async_exec_pipe(
        manifest_cmd, kubectl_get_cmd, env=env, raise_exception_on_error=True
    )

    return self._parse_services(piped_output)
268
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Helm-version-specific cluster initialization for helm3.

    Creates the namespace when it is missing (skipped for "kube-system")
    and registers the default "stable" chart repository unless a repo with
    that name is already listed.

    :param cluster_id: cluster identifier
    :param namespace: namespace the cluster is initialized with
    :param paths: not used by this implementation
    :param env: not used by this implementation
    :return: always False, as no software needs to be uninstalled later
    """
    if namespace != "kube-system":
        current_namespaces = await self._get_namespaces(cluster_id)
        if namespace not in current_namespaces:
            await self._create_namespace(cluster_id, namespace)

    # If default repo is not included add
    cluster_uuid = "{}:{}".format(namespace, cluster_id)
    configured_repos = await self.repo_list(cluster_uuid)

    stable_present = False
    for repo in configured_repos:
        self.log.debug("repo")
        if repo["name"] == "stable":
            self.log.debug("Default repo already present")
            stable_present = True
            break

    if not stable_present:
        await self.repo_add(
            cluster_uuid, "stable", "https://charts.helm.sh/stable"
        )

    # Returns False as no software needs to be uninstalled
    return False
294
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    No-op for helm3: there is nothing to do to uninstall software.

    :param cluster_id: ignored
    :param namespace: ignored
    :return: None
    """
    return None
298
async def _instances_list(self, cluster_id: str):
    """
    List the helm releases of a cluster across all namespaces.

    :param cluster_id: cluster identifier used to build paths and env
    :return: parsed YAML output of `helm list --all-namespaces`, or an
        empty list when the command prints nothing
    """
    # init paths, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    list_cmd = "{} list --all-namespaces --output yaml".format(
        self._helm_command
    )
    listing, _rc = await self._local_async_exec(
        command=list_cmd, raise_exception_on_error=True, env=env
    )

    # nothing printed: report no instances instead of parsing
    if not listing:
        return []

    self.log.debug("instances list output: {}".format(listing))
    return yaml.load(listing, Loader=yaml.SafeLoader)
318
def _get_inspect_command(self, inspect_command: str, kdu_model: str, repo_str: str,
                         version: str):
    """
    Build the `helm show` command line for a chart.

    :param inspect_command: `helm show` sub-command to run
    :param kdu_model: chart reference
    :param repo_str: text appended directly after kdu_model (no separator
        is inserted before it)
    :param version: text appended as the last token
    :return: the command line as a single string
    """
    return (
        f"{self._helm_command} show {inspect_command} "
        f"{kdu_model}{repo_str} {version}"
    )
325
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    show_error_log: bool = False,
    return_text: bool = False,
):
    """
    Query `helm status` for a release.

    :param cluster_id: cluster identifier used to build paths and env
    :param kdu_instance: helm release name
    :param namespace: namespace of the release; "kube-system" when empty
    :param show_error_log: forwarded to _local_async_exec
    :param return_text: when True the raw command output is returned as a
        string, without parsing
    :return: raw output string (return_text=True), None when the command
        return code is non-zero or the output holds no YAML mapping, or the
        parsed status with the bulky "notes" and "manifest" fields removed
    """

    self.log.debug("status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace))

    if not namespace:
        namespace = "kube-system"

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )
    command = "{} status {} --namespace={} --output yaml".format(
        self._helm_command, kdu_instance, namespace
    )

    output, rc = await self._local_async_exec(
        command=command,
        raise_exception_on_error=True,
        show_error_log=show_error_log,
        env=env
    )

    if return_text:
        return str(output)

    if rc != 0:
        return None

    data = yaml.load(output, Loader=yaml.SafeLoader)

    # an empty document parses to None; there is nothing to clean then
    if not isinstance(data, dict):
        return None

    # remove field 'notes' and manifest. Each field is dropped on its own:
    # previously one shared try/except meant a missing 'notes' left
    # 'manifest' in place, and a missing 'info' raised TypeError.
    info = data.get("info")
    if isinstance(info, dict):
        info.pop("notes", None)
    data.pop("manifest", None)

    # unable to parse 'resources' as currently it is not included in helm3
    return data
372
def _get_install_command(self, kdu_model: str, kdu_instance: str, namespace: str,
                         params_str: str, version: str, atomic: bool, timeout: float) -> str:
    """
    Build the `helm install` command line.

    Optional flags are rendered as empty strings when their argument is
    falsy, so the result may contain consecutive spaces.

    :param kdu_model: chart reference to install
    :param kdu_instance: release name
    :param namespace: adds `--namespace <namespace>` when set
    :param params_str: pre-rendered parameter flags, inserted verbatim
    :param version: adds `--version <version>` when set
    :param atomic: adds `--atomic` when true
    :param timeout: adds `--timeout <timeout>s` when set
    :return: the command line as a single string
    """
    timeout_flag = f"--timeout {timeout}s" if timeout else ""
    atomic_flag = "--atomic" if atomic else ""
    namespace_flag = f"--namespace {namespace}" if namespace else ""
    version_flag = f"--version {version}" if version else ""

    return (
        f"{self._helm_command} install {kdu_instance} {atomic_flag} --output yaml "
        f"{params_str} {timeout_flag} {namespace_flag} {kdu_model} {version_flag}"
    )
408
def _get_upgrade_command(self, kdu_model: str, kdu_instance: str, namespace: str,
                         params_str: str, version: str, atomic: bool, timeout: float) -> str:
    """
    Build the `helm upgrade` command line.

    Optional flags are rendered as empty strings when their argument is
    falsy, so the result may contain consecutive spaces.

    :param kdu_model: chart reference to upgrade to
    :param kdu_instance: release name
    :param namespace: adds `--namespace <namespace>` when set
    :param params_str: pre-rendered parameter flags, inserted verbatim
    :param version: adds `--version <version>` when set
    :param atomic: adds `--atomic` when true
    :param timeout: adds `--timeout <timeout>s` when set
    :return: the command line as a single string
    """
    timeout_flag = f"--timeout {timeout}s" if timeout else ""
    atomic_flag = "--atomic" if atomic else ""
    version_flag = f"--version {version}" if version else ""
    namespace_flag = f"--namespace {namespace}" if namespace else ""

    return (
        f"{self._helm_command} upgrade {kdu_instance} {kdu_model} "
        f"{namespace_flag} {atomic_flag} --output yaml {params_str} "
        f"{timeout_flag} {version_flag}"
    )
445
def _get_rollback_command(self, kdu_instance: str, namespace: str, revision: float) -> str:
    """
    Build the `helm rollback` command line (always with `--wait`).

    :param kdu_instance: release name
    :param namespace: namespace of the release
    :param revision: revision to roll back to
    :return: the command line as a single string
    """
    helm = self._helm_command
    return f"{helm} rollback {kdu_instance} {revision} --namespace={namespace} --wait"
450
def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
    """
    Build the `helm uninstall` command line.

    :param kdu_instance: release name
    :param namespace: namespace of the release
    :return: the command line as a single string
    """
    helm = self._helm_command
    return f"{helm} uninstall {kdu_instance} --namespace={namespace}"
455
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Return the helm chart repo ids stored for a cluster.

    The cluster is looked up in the "k8sclusters" collection by its
    helm v3 id.

    :param cluster_uuid: value matched against "_admin.helm-chart-v3.id"
    :return: the "_admin.helm_chart_repos" value, or an empty list when it
        is missing or empty
    :raises K8sException: if no cluster matches
    """
    query = {"_admin.helm-chart-v3.id": cluster_uuid}
    cluster = self.db.get_one("k8sclusters", query)

    # guard clause: an unknown cluster is an error, not an empty result
    if not cluster:
        raise K8sException(
            "k8cluster with helm-id : {} not found".format(cluster_uuid)
        )

    admin_section = cluster.get("_admin")
    return admin_section.get("helm_chart_repos") or []