06e57887b2e4e7c251ee15976945e63235bd6e90
[osm/N2VC.git] / n2vc / k8s_helm3_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import os
23 import yaml
24
25 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
26 from n2vc.exceptions import K8sException
27
28
29 class K8sHelm3Connector(K8sHelmBaseConnector):
30
31 """
32 ####################################################################################
33 ################################### P U B L I C ####################################
34 ####################################################################################
35 """
36
37 def __init__(
38 self,
39 fs: object,
40 db: object,
41 kubectl_command: str = "/usr/bin/kubectl",
42 helm_command: str = "/usr/bin/helm3",
43 log: object = None,
44 on_update_db=None,
45 vca_config: dict = None,
46 ):
47 """
48 Initializes helm connector for helm v3
49
50 :param fs: file system for kubernetes and helm configuration
51 :param db: database object to write current operation status
52 :param kubectl_command: path to kubectl executable
53 :param helm_command: path to helm executable
54 :param log: logger
55 :param on_update_db: callback called when k8s connector updates database
56 """
57
58 # parent class
59 K8sHelmBaseConnector.__init__(
60 self,
61 db=db,
62 log=log,
63 fs=fs,
64 kubectl_command=kubectl_command,
65 helm_command=helm_command,
66 on_update_db=on_update_db,
67 vca_config=vca_config,
68 )
69
70 self.log.info("K8S Helm3 connector initialized")
71
72 async def install(
73 self,
74 cluster_uuid: str,
75 kdu_model: str,
76 kdu_instance: str,
77 atomic: bool = True,
78 timeout: float = 300,
79 params: dict = None,
80 db_dict: dict = None,
81 kdu_name: str = None,
82 namespace: str = None,
83 **kwargs,
84 ):
85 """Install a helm chart
86
87 :param cluster_uuid str: The UUID of the cluster to install to
88 :param kdu_model str: The name or path of a bundle to install
89 :param kdu_instance: Kdu instance name
90 :param atomic bool: If set, waits until the model is active and resets
91 the cluster on failure.
92 :param timeout int: The time, in seconds, to wait for the install
93 to finish
94 :param params dict: Key-value pairs of instantiation parameters
95 :param kdu_name: Name of the KDU instance to be installed
96 :param namespace: K8s namespace to use for the KDU instance
97
98 :param kwargs: Additional parameters (None yet)
99
100 :return: True if successful
101 """
102 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
103 self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
104
105 # sync local dir
106 self.fs.sync(from_path=cluster_id)
107
108 # init env, paths
109 paths, env = self._init_paths_env(
110 cluster_name=cluster_id, create_if_not_exist=True
111 )
112
113 # for helm3 if namespace does not exist must create it
114 if namespace and namespace != "kube-system":
115 namespaces = await self._get_namespaces(cluster_id)
116 if namespace not in namespaces:
117 await self._create_namespace(cluster_id, namespace)
118
119 await self._install_impl(
120 cluster_id,
121 kdu_model,
122 paths,
123 env,
124 kdu_instance,
125 atomic=atomic,
126 timeout=timeout,
127 params=params,
128 db_dict=db_dict,
129 kdu_name=kdu_name,
130 namespace=namespace,
131 )
132
133 # sync fs
134 self.fs.reverse_sync(from_path=cluster_id)
135
136 self.log.debug("Returning kdu_instance {}".format(kdu_instance))
137 return True
138
139 async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
140
141 self.log.debug(
142 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
143 )
144
145 return await self._exec_inspect_comand(
146 inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
147 )
148
149 """
150 ####################################################################################
151 ################################### P R I V A T E ##################################
152 ####################################################################################
153 """
154
155 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
156 """
157 Creates and returns base cluster and kube dirs and returns them.
158 Also created helm3 dirs according to new directory specification, paths are
159 returned and also environment variables that must be provided to execute commands
160
161 Helm 3 directory specification uses XDG categories for variable support:
162 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
163 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
164 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
165
166 The variables assigned for this paths are:
167 (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
168 $HELM_PATH_DATA but looking and helm env the variable names are different)
169 - Cache: $HELM_CACHE_HOME
170 - Config: $HELM_CONFIG_HOME
171 - Data: $HELM_DATA_HOME
172 - helm kubeconfig: $KUBECONFIG
173
174 :param cluster_name: cluster_name
175 :return: Dictionary with config_paths and dictionary with helm environment variables
176 """
177
178 base = self.fs.path
179 if base.endswith("/") or base.endswith("\\"):
180 base = base[:-1]
181
182 # base dir for cluster
183 cluster_dir = base + "/" + cluster_name
184
185 # kube dir
186 kube_dir = cluster_dir + "/" + ".kube"
187 if create_if_not_exist and not os.path.exists(kube_dir):
188 self.log.debug("Creating dir {}".format(kube_dir))
189 os.makedirs(kube_dir)
190
191 helm_path_cache = cluster_dir + "/.cache/helm"
192 if create_if_not_exist and not os.path.exists(helm_path_cache):
193 self.log.debug("Creating dir {}".format(helm_path_cache))
194 os.makedirs(helm_path_cache)
195
196 helm_path_config = cluster_dir + "/.config/helm"
197 if create_if_not_exist and not os.path.exists(helm_path_config):
198 self.log.debug("Creating dir {}".format(helm_path_config))
199 os.makedirs(helm_path_config)
200
201 helm_path_data = cluster_dir + "/.local/share/helm"
202 if create_if_not_exist and not os.path.exists(helm_path_data):
203 self.log.debug("Creating dir {}".format(helm_path_data))
204 os.makedirs(helm_path_data)
205
206 config_filename = kube_dir + "/config"
207
208 # 2 - Prepare dictionary with paths
209 paths = {
210 "kube_dir": kube_dir,
211 "kube_config": config_filename,
212 "cluster_dir": cluster_dir,
213 }
214
215 # 3 - Prepare environment variables
216 env = {
217 "HELM_CACHE_HOME": helm_path_cache,
218 "HELM_CONFIG_HOME": helm_path_config,
219 "HELM_DATA_HOME": helm_path_data,
220 "KUBECONFIG": config_filename,
221 }
222
223 for file_name, file in paths.items():
224 if "dir" in file_name and not os.path.exists(file):
225 err_msg = "{} dir does not exist".format(file)
226 self.log.error(err_msg)
227 raise K8sException(err_msg)
228
229 return paths, env
230
231 async def _get_namespaces(self, cluster_id: str):
232
233 self.log.debug("get namespaces cluster_id {}".format(cluster_id))
234
235 # init config, env
236 paths, env = self._init_paths_env(
237 cluster_name=cluster_id, create_if_not_exist=True
238 )
239
240 command = "{} --kubeconfig={} get namespaces -o=yaml".format(
241 self.kubectl_command, paths["kube_config"]
242 )
243 output, _rc = await self._local_async_exec(
244 command=command, raise_exception_on_error=True, env=env
245 )
246
247 data = yaml.load(output, Loader=yaml.SafeLoader)
248 namespaces = [item["metadata"]["name"] for item in data["items"]]
249 self.log.debug(f"namespaces {namespaces}")
250
251 return namespaces
252
253 async def _create_namespace(self, cluster_id: str, namespace: str):
254
255 self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
256
257 # init config, env
258 paths, env = self._init_paths_env(
259 cluster_name=cluster_id, create_if_not_exist=True
260 )
261
262 command = "{} --kubeconfig={} create namespace {}".format(
263 self.kubectl_command, paths["kube_config"], namespace
264 )
265 _, _rc = await self._local_async_exec(
266 command=command, raise_exception_on_error=True, env=env
267 )
268 self.log.debug(f"namespace {namespace} created")
269
270 return _rc
271
272 async def _get_services(self, cluster_id: str, kdu_instance: str, namespace: str):
273
274 # init config, env
275 paths, env = self._init_paths_env(
276 cluster_name=cluster_id, create_if_not_exist=True
277 )
278
279 command1 = "{} get manifest {} --namespace={}".format(
280 self._helm_command, kdu_instance, namespace
281 )
282 command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
283 output, _rc = await self._local_async_exec_pipe(
284 command1, command2, env=env, raise_exception_on_error=True
285 )
286 services = self._parse_services(output)
287
288 return services
289
290 async def _cluster_init(self, cluster_id, namespace, paths, env):
291 """
292 Implements the helm version dependent cluster initialization:
293 For helm3 it creates the namespace if it is not created
294 """
295 if namespace != "kube-system":
296 namespaces = await self._get_namespaces(cluster_id)
297 if namespace not in namespaces:
298 await self._create_namespace(cluster_id, namespace)
299
300 # If default repo is not included add
301 cluster_uuid = "{}:{}".format(namespace, cluster_id)
302 repo_list = await self.repo_list(cluster_uuid)
303 for repo in repo_list:
304 self.log.debug("repo")
305 if repo["name"] == "stable":
306 self.log.debug("Default repo already present")
307 break
308 else:
309 await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)
310
311 # Returns False as no software needs to be uninstalled
312 return False
313
314 async def _uninstall_sw(self, cluster_id: str, namespace: str):
315 # nothing to do to uninstall sw
316 pass
317
318 async def _instances_list(self, cluster_id: str):
319
320 # init paths, env
321 paths, env = self._init_paths_env(
322 cluster_name=cluster_id, create_if_not_exist=True
323 )
324
325 command = "{} list --all-namespaces --output yaml".format(self._helm_command)
326 output, _rc = await self._local_async_exec(
327 command=command, raise_exception_on_error=True, env=env
328 )
329
330 if output and len(output) > 0:
331 self.log.debug("instances list output: {}".format(output))
332 return yaml.load(output, Loader=yaml.SafeLoader)
333 else:
334 return []
335
336 def _get_inspect_command(
337 self, inspect_command: str, kdu_model: str, repo_str: str, version: str
338 ):
339 inspect_command = "{} show {} {}{} {}".format(
340 self._helm_command, inspect_command, kdu_model, repo_str, version
341 )
342 return inspect_command
343
344 async def _status_kdu(
345 self,
346 cluster_id: str,
347 kdu_instance: str,
348 namespace: str = None,
349 show_error_log: bool = False,
350 return_text: bool = False,
351 ):
352
353 self.log.debug(
354 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
355 )
356
357 if not namespace:
358 namespace = "kube-system"
359
360 # init config, env
361 paths, env = self._init_paths_env(
362 cluster_name=cluster_id, create_if_not_exist=True
363 )
364 command = "{} status {} --namespace={} --output yaml".format(
365 self._helm_command, kdu_instance, namespace
366 )
367
368 output, rc = await self._local_async_exec(
369 command=command,
370 raise_exception_on_error=True,
371 show_error_log=show_error_log,
372 env=env,
373 )
374
375 if return_text:
376 return str(output)
377
378 if rc != 0:
379 return None
380
381 data = yaml.load(output, Loader=yaml.SafeLoader)
382
383 # remove field 'notes' and manifest
384 try:
385 del data.get("info")["notes"]
386 del data["manifest"]
387 except KeyError:
388 pass
389
390 # unable to parse 'resources' as currently it is not included in helm3
391 return data
392
393 def _get_install_command(
394 self,
395 kdu_model: str,
396 kdu_instance: str,
397 namespace: str,
398 params_str: str,
399 version: str,
400 atomic: bool,
401 timeout: float,
402 ) -> str:
403
404 timeout_str = ""
405 if timeout:
406 timeout_str = "--timeout {}s".format(timeout)
407
408 # atomic
409 atomic_str = ""
410 if atomic:
411 atomic_str = "--atomic"
412 # namespace
413 namespace_str = ""
414 if namespace:
415 namespace_str = "--namespace {}".format(namespace)
416
417 # version
418 version_str = ""
419 if version:
420 version_str = "--version {}".format(version)
421
422 command = (
423 "{helm} install {name} {atomic} --output yaml "
424 "{params} {timeout} {ns} {model} {ver}".format(
425 helm=self._helm_command,
426 name=kdu_instance,
427 atomic=atomic_str,
428 params=params_str,
429 timeout=timeout_str,
430 ns=namespace_str,
431 model=kdu_model,
432 ver=version_str,
433 )
434 )
435 return command
436
437 def _get_upgrade_command(
438 self,
439 kdu_model: str,
440 kdu_instance: str,
441 namespace: str,
442 params_str: str,
443 version: str,
444 atomic: bool,
445 timeout: float,
446 ) -> str:
447
448 timeout_str = ""
449 if timeout:
450 timeout_str = "--timeout {}s".format(timeout)
451
452 # atomic
453 atomic_str = ""
454 if atomic:
455 atomic_str = "--atomic"
456
457 # version
458 version_str = ""
459 if version:
460 version_str = "--version {}".format(version)
461
462 # namespace
463 namespace_str = ""
464 if namespace:
465 namespace_str = "--namespace {}".format(namespace)
466
467 command = (
468 "{helm} upgrade {name} {model} {namespace} {atomic} --output yaml {params} "
469 "{timeout} {ver}".format(
470 helm=self._helm_command,
471 name=kdu_instance,
472 namespace=namespace_str,
473 atomic=atomic_str,
474 params=params_str,
475 timeout=timeout_str,
476 model=kdu_model,
477 ver=version_str,
478 )
479 )
480 return command
481
482 def _get_rollback_command(
483 self, kdu_instance: str, namespace: str, revision: float
484 ) -> str:
485 return "{} rollback {} {} --namespace={} --wait".format(
486 self._helm_command, kdu_instance, revision, namespace
487 )
488
489 def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
490
491 return "{} uninstall {} --namespace={}".format(
492 self._helm_command, kdu_instance, namespace
493 )
494
495 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
496 repo_ids = []
497 cluster_filter = {"_admin.helm-chart-v3.id": cluster_uuid}
498 cluster = self.db.get_one("k8sclusters", cluster_filter)
499 if cluster:
500 repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
501 return repo_ids
502 else:
503 raise K8sException(
504 "k8cluster with helm-id : {} not found".format(cluster_uuid)
505 )