Pin black version in tox.ini to 23.12.1
[osm/N2VC.git] / n2vc / k8s_helm3_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 from typing import Union
23 from shlex import quote
24 import os
25 import yaml
26
27 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
28 from n2vc.exceptions import K8sException
29
30
31 class K8sHelm3Connector(K8sHelmBaseConnector):
32
33 """
34 ####################################################################################
35 ################################### P U B L I C ####################################
36 ####################################################################################
37 """
38
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm3",
    log: object = None,
    on_update_db=None,
):
    """
    Set up the helm v3 flavour of the K8s helm connector.

    Every argument is handed over unchanged to K8sHelmBaseConnector.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    """

    base_kwargs = {
        "fs": fs,
        "db": db,
        "log": log,
        "helm_command": helm_command,
        "kubectl_command": kubectl_command,
        "on_update_db": on_update_db,
    }

    # the parent constructor does all the actual initialization
    K8sHelmBaseConnector.__init__(self, **base_kwargs)

    self.log.info("K8S Helm3 connector initialized")
71
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
    **kwargs,
):
    """Install a helm chart

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: chart/reference (string), which can be either
        of these options:
        - a name of chart available via the repos known by OSM
          (e.g. stable/openldap, stable/openldap:1.2.4)
        - a path to a packaged chart (e.g. mychart.tgz)
        - a path to an unpacked chart directory or a URL (e.g. mychart)
    :param kdu_instance: Kdu instance name
    :param atomic bool: If set, waits until the model is active and resets
                        the cluster on failure.
    :param timeout float: The time, in seconds, to wait for the install
                        to finish
    :param params dict: Key-value pairs of instantiation parameters
    :param db_dict dict: forwarded unchanged to _install_impl
    :param kdu_name: Name of the KDU instance to be installed
    :param namespace: K8s namespace to use for the KDU instance. When given
                      and different from "kube-system" it is created if it
                      does not exist yet.

    :param kwargs: Additional parameters (None yet)

    :return: True if successful
    :raises K8sException: if the namespace is missing and cannot be created
    """

    self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # for helm3 if namespace does not exist must create it
    if namespace and namespace != "kube-system":
        if not await self._namespace_exists(cluster_uuid, namespace):
            try:
                # TODO: refactor to use kubernetes API client
                await self._create_namespace(cluster_uuid, namespace)
            except Exception as e:
                # A failed creation is tolerated when the namespace is
                # nevertheless present afterwards; otherwise report it.
                if not await self._namespace_exists(cluster_uuid, namespace):
                    err_msg = (
                        "namespace {} does not exist in cluster_id {} "
                        "error message: {}".format(namespace, cluster_uuid, e)
                    )
                    self.log.error(err_msg)
                    raise K8sException(err_msg) from e

    await self._install_impl(
        cluster_uuid,
        kdu_model,
        paths,
        env,
        kdu_instance,
        atomic=atomic,
        timeout=timeout,
        params=params,
        db_dict=db_dict,
        kdu_name=kdu_name,
        namespace=namespace,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    self.log.debug("Returning kdu_instance {}".format(kdu_instance))
    return True
152
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Run the "all" inspect command for a chart and return its result.

    :param kdu_model: chart reference passed through to _exec_inspect_command
    :param repo_url: optional repository url passed through as well
    :return: whatever _exec_inspect_command returns for inspect_command "all"
    """
    debug_msg = "inspect kdu_model {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_msg)

    inspection = await self._exec_inspect_command(
        kdu_model=kdu_model, repo_url=repo_url, inspect_command="all"
    )
    return inspection
161
162 """
163 ####################################################################################
164 ################################### P R I V A T E ##################################
165 ####################################################################################
166 """
167
168 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
169 """
170 Creates and returns base cluster and kube dirs and returns them.
171 Also created helm3 dirs according to new directory specification, paths are
172 returned and also environment variables that must be provided to execute commands
173
174 Helm 3 directory specification uses XDG categories for variable support:
175 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
176 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
177 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
178
179 The variables assigned for this paths are:
180 (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
181 $HELM_PATH_DATA but looking and helm env the variable names are different)
182 - Cache: $HELM_CACHE_HOME
183 - Config: $HELM_CONFIG_HOME
184 - Data: $HELM_DATA_HOME
185 - helm kubeconfig: $KUBECONFIG
186
187 :param cluster_name: cluster_name
188 :return: Dictionary with config_paths and dictionary with helm environment variables
189 """
190
191 base = self.fs.path
192 if base.endswith("/") or base.endswith("\\"):
193 base = base[:-1]
194
195 # base dir for cluster
196 cluster_dir = base + "/" + cluster_name
197
198 # kube dir
199 kube_dir = cluster_dir + "/" + ".kube"
200 if create_if_not_exist and not os.path.exists(kube_dir):
201 self.log.debug("Creating dir {}".format(kube_dir))
202 os.makedirs(kube_dir)
203
204 helm_path_cache = cluster_dir + "/.cache/helm"
205 if create_if_not_exist and not os.path.exists(helm_path_cache):
206 self.log.debug("Creating dir {}".format(helm_path_cache))
207 os.makedirs(helm_path_cache)
208
209 helm_path_config = cluster_dir + "/.config/helm"
210 if create_if_not_exist and not os.path.exists(helm_path_config):
211 self.log.debug("Creating dir {}".format(helm_path_config))
212 os.makedirs(helm_path_config)
213
214 helm_path_data = cluster_dir + "/.local/share/helm"
215 if create_if_not_exist and not os.path.exists(helm_path_data):
216 self.log.debug("Creating dir {}".format(helm_path_data))
217 os.makedirs(helm_path_data)
218
219 config_filename = kube_dir + "/config"
220
221 # 2 - Prepare dictionary with paths
222 paths = {
223 "kube_dir": kube_dir,
224 "kube_config": config_filename,
225 "cluster_dir": cluster_dir,
226 }
227
228 # 3 - Prepare environment variables
229 env = {
230 "HELM_CACHE_HOME": helm_path_cache,
231 "HELM_CONFIG_HOME": helm_path_config,
232 "HELM_DATA_HOME": helm_path_data,
233 "KUBECONFIG": config_filename,
234 }
235
236 for file_name, file in paths.items():
237 if "dir" in file_name and not os.path.exists(file):
238 err_msg = "{} dir does not exist".format(file)
239 self.log.error(err_msg)
240 raise K8sException(err_msg)
241
242 return paths, env
243
244 async def _namespace_exists(self, cluster_id, namespace) -> bool:
245 self.log.debug(
246 "checking if namespace {} exists cluster_id {}".format(
247 namespace, cluster_id
248 )
249 )
250 namespaces = await self._get_namespaces(cluster_id)
251 return namespace in namespaces if namespaces else False
252
async def _get_namespaces(self, cluster_id: str):
    """List the namespace names of a cluster using `kubectl get namespaces`.

    :param cluster_id: cluster whose kubeconfig and helm environment are used
    :return: list with the metadata.name of every item in the kubectl output
    """
    self.log.debug("get namespaces cluster_id {}".format(cluster_id))

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    kubeconfig_arg = quote(paths["kube_config"])
    command = "{} --kubeconfig={} get namespaces -o=yaml".format(
        self.kubectl_command, kubeconfig_arg
    )
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)
    namespaces = []
    for item in data["items"]:
        namespaces.append(item["metadata"]["name"])
    self.log.debug(f"namespaces {namespaces}")

    return namespaces
273
274 async def _create_namespace(self, cluster_id: str, namespace: str):
275 self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
276
277 # init config, env
278 paths, env = self._init_paths_env(
279 cluster_name=cluster_id, create_if_not_exist=True
280 )
281
282 command = "{} --kubeconfig={} create namespace {}".format(
283 self.kubectl_command, quote(paths["kube_config"]), quote(namespace)
284 )
285 _, _rc = await self._local_async_exec(
286 command=command, raise_exception_on_error=True, env=env
287 )
288 self.log.debug(f"namespace {namespace} created")
289
290 return _rc
291
292 async def _get_services(
293 self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str
294 ):
295 # init config, env
296 paths, env = self._init_paths_env(
297 cluster_name=cluster_id, create_if_not_exist=True
298 )
299
300 command1 = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
301 kubeconfig, self._helm_command, quote(kdu_instance), quote(namespace)
302 )
303 command2 = "{} get --namespace={} -f -".format(
304 self.kubectl_command, quote(namespace)
305 )
306 output, _rc = await self._local_async_exec_pipe(
307 command1, command2, env=env, raise_exception_on_error=True
308 )
309 services = self._parse_services(output)
310
311 return services
312
313 async def _cluster_init(self, cluster_id, namespace, paths, env):
314 """
315 Implements the helm version dependent cluster initialization:
316 For helm3 it creates the namespace if it is not created
317 """
318 if namespace != "kube-system":
319 namespaces = await self._get_namespaces(cluster_id)
320 if namespace not in namespaces:
321 # TODO: refactor to use kubernetes API client
322 await self._create_namespace(cluster_id, namespace)
323
324 repo_list = await self.repo_list(cluster_id)
325 stable_repo = [repo for repo in repo_list if repo["name"] == "stable"]
326 if not stable_repo and self._stable_repo_url:
327 await self.repo_add(cluster_id, "stable", self._stable_repo_url)
328
329 # Returns False as no software needs to be uninstalled
330 return False
331
332 async def _uninstall_sw(self, cluster_id: str, namespace: str):
333 # nothing to do to uninstall sw
334 pass
335
336 async def _instances_list(self, cluster_id: str):
337 # init paths, env
338 paths, env = self._init_paths_env(
339 cluster_name=cluster_id, create_if_not_exist=True
340 )
341
342 command = "{} list --all-namespaces --output yaml".format(self._helm_command)
343 output, _rc = await self._local_async_exec(
344 command=command, raise_exception_on_error=True, env=env
345 )
346
347 if output and len(output) > 0:
348 self.log.debug("instances list output: {}".format(output))
349 return yaml.load(output, Loader=yaml.SafeLoader)
350 else:
351 return []
352
353 def _get_inspect_command(
354 self, show_command: str, kdu_model: str, repo_str: str, version: str
355 ):
356 """Generates the command to obtain the information about an Helm Chart package
357 (´helm show ...´ command)
358
359 Args:
360 show_command: the second part of the command (`helm show <show_command>`)
361 kdu_model: The name or path of a Helm Chart
362 repo_str: Helm Chart repository url
363 version: constraint with specific version of the Chart to use
364
365 Returns:
366 str: the generated Helm Chart command
367 """
368
369 inspect_command = "{} show {} {}{} {}".format(
370 self._helm_command, show_command, quote(kdu_model), repo_str, version
371 )
372 return inspect_command
373
374 def _get_get_command(
375 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
376 ):
377 get_command = (
378 "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
379 kubeconfig,
380 self._helm_command,
381 get_command,
382 quote(kdu_instance),
383 quote(namespace),
384 )
385 )
386 return get_command
387
388 async def _status_kdu(
389 self,
390 cluster_id: str,
391 kdu_instance: str,
392 namespace: str = None,
393 yaml_format: bool = False,
394 show_error_log: bool = False,
395 ) -> Union[str, dict]:
396 self.log.debug(
397 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
398 )
399
400 if not namespace:
401 namespace = "kube-system"
402
403 # init config, env
404 paths, env = self._init_paths_env(
405 cluster_name=cluster_id, create_if_not_exist=True
406 )
407 command = "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
408 paths["kube_config"],
409 self._helm_command,
410 quote(kdu_instance),
411 quote(namespace),
412 )
413
414 output, rc = await self._local_async_exec(
415 command=command,
416 raise_exception_on_error=True,
417 show_error_log=show_error_log,
418 env=env,
419 )
420
421 if yaml_format:
422 return str(output)
423
424 if rc != 0:
425 return None
426
427 data = yaml.load(output, Loader=yaml.SafeLoader)
428
429 # remove field 'notes' and manifest
430 try:
431 del data.get("info")["notes"]
432 except KeyError:
433 pass
434
435 # parse the manifest to a list of dictionaries
436 if "manifest" in data:
437 manifest_str = data.get("manifest")
438 manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)
439
440 data["manifest"] = []
441 for doc in manifest_docs:
442 data["manifest"].append(doc)
443
444 return data
445
446 def _get_install_command(
447 self,
448 kdu_model: str,
449 kdu_instance: str,
450 namespace: str,
451 params_str: str,
452 version: str,
453 atomic: bool,
454 timeout: float,
455 kubeconfig: str,
456 ) -> str:
457 timeout_str = ""
458 if timeout:
459 timeout_str = "--timeout {}s".format(timeout)
460
461 # atomic
462 atomic_str = ""
463 if atomic:
464 atomic_str = "--atomic"
465 # namespace
466 namespace_str = ""
467 if namespace:
468 namespace_str = "--namespace {}".format(quote(namespace))
469
470 # version
471 version_str = ""
472 if version:
473 version_str = "--version {}".format(version)
474
475 command = (
476 "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml "
477 "{params} {timeout} {ns} {model} {ver}".format(
478 kubeconfig=kubeconfig,
479 helm=self._helm_command,
480 name=quote(kdu_instance),
481 atomic=atomic_str,
482 params=params_str,
483 timeout=timeout_str,
484 ns=namespace_str,
485 model=quote(kdu_model),
486 ver=version_str,
487 )
488 )
489 return command
490
491 def _get_upgrade_scale_command(
492 self,
493 kdu_model: str,
494 kdu_instance: str,
495 namespace: str,
496 scale: int,
497 version: str,
498 atomic: bool,
499 replica_str: str,
500 timeout: float,
501 resource_name: str,
502 kubeconfig: str,
503 ) -> str:
504 """Generates the command to scale a Helm Chart release
505
506 Args:
507 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
508 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
509 namespace (str): Namespace where this KDU instance is deployed
510 scale (int): Scale count
511 version (str): Constraint with specific version of the Chart to use
512 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
513 The --wait flag will be set automatically if --atomic is used
514 replica_str (str): The key under resource_name key where the scale count is stored
515 timeout (float): The time, in seconds, to wait
516 resource_name (str): The KDU's resource to scale
517 kubeconfig (str): Kubeconfig file path
518
519 Returns:
520 str: command to scale a Helm Chart release
521 """
522
523 # scale
524 if resource_name:
525 scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
526 else:
527 scale_dict = {replica_str: scale}
528
529 scale_str = self._params_to_set_option(scale_dict)
530
531 return self._get_upgrade_command(
532 kdu_model=kdu_model,
533 kdu_instance=kdu_instance,
534 namespace=namespace,
535 params_str=scale_str,
536 version=version,
537 atomic=atomic,
538 timeout=timeout,
539 kubeconfig=kubeconfig,
540 )
541
542 def _get_upgrade_command(
543 self,
544 kdu_model: str,
545 kdu_instance: str,
546 namespace: str,
547 params_str: str,
548 version: str,
549 atomic: bool,
550 timeout: float,
551 kubeconfig: str,
552 force: bool = False,
553 ) -> str:
554 """Generates the command to upgrade a Helm Chart release
555
556 Args:
557 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
558 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
559 namespace (str): Namespace where this KDU instance is deployed
560 params_str (str): Params used to upgrade the Helm Chart release
561 version (str): Constraint with specific version of the Chart to use
562 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
563 The --wait flag will be set automatically if --atomic is used
564 timeout (float): The time, in seconds, to wait
565 kubeconfig (str): Kubeconfig file path
566 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
567 Returns:
568 str: command to upgrade a Helm Chart release
569 """
570
571 timeout_str = ""
572 if timeout:
573 timeout_str = "--timeout {}s".format(timeout)
574
575 # atomic
576 atomic_str = ""
577 if atomic:
578 atomic_str = "--atomic"
579
580 # force
581 force_str = ""
582 if force:
583 force_str = "--force "
584
585 # version
586 version_str = ""
587 if version:
588 version_str = "--version {}".format(quote(version))
589
590 # namespace
591 namespace_str = ""
592 if namespace:
593 namespace_str = "--namespace {}".format(quote(namespace))
594
595 command = (
596 "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} {force}"
597 "--output yaml {params} {timeout} --reuse-values {ver}"
598 ).format(
599 kubeconfig=kubeconfig,
600 helm=self._helm_command,
601 name=quote(kdu_instance),
602 namespace=namespace_str,
603 atomic=atomic_str,
604 force=force_str,
605 params=params_str,
606 timeout=timeout_str,
607 model=quote(kdu_model),
608 ver=version_str,
609 )
610 return command
611
612 def _get_rollback_command(
613 self, kdu_instance: str, namespace: str, revision: float, kubeconfig: str
614 ) -> str:
615 return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
616 kubeconfig,
617 self._helm_command,
618 quote(kdu_instance),
619 revision,
620 quote(namespace),
621 )
622
623 def _get_uninstall_command(
624 self, kdu_instance: str, namespace: str, kubeconfig: str
625 ) -> str:
626 return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
627 kubeconfig, self._helm_command, quote(kdu_instance), quote(namespace)
628 )
629
630 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
631 repo_ids = []
632 cluster_filter = {"_admin.helm-chart-v3.id": cluster_uuid}
633 cluster = self.db.get_one("k8sclusters", cluster_filter)
634 if cluster:
635 repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
636 return repo_ids
637 else:
638 raise K8sException(
639 "k8cluster with helm-id : {} not found".format(cluster_uuid)
640 )