n2vc/k8s_helm3_conn.py
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
from typing import Union
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException


class K8sHelm3Connector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm3",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initializes helm connector for helm v3

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
        )

        self.log.info("K8S Helm3 connector initialized")

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
        **kwargs,
    ):
        """Install a helm chart

        :param cluster_uuid str: The UUID of the cluster to install to
        :param kdu_model str: chart/reference (string), which can be either
            of these options:
            - a name of chart available via the repos known by OSM
              (e.g. stable/openldap, stable/openldap:1.2.4)
            - a path to a packaged chart (e.g. mychart.tgz)
            - a path to an unpacked chart directory or a URL (e.g. mychart)
        :param kdu_instance: Kdu instance name
        :param atomic bool: If set, waits until the model is active and resets
            the cluster on failure.
        :param timeout float: The time, in seconds, to wait for the install
            to finish
        :param params dict: Key-value pairs of instantiation parameters
        :param kdu_name: Name of the KDU instance to be installed
        :param namespace: K8s namespace to use for the KDU instance

        :param kwargs: Additional parameters (None yet)

        :return: True if successful
        """

        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # for helm3, if the namespace does not exist it must be created
        if namespace and namespace != "kube-system":
            if not await self._namespace_exists(cluster_uuid, namespace):
                try:
                    await self._create_namespace(cluster_uuid, namespace)
                except Exception as e:
                    if not await self._namespace_exists(cluster_uuid, namespace):
                        err_msg = (
                            "namespace {} does not exist in cluster_id {}, "
                            "error message: {}".format(namespace, cluster_uuid, e)
                        )
                        self.log.error(err_msg)
                        raise K8sException(err_msg)

        await self._install_impl(
            cluster_uuid,
            kdu_model,
            paths,
            env,
            kdu_instance,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return True

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
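        """Returns the full information of a Helm Chart (`helm show all`)

        :param kdu_model: name or path of the Helm Chart
        :param repo_url: optional Helm Chart repository URL
        :return: output of the `helm show all` command as a string
        """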
        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_command(
            inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
        )

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
167 """
168 Creates and returns base cluster and kube dirs and returns them.
169 Also created helm3 dirs according to new directory specification, paths are
170 returned and also environment variables that must be provided to execute commands
171
172 Helm 3 directory specification uses XDG categories for variable support:
173 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
174 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
175 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
176
177 The variables assigned for this paths are:
178 (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
179 $HELM_PATH_DATA but looking and helm env the variable names are different)
180 - Cache: $HELM_CACHE_HOME
181 - Config: $HELM_CONFIG_HOME
182 - Data: $HELM_DATA_HOME
183 - helm kubeconfig: $KUBECONFIG
184
185 :param cluster_name: cluster_name
186 :return: Dictionary with config_paths and dictionary with helm environment variables
187 """
188
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        helm_path_cache = cluster_dir + "/.cache/helm"
        if create_if_not_exist and not os.path.exists(helm_path_cache):
            self.log.debug("Creating dir {}".format(helm_path_cache))
            os.makedirs(helm_path_cache)

        helm_path_config = cluster_dir + "/.config/helm"
        if create_if_not_exist and not os.path.exists(helm_path_config):
            self.log.debug("Creating dir {}".format(helm_path_config))
            os.makedirs(helm_path_config)

        helm_path_data = cluster_dir + "/.local/share/helm"
        if create_if_not_exist and not os.path.exists(helm_path_data):
            self.log.debug("Creating dir {}".format(helm_path_data))
            os.makedirs(helm_path_data)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
        }

        # 3 - Prepare environment variables
        env = {
            "HELM_CACHE_HOME": helm_path_cache,
            "HELM_CONFIG_HOME": helm_path_config,
            "HELM_DATA_HOME": helm_path_data,
            "KUBECONFIG": config_filename,
        }

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        return paths, env

    async def _namespace_exists(self, cluster_id, namespace) -> bool:
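        """Checks if a namespace exists in the given cluster

        Args:
            cluster_id: The cluster UUID
            namespace: Namespace name to look for

        Returns:
            bool: True if the namespace is found, False otherwise
        """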
        self.log.debug(
            "checking if namespace {} exists cluster_id {}".format(
                namespace, cluster_id
            )
        )
        namespaces = await self._get_namespaces(cluster_id)
        return namespace in namespaces if namespaces else False

    async def _get_namespaces(self, cluster_id: str):
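        """Obtains the list of namespace names in the cluster using kubectl

        Args:
            cluster_id: The cluster UUID

        Returns:
            list: names of the namespaces found in the cluster
        """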
        self.log.debug("get namespaces cluster_id {}".format(cluster_id))

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} get namespaces -o=yaml".format(
            self.kubectl_command, paths["kube_config"]
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        data = yaml.load(output, Loader=yaml.SafeLoader)
        namespaces = [item["metadata"]["name"] for item in data["items"]]
        self.log.debug(f"namespaces {namespaces}")

        return namespaces

    async def _create_namespace(self, cluster_id: str, namespace: str):
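        """Creates a namespace in the cluster using kubectl

        Args:
            cluster_id: The cluster UUID
            namespace: Name of the namespace to create

        Returns:
            the return code of the `kubectl create namespace` command
        """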
        self.log.debug(f"create namespace: {namespace} for cluster_id: {cluster_id}")

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} create namespace {}".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        _, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )
        self.log.debug(f"namespace {namespace} created")

        return _rc

    async def _get_services(
        self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
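        """Obtains the K8s services deployed by a KDU instance

        Pipes `helm get manifest` into `kubectl get -f -` and parses the
        services from the resulting output.

        Returns:
            list: services deployed by the KDU instance
        """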
        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command1 = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
            kubeconfig, self._helm_command, kdu_instance, namespace
        )
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services

    async def _cluster_init(self, cluster_id, namespace, paths, env):
        """
        Implements the helm-version-dependent cluster initialization:
        for helm3 it creates the namespace if it does not exist, and adds
        the stable repo if it is not yet configured
        """
        if namespace != "kube-system":
            namespaces = await self._get_namespaces(cluster_id)
            if namespace not in namespaces:
                await self._create_namespace(cluster_id, namespace)

        repo_list = await self.repo_list(cluster_id)
        stable_repo = [repo for repo in repo_list if repo["name"] == "stable"]
        if not stable_repo and self._stable_repo_url:
            await self.repo_add(cluster_id, "stable", self._stable_repo_url)

        # Returns False as no software needs to be uninstalled
        return False

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # nothing to do to uninstall sw
        pass

    async def _instances_list(self, cluster_id: str):
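        """Returns the Helm releases in the cluster (`helm list --all-namespaces`)

        Args:
            cluster_id: The cluster UUID

        Returns:
            list: releases reported by helm, or an empty list if there is no output
        """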
        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --all-namespaces --output yaml".format(self._helm_command)
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output and len(output) > 0:
            self.log.debug("instances list output: {}".format(output))
            return yaml.load(output, Loader=yaml.SafeLoader)
        else:
            return []

    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
351 """Generates the command to obtain the information about an Helm Chart package
352 (´helm show ...´ command)
353
354 Args:
355 show_command: the second part of the command (`helm show <show_command>`)
356 kdu_model: The name or path of an Helm Chart
357 repo_url: Helm Chart repository url
358 version: constraint with specific version of the Chart to use
359
360 Returns:
361 str: the generated Helm Chart command
362 """
363
        inspect_command = "{} show {} {}{} {}".format(
            self._helm_command, show_command, kdu_model, repo_str, version
        )
        return inspect_command

    def _get_get_command(
        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
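        """Generates a `helm get <get_command>` command for a given release

        Args:
            get_command: Information to get (e.g. hooks, manifest, notes, values)
            kdu_instance: KDU instance, corresponding to the Helm Chart release
            namespace: Namespace where the release is deployed
            kubeconfig: Kubeconfig file path

        Returns:
            str: the generated `helm get` command
        """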
        get_command = (
            "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
                kubeconfig, self._helm_command, get_command, kdu_instance, namespace
            )
        )
        return get_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        yaml_format: bool = False,
        show_error_log: bool = False,
    ) -> Union[str, dict]:
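        """Obtains the status of a Helm Chart release (`helm status`)

        Args:
            cluster_id (str): The cluster UUID
            kdu_instance (str): KDU instance name (Helm Chart release)
            namespace (str): Namespace of the release; defaults to "kube-system"
            yaml_format (bool): If True, returns the raw yaml output as a string
            show_error_log (bool): If True, command errors are logged

        Returns:
            Union[str, dict]: the raw yaml status as a string, or a dict with the
            parsed status where the 'notes' field is removed and the manifest is
            parsed into a list of documents
        """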
        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        if not namespace:
            namespace = "kube-system"

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
            paths["kube_config"], self._helm_command, kdu_instance, namespace
        )

        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if yaml_format:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info")["notes"]
        except KeyError:
            pass

        # parse the manifest to a list of dictionaries
        if "manifest" in data:
            manifest_str = data.get("manifest")
            manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)

            data["manifest"] = []
            for doc in manifest_docs:
                data["manifest"].append(doc)

        return data

    def _get_install_command(
        self,
        kdu_model: str,
        kdu_instance: str,
        namespace: str,
        params_str: str,
        version: str,
        atomic: bool,
        timeout: float,
        kubeconfig: str,
    ) -> str:
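        """Generates the command to install a Helm Chart release (`helm install`)

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            params_str (str): Params used to install the Helm Chart release
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, the installation is deleted on failure.
                The --wait flag will be set automatically if --atomic is used
            timeout (float): The time, in seconds, to wait
            kubeconfig (str): Kubeconfig file path

        Returns:
            str: command to install a Helm Chart release
        """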
        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}s".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml "
            "{params} {timeout} {ns} {model} {ver}".format(
                kubeconfig=kubeconfig,
                helm=self._helm_command,
                name=kdu_instance,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command

    def _get_upgrade_scale_command(
        self,
        kdu_model: str,
        kdu_instance: str,
        namespace: str,
        scale: int,
        version: str,
        atomic: bool,
        replica_str: str,
        timeout: float,
        resource_name: str,
        kubeconfig: str,
    ) -> str:
        """Generates the command to scale a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            scale (int): Scale count
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            replica_str (str): The key under resource_name key where the scale count is stored
            timeout (float): The time, in seconds, to wait
            resource_name (str): The KDU's resource to scale
            kubeconfig (str): Kubeconfig file path

        Returns:
            str: command to scale a Helm Chart release
        """

        # scale
        if resource_name:
            scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
        else:
            scale_dict = {replica_str: scale}

        scale_str = self._params_to_set_option(scale_dict)

        return self._get_upgrade_command(
            kdu_model=kdu_model,
            kdu_instance=kdu_instance,
            namespace=namespace,
            params_str=scale_str,
            version=version,
            atomic=atomic,
            timeout=timeout,
            kubeconfig=kubeconfig,
        )

    def _get_upgrade_command(
        self,
        kdu_model: str,
        kdu_instance: str,
        namespace: str,
        params_str: str,
        version: str,
        atomic: bool,
        timeout: float,
        kubeconfig: str,
        force: bool = False,
    ) -> str:
        """Generates the command to upgrade a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            params_str (str): Params used to upgrade the Helm Chart release
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            timeout (float): The time, in seconds, to wait
            kubeconfig (str): Kubeconfig file path
            force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.

        Returns:
            str: command to upgrade a Helm Chart release
        """

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}s".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # force
        force_str = ""
        if force:
            force_str = "--force "

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} {force}"
            "--output yaml {params} {timeout} --reuse-values {ver}"
        ).format(
            kubeconfig=kubeconfig,
            helm=self._helm_command,
            name=kdu_instance,
            namespace=namespace_str,
            atomic=atomic_str,
            force=force_str,
            params=params_str,
            timeout=timeout_str,
            model=kdu_model,
            ver=version_str,
        )
        return command

    def _get_rollback_command(
        self, kdu_instance: str, namespace: str, revision: float, kubeconfig: str
    ) -> str:
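        """Generates the command to roll back a Helm Chart release to a given revision

        Args:
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            revision (float): Revision to roll back to
            kubeconfig (str): Kubeconfig file path

        Returns:
            str: command to roll back a Helm Chart release, waiting for completion
        """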
        return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
            kubeconfig, self._helm_command, kdu_instance, revision, namespace
        )

    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
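        """Generates the command to uninstall a Helm Chart release

        Args:
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            kubeconfig (str): Kubeconfig file path

        Returns:
            str: command to uninstall a Helm Chart release
        """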
        return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
            kubeconfig, self._helm_command, kdu_instance, namespace
        )

    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
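        """Obtains the helm chart repository ids registered for a cluster

        Args:
            cluster_uuid: The cluster UUID (matched against _admin.helm-chart-v3.id)

        Returns:
            list: repository ids stored under _admin.helm_chart_repos, or an empty list

        Raises:
            K8sException: if no cluster is found for the given id
        """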
        repo_ids = []
        cluster_filter = {"_admin.helm-chart-v3.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8scluster with helm-id: {} not found".format(cluster_uuid)
            )