7d6916821ab64b8fc0e2e29f600939f9fb964efc
[osm/N2VC.git] / n2vc / k8s_helm3_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import os
23 import yaml
24
25 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
26 from n2vc.exceptions import K8sException
27
28
29 class K8sHelm3Connector(K8sHelmBaseConnector):
30
31 """
32 ####################################################################################
33 ################################### P U B L I C ####################################
34 ####################################################################################
35 """
36
37 def __init__(
38 self,
39 fs: object,
40 db: object,
41 kubectl_command: str = "/usr/bin/kubectl",
42 helm_command: str = "/usr/bin/helm3",
43 log: object = None,
44 on_update_db=None,
45 vca_config: dict = None,
46 ):
47 """
48 Initializes helm connector for helm v3
49
50 :param fs: file system for kubernetes and helm configuration
51 :param db: database object to write current operation status
52 :param kubectl_command: path to kubectl executable
53 :param helm_command: path to helm executable
54 :param log: logger
55 :param on_update_db: callback called when k8s connector updates database
56 """
57
58 # parent class
59 K8sHelmBaseConnector.__init__(self,
60 db=db,
61 log=log,
62 fs=fs,
63 kubectl_command=kubectl_command,
64 helm_command=helm_command,
65 on_update_db=on_update_db,
66 vca_config=vca_config)
67
68 self.log.info("K8S Helm3 connector initialized")
69
70 async def install(
71 self,
72 cluster_uuid: str,
73 kdu_model: str,
74 kdu_instance: str,
75 atomic: bool = True,
76 timeout: float = 300,
77 params: dict = None,
78 db_dict: dict = None,
79 kdu_name: str = None,
80 namespace: str = None,
81 **kwargs,
82 ):
83 """Install a helm chart
84
85 :param cluster_uuid str: The UUID of the cluster to install to
86 :param kdu_model str: The name or path of a bundle to install
87 :param kdu_instance: Kdu instance name
88 :param atomic bool: If set, waits until the model is active and resets
89 the cluster on failure.
90 :param timeout int: The time, in seconds, to wait for the install
91 to finish
92 :param params dict: Key-value pairs of instantiation parameters
93 :param kdu_name: Name of the KDU instance to be installed
94 :param namespace: K8s namespace to use for the KDU instance
95
96 :param kwargs: Additional parameters (None yet)
97
98 :return: True if successful
99 """
100 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
101 self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
102
103 # sync local dir
104 self.fs.sync(from_path=cluster_id)
105
106 # init env, paths
107 paths, env = self._init_paths_env(
108 cluster_name=cluster_id, create_if_not_exist=True
109 )
110
111 # for helm3 if namespace does not exist must create it
112 if namespace and namespace != "kube-system":
113 namespaces = await self._get_namespaces(cluster_id)
114 if namespace not in namespaces:
115 await self._create_namespace(cluster_id, namespace)
116
117 await self._install_impl(
118 cluster_id,
119 kdu_model,
120 paths,
121 env,
122 kdu_instance,
123 atomic=atomic,
124 timeout=timeout,
125 params=params,
126 db_dict=db_dict,
127 kdu_name=kdu_name,
128 namespace=namespace,
129 )
130
131 # sync fs
132 self.fs.reverse_sync(from_path=cluster_id)
133
134 self.log.debug("Returning kdu_instance {}".format(kdu_instance))
135 return True
136
137 async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
138
139 self.log.debug(
140 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
141 )
142
143 return await self._exec_inspect_comand(
144 inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
145 )
146
147 """
148 ####################################################################################
149 ################################### P R I V A T E ##################################
150 ####################################################################################
151 """
152
153 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
154 """
155 Creates and returns base cluster and kube dirs and returns them.
156 Also created helm3 dirs according to new directory specification, paths are
157 returned and also environment variables that must be provided to execute commands
158
159 Helm 3 directory specification uses XDG categories for variable support:
160 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
161 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
162 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
163
164 The variables assigned for this paths are:
165 (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
166 $HELM_PATH_DATA but looking and helm env the variable names are different)
167 - Cache: $HELM_CACHE_HOME
168 - Config: $HELM_CONFIG_HOME
169 - Data: $HELM_DATA_HOME
170 - helm kubeconfig: $KUBECONFIG
171
172 :param cluster_name: cluster_name
173 :return: Dictionary with config_paths and dictionary with helm environment variables
174 """
175
176 base = self.fs.path
177 if base.endswith("/") or base.endswith("\\"):
178 base = base[:-1]
179
180 # base dir for cluster
181 cluster_dir = base + "/" + cluster_name
182
183 # kube dir
184 kube_dir = cluster_dir + "/" + ".kube"
185 if create_if_not_exist and not os.path.exists(kube_dir):
186 self.log.debug("Creating dir {}".format(kube_dir))
187 os.makedirs(kube_dir)
188
189 helm_path_cache = cluster_dir + "/.cache/helm"
190 if create_if_not_exist and not os.path.exists(helm_path_cache):
191 self.log.debug("Creating dir {}".format(helm_path_cache))
192 os.makedirs(helm_path_cache)
193
194 helm_path_config = cluster_dir + "/.config/helm"
195 if create_if_not_exist and not os.path.exists(helm_path_config):
196 self.log.debug("Creating dir {}".format(helm_path_config))
197 os.makedirs(helm_path_config)
198
199 helm_path_data = cluster_dir + "/.local/share/helm"
200 if create_if_not_exist and not os.path.exists(helm_path_data):
201 self.log.debug("Creating dir {}".format(helm_path_data))
202 os.makedirs(helm_path_data)
203
204 config_filename = kube_dir + "/config"
205
206 # 2 - Prepare dictionary with paths
207 paths = {
208 "kube_dir": kube_dir,
209 "kube_config": config_filename,
210 "cluster_dir": cluster_dir
211 }
212
213 # 3 - Prepare environment variables
214 env = {
215 "HELM_CACHE_HOME": helm_path_cache,
216 "HELM_CONFIG_HOME": helm_path_config,
217 "HELM_DATA_HOME": helm_path_data,
218 "KUBECONFIG": config_filename
219 }
220
221 for file_name, file in paths.items():
222 if "dir" in file_name and not os.path.exists(file):
223 err_msg = "{} dir does not exist".format(file)
224 self.log.error(err_msg)
225 raise K8sException(err_msg)
226
227 return paths, env
228
229 async def _get_namespaces(self,
230 cluster_id: str):
231
232 self.log.debug("get namespaces cluster_id {}".format(cluster_id))
233
234 # init config, env
235 paths, env = self._init_paths_env(
236 cluster_name=cluster_id, create_if_not_exist=True
237 )
238
239 command = "{} --kubeconfig={} get namespaces -o=yaml".format(
240 self.kubectl_command, paths["kube_config"]
241 )
242 output, _rc = await self._local_async_exec(
243 command=command, raise_exception_on_error=True, env=env
244 )
245
246 data = yaml.load(output, Loader=yaml.SafeLoader)
247 namespaces = [item["metadata"]["name"] for item in data["items"]]
248 self.log.debug(f"namespaces {namespaces}")
249
250 return namespaces
251
252 async def _create_namespace(self,
253 cluster_id: str,
254 namespace: str):
255
256 self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
257
258 # init config, env
259 paths, env = self._init_paths_env(
260 cluster_name=cluster_id, create_if_not_exist=True
261 )
262
263 command = "{} --kubeconfig={} create namespace {}".format(
264 self.kubectl_command, paths["kube_config"], namespace
265 )
266 _, _rc = await self._local_async_exec(
267 command=command, raise_exception_on_error=True, env=env
268 )
269 self.log.debug(f"namespace {namespace} created")
270
271 return _rc
272
273 async def _get_services(self, cluster_id: str, kdu_instance: str, namespace: str):
274
275 # init config, env
276 paths, env = self._init_paths_env(
277 cluster_name=cluster_id, create_if_not_exist=True
278 )
279
280 command1 = "{} get manifest {} --namespace={}".format(
281 self._helm_command, kdu_instance, namespace
282 )
283 command2 = "{} get --namespace={} -f -".format(
284 self.kubectl_command, namespace
285 )
286 output, _rc = await self._local_async_exec_pipe(
287 command1, command2, env=env, raise_exception_on_error=True
288 )
289 services = self._parse_services(output)
290
291 return services
292
293 async def _cluster_init(self, cluster_id, namespace, paths, env):
294 """
295 Implements the helm version dependent cluster initialization:
296 For helm3 it creates the namespace if it is not created
297 """
298 if namespace != "kube-system":
299 namespaces = await self._get_namespaces(cluster_id)
300 if namespace not in namespaces:
301 await self._create_namespace(cluster_id, namespace)
302
303 # If default repo is not included add
304 cluster_uuid = "{}:{}".format(namespace, cluster_id)
305 repo_list = await self.repo_list(cluster_uuid)
306 for repo in repo_list:
307 self.log.debug("repo")
308 if repo["name"] == "stable":
309 self.log.debug("Default repo already present")
310 break
311 else:
312 await self.repo_add(cluster_uuid,
313 "stable",
314 self._stable_repo_url)
315
316 # Returns False as no software needs to be uninstalled
317 return False
318
319 async def _uninstall_sw(self, cluster_id: str, namespace: str):
320 # nothing to do to uninstall sw
321 pass
322
323 async def _instances_list(self, cluster_id: str):
324
325 # init paths, env
326 paths, env = self._init_paths_env(
327 cluster_name=cluster_id, create_if_not_exist=True
328 )
329
330 command = "{} list --all-namespaces --output yaml".format(
331 self._helm_command
332 )
333 output, _rc = await self._local_async_exec(
334 command=command, raise_exception_on_error=True, env=env
335 )
336
337 if output and len(output) > 0:
338 self.log.debug("instances list output: {}".format(output))
339 return yaml.load(output, Loader=yaml.SafeLoader)
340 else:
341 return []
342
343 def _get_inspect_command(self, inspect_command: str, kdu_model: str, repo_str: str,
344 version: str):
345 inspect_command = "{} show {} {}{} {}".format(
346 self._helm_command, inspect_command, kdu_model, repo_str, version
347 )
348 return inspect_command
349
350 async def _status_kdu(
351 self,
352 cluster_id: str,
353 kdu_instance: str,
354 namespace: str = None,
355 show_error_log: bool = False,
356 return_text: bool = False,
357 ):
358
359 self.log.debug("status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace))
360
361 if not namespace:
362 namespace = "kube-system"
363
364 # init config, env
365 paths, env = self._init_paths_env(
366 cluster_name=cluster_id, create_if_not_exist=True
367 )
368 command = "{} status {} --namespace={} --output yaml".format(
369 self._helm_command, kdu_instance, namespace
370 )
371
372 output, rc = await self._local_async_exec(
373 command=command,
374 raise_exception_on_error=True,
375 show_error_log=show_error_log,
376 env=env
377 )
378
379 if return_text:
380 return str(output)
381
382 if rc != 0:
383 return None
384
385 data = yaml.load(output, Loader=yaml.SafeLoader)
386
387 # remove field 'notes' and manifest
388 try:
389 del data.get("info")["notes"]
390 del data["manifest"]
391 except KeyError:
392 pass
393
394 # unable to parse 'resources' as currently it is not included in helm3
395 return data
396
397 def _get_install_command(self, kdu_model: str, kdu_instance: str, namespace: str,
398 params_str: str, version: str, atomic: bool, timeout: float) -> str:
399
400 timeout_str = ""
401 if timeout:
402 timeout_str = "--timeout {}s".format(timeout)
403
404 # atomic
405 atomic_str = ""
406 if atomic:
407 atomic_str = "--atomic"
408 # namespace
409 namespace_str = ""
410 if namespace:
411 namespace_str = "--namespace {}".format(namespace)
412
413 # version
414 version_str = ""
415 if version:
416 version_str = "--version {}".format(version)
417
418 command = (
419 "{helm} install {name} {atomic} --output yaml "
420 "{params} {timeout} {ns} {model} {ver}".format(
421 helm=self._helm_command,
422 name=kdu_instance,
423 atomic=atomic_str,
424 params=params_str,
425 timeout=timeout_str,
426 ns=namespace_str,
427 model=kdu_model,
428 ver=version_str,
429 )
430 )
431 return command
432
433 def _get_upgrade_command(self, kdu_model: str, kdu_instance: str, namespace: str,
434 params_str: str, version: str, atomic: bool, timeout: float) -> str:
435
436 timeout_str = ""
437 if timeout:
438 timeout_str = "--timeout {}s".format(timeout)
439
440 # atomic
441 atomic_str = ""
442 if atomic:
443 atomic_str = "--atomic"
444
445 # version
446 version_str = ""
447 if version:
448 version_str = "--version {}".format(version)
449
450 # namespace
451 namespace_str = ""
452 if namespace:
453 namespace_str = "--namespace {}".format(namespace)
454
455 command = (
456 "{helm} upgrade {name} {model} {namespace} {atomic} --output yaml {params} "
457 "{timeout} {ver}".format(
458 helm=self._helm_command,
459 name=kdu_instance,
460 namespace=namespace_str,
461 atomic=atomic_str,
462 params=params_str,
463 timeout=timeout_str,
464 model=kdu_model,
465 ver=version_str,
466 )
467 )
468 return command
469
470 def _get_rollback_command(self, kdu_instance: str, namespace: str, revision: float) -> str:
471 return "{} rollback {} {} --namespace={} --wait".format(
472 self._helm_command, kdu_instance, revision, namespace
473 )
474
475 def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
476
477 return "{} uninstall {} --namespace={}".format(
478 self._helm_command, kdu_instance, namespace)
479
480 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
481 repo_ids = []
482 cluster_filter = {"_admin.helm-chart-v3.id": cluster_uuid}
483 cluster = self.db.get_one("k8sclusters", cluster_filter)
484 if cluster:
485 repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
486 return repo_ids
487 else:
488 raise K8sException(
489 "k8cluster with helm-id : {} not found".format(cluster_uuid)
490 )