5bbd39b63b5c4e6e5d7a0ba38b34ab368562a5f1
[osm/N2VC.git] / n2vc / k8s_helm3_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import os
23 import yaml
24
25 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
26 from n2vc.exceptions import K8sException
27
28
29 class K8sHelm3Connector(K8sHelmBaseConnector):
30
31 """
32 ####################################################################################
33 ################################### P U B L I C ####################################
34 ####################################################################################
35 """
36
37 def __init__(
38 self,
39 fs: object,
40 db: object,
41 kubectl_command: str = "/usr/bin/kubectl",
42 helm_command: str = "/usr/bin/helm3",
43 log: object = None,
44 on_update_db=None,
45 ):
46 """
47 Initializes helm connector for helm v3
48
49 :param fs: file system for kubernetes and helm configuration
50 :param db: database object to write current operation status
51 :param kubectl_command: path to kubectl executable
52 :param helm_command: path to helm executable
53 :param log: logger
54 :param on_update_db: callback called when k8s connector updates database
55 """
56
57 # parent class
58 K8sHelmBaseConnector.__init__(
59 self,
60 db=db,
61 log=log,
62 fs=fs,
63 kubectl_command=kubectl_command,
64 helm_command=helm_command,
65 on_update_db=on_update_db,
66 )
67
68 self.log.info("K8S Helm3 connector initialized")
69
70 async def install(
71 self,
72 cluster_uuid: str,
73 kdu_model: str,
74 kdu_instance: str,
75 atomic: bool = True,
76 timeout: float = 300,
77 params: dict = None,
78 db_dict: dict = None,
79 kdu_name: str = None,
80 namespace: str = None,
81 **kwargs,
82 ):
83 """Install a helm chart
84
85 :param cluster_uuid str: The UUID of the cluster to install to
86 :param kdu_model str: The name or path of a bundle to install
87 :param kdu_instance: Kdu instance name
88 :param atomic bool: If set, waits until the model is active and resets
89 the cluster on failure.
90 :param timeout int: The time, in seconds, to wait for the install
91 to finish
92 :param params dict: Key-value pairs of instantiation parameters
93 :param kdu_name: Name of the KDU instance to be installed
94 :param namespace: K8s namespace to use for the KDU instance
95
96 :param kwargs: Additional parameters (None yet)
97
98 :return: True if successful
99 """
100 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
101 self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
102
103 # sync local dir
104 self.fs.sync(from_path=cluster_id)
105
106 # init env, paths
107 paths, env = self._init_paths_env(
108 cluster_name=cluster_id, create_if_not_exist=True
109 )
110
111 # for helm3 if namespace does not exist must create it
112 if namespace and namespace != "kube-system":
113 if not await self._namespace_exists(cluster_id, namespace):
114 try:
115 await self._create_namespace(cluster_id, namespace)
116 except Exception as e:
117 if not await self._namespace_exists(cluster_id, namespace):
118 err_msg = (
119 "namespace {} does not exist in cluster_id {} "
120 "error message: ".format(
121 namespace, e
122 )
123 )
124 self.log.error(err_msg)
125 raise K8sException(err_msg)
126
127 await self._install_impl(
128 cluster_id,
129 kdu_model,
130 paths,
131 env,
132 kdu_instance,
133 atomic=atomic,
134 timeout=timeout,
135 params=params,
136 db_dict=db_dict,
137 kdu_name=kdu_name,
138 namespace=namespace,
139 )
140
141 # sync fs
142 self.fs.reverse_sync(from_path=cluster_id)
143
144 self.log.debug("Returning kdu_instance {}".format(kdu_instance))
145 return True
146
147 async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
148
149 self.log.debug(
150 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
151 )
152
153 return await self._exec_inspect_comand(
154 inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
155 )
156
157 """
158 ####################################################################################
159 ################################### P R I V A T E ##################################
160 ####################################################################################
161 """
162
163 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
164 """
165 Creates and returns base cluster and kube dirs and returns them.
166 Also created helm3 dirs according to new directory specification, paths are
167 returned and also environment variables that must be provided to execute commands
168
169 Helm 3 directory specification uses XDG categories for variable support:
170 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
171 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
172 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
173
174 The variables assigned for this paths are:
175 (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
176 $HELM_PATH_DATA but looking and helm env the variable names are different)
177 - Cache: $HELM_CACHE_HOME
178 - Config: $HELM_CONFIG_HOME
179 - Data: $HELM_DATA_HOME
180 - helm kubeconfig: $KUBECONFIG
181
182 :param cluster_name: cluster_name
183 :return: Dictionary with config_paths and dictionary with helm environment variables
184 """
185
186 base = self.fs.path
187 if base.endswith("/") or base.endswith("\\"):
188 base = base[:-1]
189
190 # base dir for cluster
191 cluster_dir = base + "/" + cluster_name
192
193 # kube dir
194 kube_dir = cluster_dir + "/" + ".kube"
195 if create_if_not_exist and not os.path.exists(kube_dir):
196 self.log.debug("Creating dir {}".format(kube_dir))
197 os.makedirs(kube_dir)
198
199 helm_path_cache = cluster_dir + "/.cache/helm"
200 if create_if_not_exist and not os.path.exists(helm_path_cache):
201 self.log.debug("Creating dir {}".format(helm_path_cache))
202 os.makedirs(helm_path_cache)
203
204 helm_path_config = cluster_dir + "/.config/helm"
205 if create_if_not_exist and not os.path.exists(helm_path_config):
206 self.log.debug("Creating dir {}".format(helm_path_config))
207 os.makedirs(helm_path_config)
208
209 helm_path_data = cluster_dir + "/.local/share/helm"
210 if create_if_not_exist and not os.path.exists(helm_path_data):
211 self.log.debug("Creating dir {}".format(helm_path_data))
212 os.makedirs(helm_path_data)
213
214 config_filename = kube_dir + "/config"
215
216 # 2 - Prepare dictionary with paths
217 paths = {
218 "kube_dir": kube_dir,
219 "kube_config": config_filename,
220 "cluster_dir": cluster_dir,
221 }
222
223 # 3 - Prepare environment variables
224 env = {
225 "HELM_CACHE_HOME": helm_path_cache,
226 "HELM_CONFIG_HOME": helm_path_config,
227 "HELM_DATA_HOME": helm_path_data,
228 "KUBECONFIG": config_filename,
229 }
230
231 for file_name, file in paths.items():
232 if "dir" in file_name and not os.path.exists(file):
233 err_msg = "{} dir does not exist".format(file)
234 self.log.error(err_msg)
235 raise K8sException(err_msg)
236
237 return paths, env
238
239 async def _namespace_exists(self, cluster_id, namespace) -> bool:
240 self.log.debug(
241 "checking if namespace {} exists cluster_id {}".format(
242 namespace, cluster_id
243 )
244 )
245 namespaces = await self._get_namespaces(cluster_id)
246 return namespace in namespaces if namespaces else False
247
248 async def _get_namespaces(self, cluster_id: str):
249
250 self.log.debug("get namespaces cluster_id {}".format(cluster_id))
251
252 # init config, env
253 paths, env = self._init_paths_env(
254 cluster_name=cluster_id, create_if_not_exist=True
255 )
256
257 command = "{} --kubeconfig={} get namespaces -o=yaml".format(
258 self.kubectl_command, paths["kube_config"]
259 )
260 output, _rc = await self._local_async_exec(
261 command=command, raise_exception_on_error=True, env=env
262 )
263
264 data = yaml.load(output, Loader=yaml.SafeLoader)
265 namespaces = [item["metadata"]["name"] for item in data["items"]]
266 self.log.debug(f"namespaces {namespaces}")
267
268 return namespaces
269
270 async def _create_namespace(self, cluster_id: str, namespace: str):
271
272 self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
273
274 # init config, env
275 paths, env = self._init_paths_env(
276 cluster_name=cluster_id, create_if_not_exist=True
277 )
278
279 command = "{} --kubeconfig={} create namespace {}".format(
280 self.kubectl_command, paths["kube_config"], namespace
281 )
282 _, _rc = await self._local_async_exec(
283 command=command, raise_exception_on_error=True, env=env
284 )
285 self.log.debug(f"namespace {namespace} created")
286
287 return _rc
288
289 async def _get_services(self, cluster_id: str, kdu_instance: str, namespace: str):
290
291 # init config, env
292 paths, env = self._init_paths_env(
293 cluster_name=cluster_id, create_if_not_exist=True
294 )
295
296 command1 = "{} get manifest {} --namespace={}".format(
297 self._helm_command, kdu_instance, namespace
298 )
299 command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
300 output, _rc = await self._local_async_exec_pipe(
301 command1, command2, env=env, raise_exception_on_error=True
302 )
303 services = self._parse_services(output)
304
305 return services
306
307 async def _cluster_init(self, cluster_id, namespace, paths, env):
308 """
309 Implements the helm version dependent cluster initialization:
310 For helm3 it creates the namespace if it is not created
311 """
312 if namespace != "kube-system":
313 namespaces = await self._get_namespaces(cluster_id)
314 if namespace not in namespaces:
315 await self._create_namespace(cluster_id, namespace)
316
317 # If default repo is not included add
318 cluster_uuid = "{}:{}".format(namespace, cluster_id)
319 repo_list = await self.repo_list(cluster_uuid)
320 stable_repo = [repo for repo in repo_list if repo["name"] == "stable"]
321 if not stable_repo and self._stable_repo_url:
322 await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)
323
324 # Returns False as no software needs to be uninstalled
325 return False
326
327 async def _uninstall_sw(self, cluster_id: str, namespace: str):
328 # nothing to do to uninstall sw
329 pass
330
331 async def _instances_list(self, cluster_id: str):
332
333 # init paths, env
334 paths, env = self._init_paths_env(
335 cluster_name=cluster_id, create_if_not_exist=True
336 )
337
338 command = "{} list --all-namespaces --output yaml".format(self._helm_command)
339 output, _rc = await self._local_async_exec(
340 command=command, raise_exception_on_error=True, env=env
341 )
342
343 if output and len(output) > 0:
344 self.log.debug("instances list output: {}".format(output))
345 return yaml.load(output, Loader=yaml.SafeLoader)
346 else:
347 return []
348
349 def _get_inspect_command(
350 self, inspect_command: str, kdu_model: str, repo_str: str, version: str
351 ):
352 inspect_command = "{} show {} {}{} {}".format(
353 self._helm_command, inspect_command, kdu_model, repo_str, version
354 )
355 return inspect_command
356
357 async def _status_kdu(
358 self,
359 cluster_id: str,
360 kdu_instance: str,
361 namespace: str = None,
362 show_error_log: bool = False,
363 return_text: bool = False,
364 ):
365
366 self.log.debug(
367 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
368 )
369
370 if not namespace:
371 namespace = "kube-system"
372
373 # init config, env
374 paths, env = self._init_paths_env(
375 cluster_name=cluster_id, create_if_not_exist=True
376 )
377 command = "{} status {} --namespace={} --output yaml".format(
378 self._helm_command, kdu_instance, namespace
379 )
380
381 output, rc = await self._local_async_exec(
382 command=command,
383 raise_exception_on_error=True,
384 show_error_log=show_error_log,
385 env=env,
386 )
387
388 if return_text:
389 return str(output)
390
391 if rc != 0:
392 return None
393
394 data = yaml.load(output, Loader=yaml.SafeLoader)
395
396 # remove field 'notes' and manifest
397 try:
398 del data.get("info")["notes"]
399 del data["manifest"]
400 except KeyError:
401 pass
402
403 # unable to parse 'resources' as currently it is not included in helm3
404 return data
405
406 def _get_install_command(
407 self,
408 kdu_model: str,
409 kdu_instance: str,
410 namespace: str,
411 params_str: str,
412 version: str,
413 atomic: bool,
414 timeout: float,
415 ) -> str:
416
417 timeout_str = ""
418 if timeout:
419 timeout_str = "--timeout {}s".format(timeout)
420
421 # atomic
422 atomic_str = ""
423 if atomic:
424 atomic_str = "--atomic"
425 # namespace
426 namespace_str = ""
427 if namespace:
428 namespace_str = "--namespace {}".format(namespace)
429
430 # version
431 version_str = ""
432 if version:
433 version_str = "--version {}".format(version)
434
435 command = (
436 "{helm} install {name} {atomic} --output yaml "
437 "{params} {timeout} {ns} {model} {ver}".format(
438 helm=self._helm_command,
439 name=kdu_instance,
440 atomic=atomic_str,
441 params=params_str,
442 timeout=timeout_str,
443 ns=namespace_str,
444 model=kdu_model,
445 ver=version_str,
446 )
447 )
448 return command
449
450 def _get_upgrade_command(
451 self,
452 kdu_model: str,
453 kdu_instance: str,
454 namespace: str,
455 params_str: str,
456 version: str,
457 atomic: bool,
458 timeout: float,
459 ) -> str:
460
461 timeout_str = ""
462 if timeout:
463 timeout_str = "--timeout {}s".format(timeout)
464
465 # atomic
466 atomic_str = ""
467 if atomic:
468 atomic_str = "--atomic"
469
470 # version
471 version_str = ""
472 if version:
473 version_str = "--version {}".format(version)
474
475 # namespace
476 namespace_str = ""
477 if namespace:
478 namespace_str = "--namespace {}".format(namespace)
479
480 command = (
481 "{helm} upgrade {name} {model} {namespace} {atomic} --output yaml {params} "
482 "{timeout} {ver}".format(
483 helm=self._helm_command,
484 name=kdu_instance,
485 namespace=namespace_str,
486 atomic=atomic_str,
487 params=params_str,
488 timeout=timeout_str,
489 model=kdu_model,
490 ver=version_str,
491 )
492 )
493 return command
494
495 def _get_rollback_command(
496 self, kdu_instance: str, namespace: str, revision: float
497 ) -> str:
498 return "{} rollback {} {} --namespace={} --wait".format(
499 self._helm_command, kdu_instance, revision, namespace
500 )
501
502 def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
503
504 return "{} uninstall {} --namespace={}".format(
505 self._helm_command, kdu_instance, namespace
506 )
507
508 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
509 repo_ids = []
510 cluster_filter = {"_admin.helm-chart-v3.id": cluster_uuid}
511 cluster = self.db.get_one("k8sclusters", cluster_filter)
512 if cluster:
513 repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
514 return repo_ids
515 else:
516 raise K8sException(
517 "k8cluster with helm-id : {} not found".format(cluster_uuid)
518 )