Fix Bug 1575
[osm/N2VC.git] / n2vc / k8s_helm3_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import os
23 import yaml
24
25 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
26 from n2vc.exceptions import K8sException
27
28
29 class K8sHelm3Connector(K8sHelmBaseConnector):
30
31 """
32 ####################################################################################
33 ################################### P U B L I C ####################################
34 ####################################################################################
35 """
36
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm3",
    log: object = None,
    on_update_db=None,
    vca_config: dict = None,
):
    """
    Build a connector that drives helm v3.

    Every argument is handed over, unchanged, to K8sHelmBaseConnector; this
    class adds no state of its own at construction time.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    :param vca_config: forwarded as-is to the base connector
    """

    # the helm-version independent set-up lives in the parent class
    base_arguments = {
        "db": db,
        "log": log,
        "fs": fs,
        "kubectl_command": kubectl_command,
        "helm_command": helm_command,
        "on_update_db": on_update_db,
        "vca_config": vca_config,
    }
    K8sHelmBaseConnector.__init__(self, **base_arguments)

    self.log.info("K8S Helm3 connector initialized")
69
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
):
    """
    Install a KDU (helm chart) in the given cluster.

    With helm v3 the target namespace is not created implicitly, so when a
    namespace other than "kube-system" is requested and it does not exist
    yet, it is created before delegating to ``_install_impl``.

    :param cluster_uuid: cluster identifier; the cluster id is extracted from
        it with ``_get_namespace_cluster_id``
    :param kdu_model: chart to install
    :param kdu_instance: name given to the helm release
    :param atomic: forwarded to ``_install_impl``
    :param timeout: forwarded to ``_install_impl``
    :param params: forwarded to ``_install_impl``
    :param db_dict: forwarded to ``_install_impl``
    :param kdu_name: forwarded to ``_install_impl``
    :param namespace: namespace where the release is installed
    :return: True
    :raises K8sException: if the namespace could not be created and it still
        does not exist afterwards
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # for helm3 if namespace does not exist must create it
    if namespace and namespace != "kube-system":
        if not await self._namespace_exists(cluster_id, namespace):
            try:
                await self._create_namespace(cluster_id, namespace)
            except Exception as e:
                # The creation may have failed only because somebody else
                # created the namespace in the meantime: check again before
                # giving up.
                if not await self._namespace_exists(cluster_id, namespace):
                    err_msg = (
                        "namespace {} does not exist in cluster_id {} "
                        "error message: {}".format(namespace, cluster_id, e)
                    )
                    self.log.error(err_msg)
                    raise K8sException(err_msg) from e

    await self._install_impl(
        cluster_id,
        kdu_model,
        paths,
        env,
        kdu_instance,
        atomic=atomic,
        timeout=timeout,
        params=params,
        db_dict=db_dict,
        kdu_name=kdu_name,
        namespace=namespace,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.debug("Returning kdu_instance {}".format(kdu_instance))
    return True
128
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Run the "all" inspect command for a chart.

    :param kdu_model: chart to inspect
    :param repo_url: optional repository the chart is taken from
    :return: whatever ``_exec_inspect_comand`` returns for the "all" command
    """
    trace = "inspect kdu_model {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(trace)

    inspection = await self._exec_inspect_comand(
        inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspection
138
139 """
140 ####################################################################################
141 ################################### P R I V A T E ##################################
142 ####################################################################################
143 """
144
145 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
146 """
147 Creates and returns base cluster and kube dirs and returns them.
148 Also created helm3 dirs according to new directory specification, paths are
149 returned and also environment variables that must be provided to execute commands
150
151 Helm 3 directory specification uses XDG categories for variable support:
152 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
153 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
154 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
155
156 The variables assigned for this paths are:
157 (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
158 $HELM_PATH_DATA but looking and helm env the variable names are different)
159 - Cache: $HELM_CACHE_HOME
160 - Config: $HELM_CONFIG_HOME
161 - Data: $HELM_DATA_HOME
162 - helm kubeconfig: $KUBECONFIG
163
164 :param cluster_name: cluster_name
165 :return: Dictionary with config_paths and dictionary with helm environment variables
166 """
167
168 base = self.fs.path
169 if base.endswith("/") or base.endswith("\\"):
170 base = base[:-1]
171
172 # base dir for cluster
173 cluster_dir = base + "/" + cluster_name
174
175 # kube dir
176 kube_dir = cluster_dir + "/" + ".kube"
177 if create_if_not_exist and not os.path.exists(kube_dir):
178 self.log.debug("Creating dir {}".format(kube_dir))
179 os.makedirs(kube_dir)
180
181 helm_path_cache = cluster_dir + "/.cache/helm"
182 if create_if_not_exist and not os.path.exists(helm_path_cache):
183 self.log.debug("Creating dir {}".format(helm_path_cache))
184 os.makedirs(helm_path_cache)
185
186 helm_path_config = cluster_dir + "/.config/helm"
187 if create_if_not_exist and not os.path.exists(helm_path_config):
188 self.log.debug("Creating dir {}".format(helm_path_config))
189 os.makedirs(helm_path_config)
190
191 helm_path_data = cluster_dir + "/.local/share/helm"
192 if create_if_not_exist and not os.path.exists(helm_path_data):
193 self.log.debug("Creating dir {}".format(helm_path_data))
194 os.makedirs(helm_path_data)
195
196 config_filename = kube_dir + "/config"
197
198 # 2 - Prepare dictionary with paths
199 paths = {
200 "kube_dir": kube_dir,
201 "kube_config": config_filename,
202 "cluster_dir": cluster_dir
203 }
204
205 # 3 - Prepare environment variables
206 env = {
207 "HELM_CACHE_HOME": helm_path_cache,
208 "HELM_CONFIG_HOME": helm_path_config,
209 "HELM_DATA_HOME": helm_path_data,
210 "KUBECONFIG": config_filename
211 }
212
213 for file_name, file in paths.items():
214 if "dir" in file_name and not os.path.exists(file):
215 err_msg = "{} dir does not exist".format(file)
216 self.log.error(err_msg)
217 raise K8sException(err_msg)
218
219 return paths, env
220
221 async def _namespace_exists(self, cluster_id, namespace) -> bool:
222 self.log.debug(
223 "checking if namespace {} exists cluster_id {}".format(
224 namespace, cluster_id
225 )
226 )
227 namespaces = await self._get_namespaces(cluster_id)
228 return namespace in namespaces if namespaces else False
229
230 async def _get_namespaces(self, cluster_id: str):
231
232 self.log.debug("get namespaces cluster_id {}".format(cluster_id))
233
234 # init config, env
235 paths, env = self._init_paths_env(
236 cluster_name=cluster_id, create_if_not_exist=True
237 )
238
239 command = "{} --kubeconfig={} get namespaces -o=yaml".format(
240 self.kubectl_command, paths["kube_config"]
241 )
242 output, _rc = await self._local_async_exec(
243 command=command, raise_exception_on_error=True, env=env
244 )
245
246 data = yaml.load(output, Loader=yaml.SafeLoader)
247 namespaces = [item["metadata"]["name"] for item in data["items"]]
248 self.log.debug(f"namespaces {namespaces}")
249
250 return namespaces
251
252 async def _create_namespace(self,
253 cluster_id: str,
254 namespace: str):
255
256 self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
257
258 # init config, env
259 paths, env = self._init_paths_env(
260 cluster_name=cluster_id, create_if_not_exist=True
261 )
262
263 command = "{} --kubeconfig={} create namespace {}".format(
264 self.kubectl_command, paths["kube_config"], namespace
265 )
266 _, _rc = await self._local_async_exec(
267 command=command, raise_exception_on_error=True, env=env
268 )
269 self.log.debug(f"namespace {namespace} created")
270
271 return _rc
272
273 async def _get_services(self, cluster_id: str, kdu_instance: str, namespace: str):
274
275 # init config, env
276 paths, env = self._init_paths_env(
277 cluster_name=cluster_id, create_if_not_exist=True
278 )
279
280 command1 = "{} get manifest {} --namespace={}".format(
281 self._helm_command, kdu_instance, namespace
282 )
283 command2 = "{} get --namespace={} -f -".format(
284 self.kubectl_command, namespace
285 )
286 output, _rc = await self._local_async_exec_pipe(
287 command1, command2, env=env, raise_exception_on_error=True
288 )
289 services = self._parse_services(output)
290
291 return services
292
293 async def _cluster_init(self, cluster_id, namespace, paths, env):
294 """
295 Implements the helm version dependent cluster initialization:
296 For helm3 it creates the namespace if it is not created
297 """
298 if namespace != "kube-system":
299 namespaces = await self._get_namespaces(cluster_id)
300 if namespace not in namespaces:
301 await self._create_namespace(cluster_id, namespace)
302
303 # If default repo is not included add
304 cluster_uuid = "{}:{}".format(namespace, cluster_id)
305 repo_list = await self.repo_list(cluster_uuid)
306 for repo in repo_list:
307 self.log.debug("repo")
308 if repo["name"] == "stable":
309 self.log.debug("Default repo already present")
310 break
311 else:
312 await self.repo_add(cluster_uuid,
313 "stable",
314 self._stable_repo_url)
315
316 # Returns False as no software needs to be uninstalled
317 return False
318
319 async def _uninstall_sw(self, cluster_id: str, namespace: str):
320 # nothing to do to uninstall sw
321 pass
322
323 async def _instances_list(self, cluster_id: str):
324
325 # init paths, env
326 paths, env = self._init_paths_env(
327 cluster_name=cluster_id, create_if_not_exist=True
328 )
329
330 command = "{} list --all-namespaces --output yaml".format(
331 self._helm_command
332 )
333 output, _rc = await self._local_async_exec(
334 command=command, raise_exception_on_error=True, env=env
335 )
336
337 if output and len(output) > 0:
338 self.log.debug("instances list output: {}".format(output))
339 return yaml.load(output, Loader=yaml.SafeLoader)
340 else:
341 return []
342
343 def _get_inspect_command(self, inspect_command: str, kdu_model: str, repo_str: str,
344 version: str):
345 inspect_command = "{} show {} {}{} {}".format(
346 self._helm_command, inspect_command, kdu_model, repo_str, version
347 )
348 return inspect_command
349
350 async def _status_kdu(
351 self,
352 cluster_id: str,
353 kdu_instance: str,
354 namespace: str = None,
355 show_error_log: bool = False,
356 return_text: bool = False,
357 ):
358
359 self.log.debug("status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace))
360
361 if not namespace:
362 namespace = "kube-system"
363
364 # init config, env
365 paths, env = self._init_paths_env(
366 cluster_name=cluster_id, create_if_not_exist=True
367 )
368 command = "{} status {} --namespace={} --output yaml".format(
369 self._helm_command, kdu_instance, namespace
370 )
371
372 output, rc = await self._local_async_exec(
373 command=command,
374 raise_exception_on_error=True,
375 show_error_log=show_error_log,
376 env=env
377 )
378
379 if return_text:
380 return str(output)
381
382 if rc != 0:
383 return None
384
385 data = yaml.load(output, Loader=yaml.SafeLoader)
386
387 # remove field 'notes' and manifest
388 try:
389 del data.get("info")["notes"]
390 del data["manifest"]
391 except KeyError:
392 pass
393
394 # unable to parse 'resources' as currently it is not included in helm3
395 return data
396
397 def _get_install_command(self, kdu_model: str, kdu_instance: str, namespace: str,
398 params_str: str, version: str, atomic: bool, timeout: float) -> str:
399
400 timeout_str = ""
401 if timeout:
402 timeout_str = "--timeout {}s".format(timeout)
403
404 # atomic
405 atomic_str = ""
406 if atomic:
407 atomic_str = "--atomic"
408 # namespace
409 namespace_str = ""
410 if namespace:
411 namespace_str = "--namespace {}".format(namespace)
412
413 # version
414 version_str = ""
415 if version:
416 version_str = "--version {}".format(version)
417
418 command = (
419 "{helm} install {name} {atomic} --output yaml "
420 "{params} {timeout} {ns} {model} {ver}".format(
421 helm=self._helm_command,
422 name=kdu_instance,
423 atomic=atomic_str,
424 params=params_str,
425 timeout=timeout_str,
426 ns=namespace_str,
427 model=kdu_model,
428 ver=version_str,
429 )
430 )
431 return command
432
433 def _get_upgrade_command(self, kdu_model: str, kdu_instance: str, namespace: str,
434 params_str: str, version: str, atomic: bool, timeout: float) -> str:
435
436 timeout_str = ""
437 if timeout:
438 timeout_str = "--timeout {}s".format(timeout)
439
440 # atomic
441 atomic_str = ""
442 if atomic:
443 atomic_str = "--atomic"
444
445 # version
446 version_str = ""
447 if version:
448 version_str = "--version {}".format(version)
449
450 # namespace
451 namespace_str = ""
452 if namespace:
453 namespace_str = "--namespace {}".format(namespace)
454
455 command = (
456 "{helm} upgrade {name} {model} {namespace} {atomic} --output yaml {params} "
457 "{timeout} {ver}".format(
458 helm=self._helm_command,
459 name=kdu_instance,
460 namespace=namespace_str,
461 atomic=atomic_str,
462 params=params_str,
463 timeout=timeout_str,
464 model=kdu_model,
465 ver=version_str,
466 )
467 )
468 return command
469
470 def _get_rollback_command(self, kdu_instance: str, namespace: str, revision: float) -> str:
471 return "{} rollback {} {} --namespace={} --wait".format(
472 self._helm_command, kdu_instance, revision, namespace
473 )
474
475 def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
476
477 return "{} uninstall {} --namespace={}".format(
478 self._helm_command, kdu_instance, namespace)
479
480 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
481 repo_ids = []
482 cluster_filter = {"_admin.helm-chart-v3.id": cluster_uuid}
483 cluster = self.db.get_one("k8sclusters", cluster_filter)
484 if cluster:
485 repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
486 return repo_ids
487 else:
488 raise K8sException(
489 "k8cluster with helm-id : {} not found".format(cluster_uuid)
490 )