# osm/N2VC.git: n2vc/k8s_helm3_conn.py (commit 107f95486c68973351a9ab789ed5af2816c0262c)
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
from typing import Union
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException


class K8sHelm3Connector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm3",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initializes helm connector for helm v3

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
        )

        self.log.info("K8S Helm3 connector initialized")

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
        **kwargs,
    ):
        """Install a helm chart

        :param cluster_uuid str: The UUID of the cluster to install to
        :param kdu_model str: chart/reference (string), which can be either
            of these options:
            - a name of chart available via the repos known by OSM
              (e.g. stable/openldap, stable/openldap:1.2.4)
            - a path to a packaged chart (e.g. mychart.tgz)
            - a path to an unpacked chart directory or a URL (e.g. mychart)
        :param kdu_instance: Kdu instance name
        :param atomic bool: If set, waits until the release is active and
            rolls the installation back (deletes the release) on failure.
        :param timeout float: The time, in seconds, to wait for the install
            to finish
        :param params dict: Key-value pairs of instantiation parameters
        :param kdu_name: Name of the KDU instance to be installed
        :param namespace: K8s namespace to use for the KDU instance

        :param kwargs: Additional parameters (None yet)

        :return: True if successful
        """

        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # for helm3, if the namespace does not exist it must be created
        if namespace and namespace != "kube-system":
            if not await self._namespace_exists(cluster_uuid, namespace):
                try:
                    await self._create_namespace(cluster_uuid, namespace)
                except Exception as e:
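                    # the namespace may have been created by a concurrent operation,
                    # so fail only if it still does not exist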
                    if not await self._namespace_exists(cluster_uuid, namespace):
                        err_msg = (
                            "namespace {} does not exist in cluster_id {}, "
                            "error message: {}".format(namespace, cluster_uuid, e)
                        )
                        self.log.error(err_msg)
                        raise K8sException(err_msg)

        await self._install_impl(
            cluster_uuid,
            kdu_model,
            paths,
            env,
            kdu_instance,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return True

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_command(
            inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
        )

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates and returns the base cluster and kube dirs.
        It also creates the helm3 dirs according to the new directory specification;
        the paths are returned together with the environment variables that must be
        provided to execute helm commands.

        Helm 3 directory specification uses XDG categories for variable support:
        - Cache: $XDG_CACHE_HOME, for example ${HOME}/.cache/helm/
        - Configuration: $XDG_CONFIG_HOME, for example ${HOME}/.config/helm/
        - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm

        The variables assigned to these paths are:
        (in the documentation the variable names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG
        and $HELM_PATH_DATA, but "helm env" shows different names)
        - Cache: $HELM_CACHE_HOME
        - Config: $HELM_CONFIG_HOME
        - Data: $HELM_DATA_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: cluster_name
        :return: Dictionary with config_paths and dictionary with helm environment variables
        """

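        # helm cache/config/data dirs are kept per cluster under the fs base path,
        # so each cluster uses its own helm state and kubeconfig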
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        helm_path_cache = cluster_dir + "/.cache/helm"
        if create_if_not_exist and not os.path.exists(helm_path_cache):
            self.log.debug("Creating dir {}".format(helm_path_cache))
            os.makedirs(helm_path_cache)

        helm_path_config = cluster_dir + "/.config/helm"
        if create_if_not_exist and not os.path.exists(helm_path_config):
            self.log.debug("Creating dir {}".format(helm_path_config))
            os.makedirs(helm_path_config)

        helm_path_data = cluster_dir + "/.local/share/helm"
        if create_if_not_exist and not os.path.exists(helm_path_data):
            self.log.debug("Creating dir {}".format(helm_path_data))
            os.makedirs(helm_path_data)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
        }

        # 3 - Prepare environment variables
        env = {
            "HELM_CACHE_HOME": helm_path_cache,
            "HELM_CONFIG_HOME": helm_path_config,
            "HELM_DATA_HOME": helm_path_data,
            "KUBECONFIG": config_filename,
        }

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        return paths, env

    async def _namespace_exists(self, cluster_id, namespace) -> bool:
        self.log.debug(
            "checking if namespace {} exists cluster_id {}".format(
                namespace, cluster_id
            )
        )
        namespaces = await self._get_namespaces(cluster_id)
        return namespace in namespaces if namespaces else False

    async def _get_namespaces(self, cluster_id: str):

        self.log.debug("get namespaces cluster_id {}".format(cluster_id))

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} get namespaces -o=yaml".format(
            self.kubectl_command, paths["kube_config"]
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        data = yaml.load(output, Loader=yaml.SafeLoader)
        namespaces = [item["metadata"]["name"] for item in data["items"]]
        self.log.debug(f"namespaces {namespaces}")

        return namespaces

    async def _create_namespace(self, cluster_id: str, namespace: str):

276 self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
277
        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} create namespace {}".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        _, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )
        self.log.debug(f"namespace {namespace} created")

        return _rc

    async def _get_services(
        self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

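        # render the release manifest with "helm get manifest" and pipe it to
        # "kubectl get -f -" so that only the objects of this release are queried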
302 command1 = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
303 kubeconfig, self._helm_command, kdu_instance, namespace
304 )
305 command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
306 output, _rc = await self._local_async_exec_pipe(
307 command1, command2, env=env, raise_exception_on_error=True
308 )
309 services = self._parse_services(output)
310
311 return services
312
    async def _cluster_init(self, cluster_id, namespace, paths, env):
        """
        Implements the helm version dependent cluster initialization:
        for helm3 it creates the namespace if it does not exist
        """
318 if namespace != "kube-system":
319 namespaces = await self._get_namespaces(cluster_id)
320 if namespace not in namespaces:
321 await self._create_namespace(cluster_id, namespace)
322
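        # helm3 does not register the "stable" repo by default, so add it here
        # when a stable repo URL is configured and it is not present yet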
        repo_list = await self.repo_list(cluster_id)
        stable_repo = [repo for repo in repo_list if repo["name"] == "stable"]
        if not stable_repo and self._stable_repo_url:
            await self.repo_add(cluster_id, "stable", self._stable_repo_url)

        # Returns False as no software needs to be uninstalled
        return False

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # nothing to do to uninstall sw
        pass

    async def _instances_list(self, cluster_id: str):

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

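        # list releases across all namespaces so instances are found regardless
        # of the namespace they were deployed to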
342 command = "{} list --all-namespaces --output yaml".format(self._helm_command)
343 output, _rc = await self._local_async_exec(
344 command=command, raise_exception_on_error=True, env=env
345 )
346
347 if output and len(output) > 0:
348 self.log.debug("instances list output: {}".format(output))
349 return yaml.load(output, Loader=yaml.SafeLoader)
350 else:
351 return []
352
353 def _get_inspect_command(
354 self, inspect_command: str, kdu_model: str, repo_str: str, version: str
355 ):
356 inspect_command = "{} show {} {}{} {}".format(
357 self._helm_command, inspect_command, kdu_model, repo_str, version
358 )
359 return inspect_command
360
361 def _get_get_command(
362 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
363 ):
364 get_command = (
365 "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
366 kubeconfig, self._helm_command, get_command, kdu_instance, namespace
367 )
368 )
369 return get_command
370
371 async def _status_kdu(
372 self,
373 cluster_id: str,
374 kdu_instance: str,
375 namespace: str = None,
376 yaml_format: bool = False,
377 show_error_log: bool = False,
378 ) -> Union[str, dict]:
379
380 self.log.debug(
381 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
382 )
383
384 if not namespace:
385 namespace = "kube-system"
386
387 # init config, env
388 paths, env = self._init_paths_env(
389 cluster_name=cluster_id, create_if_not_exist=True
390 )
391 command = "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
392 paths["kube_config"], self._helm_command, kdu_instance, namespace
393 )
394
395 output, rc = await self._local_async_exec(
396 command=command,
397 raise_exception_on_error=True,
398 show_error_log=show_error_log,
399 env=env,
400 )
401
402 if yaml_format:
403 return str(output)
404
405 if rc != 0:
406 return None
407
408 data = yaml.load(output, Loader=yaml.SafeLoader)
409
        # remove field 'notes' (the manifest is parsed separately below)
        try:
            del data.get("info")["notes"]
        except KeyError:
            pass

        # parse the manifest to a list of dictionaries
        if "manifest" in data:
            manifest_str = data.get("manifest")
            manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)

            data["manifest"] = []
            for doc in manifest_docs:
                data["manifest"].append(doc)

        return data

    def _get_install_command(
        self,
        kdu_model: str,
        kdu_instance: str,
        namespace: str,
        params_str: str,
        version: str,
        atomic: bool,
        timeout: float,
        kubeconfig: str,
    ) -> str:

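        # helm3's --timeout flag takes a duration, so a unit suffix ("s") is appended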
439 timeout_str = ""
440 if timeout:
441 timeout_str = "--timeout {}s".format(timeout)
442
443 # atomic
444 atomic_str = ""
445 if atomic:
446 atomic_str = "--atomic"
447 # namespace
448 namespace_str = ""
449 if namespace:
450 namespace_str = "--namespace {}".format(namespace)
451
452 # version
453 version_str = ""
454 if version:
455 version_str = "--version {}".format(version)
456
457 command = (
458 "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml "
459 "{params} {timeout} {ns} {model} {ver}".format(
460 kubeconfig=kubeconfig,
461 helm=self._helm_command,
462 name=kdu_instance,
463 atomic=atomic_str,
464 params=params_str,
465 timeout=timeout_str,
466 ns=namespace_str,
467 model=kdu_model,
468 ver=version_str,
469 )
470 )
471 return command
472
473 def _get_upgrade_scale_command(
474 self,
475 kdu_model: str,
476 kdu_instance: str,
477 namespace: str,
478 scale: int,
479 version: str,
480 atomic: bool,
481 replica_str: str,
482 timeout: float,
483 resource_name: str,
484 kubeconfig: str,
485 ) -> str:
486
487 timeout_str = ""
488 if timeout:
489 timeout_str = "--timeout {}s".format(timeout)
490
491 # atomic
492 atomic_str = ""
493 if atomic:
494 atomic_str = "--atomic"
495
496 # version
497 version_str = ""
498 if version:
499 version_str = "--version {}".format(version)
500
501 # namespace
502 namespace_str = ""
503 if namespace:
504 namespace_str = "--namespace {}".format(namespace)
505
506 # scale
507 if resource_name:
508 scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
509 else:
510 scale_dict = {replica_str: scale}
511
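        # render the replica count as --set options understood by the chart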
        scale_str = self._params_to_set_option(scale_dict)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} --output yaml {scale} "
            "{timeout} {ver}"
        ).format(
            helm=self._helm_command,
            name=kdu_instance,
            namespace=namespace_str,
            atomic=atomic_str,
            scale=scale_str,
            timeout=timeout_str,
            model=kdu_model,
            ver=version_str,
            kubeconfig=kubeconfig,
        )
        return command

    def _get_upgrade_command(
        self,
        kdu_model: str,
        kdu_instance: str,
        namespace: str,
        params_str: str,
        version: str,
        atomic: bool,
        timeout: float,
        kubeconfig: str,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}s".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} "
            "--output yaml {params} {timeout} {ver}"
        ).format(
            kubeconfig=kubeconfig,
            helm=self._helm_command,
            name=kdu_instance,
            namespace=namespace_str,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            model=kdu_model,
            ver=version_str,
        )
        return command

    def _get_rollback_command(
        self, kdu_instance: str, namespace: str, revision: float, kubeconfig: str
    ) -> str:
        return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
            kubeconfig, self._helm_command, kdu_instance, revision, namespace
        )

    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:

        return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
            kubeconfig, self._helm_command, kdu_instance, namespace
        )

    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        repo_ids = []
        cluster_filter = {"_admin.helm-chart-v3.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8s cluster with helm-id {} not found".format(cluster_uuid)
            )
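
# Illustrative usage sketch (comment only, not executed by OSM). It assumes `fs`, `db`
# and `logger` objects compatible with K8sHelmBaseConnector and an already registered
# cluster; the UUID, instance name and namespace below are placeholders:
#
#     conn = K8sHelm3Connector(fs=fs, db=db, log=logger)
#     await conn.install(
#         cluster_uuid="<cluster-uuid>",
#         kdu_model="stable/openldap:1.2.4",
#         kdu_instance="ldap-kdu-instance",
#         namespace="osm-ns",
#     )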