Coverage for n2vc/k8s_helm_base_conn.py: 58%
772 statements
« prev ^ index » next coverage.py v7.3.1, created at 2024-06-29 09:03 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2024-06-29 09:03 +0000
1##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22import abc
23import asyncio
24from typing import Union
25from shlex import quote
26import random
27import time
28import shlex
29import shutil
30import stat
31import os
32import yaml
33from uuid import uuid4
34from urllib.parse import urlparse
36from n2vc.config import EnvironConfig
37from n2vc.exceptions import K8sException
38from n2vc.k8s_conn import K8sConnector
39from n2vc.kubectl import Kubectl
42class K8sHelmBaseConnector(K8sConnector):
44 """
45 ####################################################################################
46 ################################### P U B L I C ####################################
47 ####################################################################################
48 """
50 service_account = "osm"
52 def __init__(
53 self,
54 fs: object,
55 db: object,
56 kubectl_command: str = "/usr/bin/kubectl",
57 helm_command: str = "/usr/bin/helm",
58 log: object = None,
59 on_update_db=None,
60 ):
61 """
63 :param fs: file system for kubernetes and helm configuration
64 :param db: database object to write current operation status
65 :param kubectl_command: path to kubectl executable
66 :param helm_command: path to helm executable
67 :param log: logger
68 :param on_update_db: callback called when k8s connector updates database
69 """
71 # parent class
72 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
74 self.log.info("Initializing K8S Helm connector")
76 self.config = EnvironConfig()
77 # random numbers for release name generation
78 random.seed(time.time())
80 # the file system
81 self.fs = fs
83 # exception if kubectl is not installed
84 self.kubectl_command = kubectl_command
85 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
87 # exception if helm is not installed
88 self._helm_command = helm_command
89 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
91 # obtain stable repo url from config or apply default
92 self._stable_repo_url = self.config.get("stablerepourl")
93 if self._stable_repo_url == "None":
94 self._stable_repo_url = None
96 # Lock to avoid concurrent execution of helm commands
97 self.cmd_lock = asyncio.Lock()
99 def _get_namespace(self, cluster_uuid: str) -> str:
100 """
101 Obtains the namespace used by the cluster with the uuid passed by argument
103 param: cluster_uuid: cluster's uuid
104 """
106 # first, obtain the cluster corresponding to the uuid passed by argument
107 k8scluster = self.db.get_one(
108 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
109 )
110 return k8scluster.get("namespace")
112 async def init_env(
113 self,
114 k8s_creds: str,
115 namespace: str = "kube-system",
116 reuse_cluster_uuid=None,
117 **kwargs,
118 ) -> tuple[str, bool]:
119 """
120 It prepares a given K8s cluster environment to run Charts
122 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
123 '.kube/config'
124 :param namespace: optional namespace to be used for helm. By default,
125 'kube-system' will be used
126 :param reuse_cluster_uuid: existing cluster uuid for reuse
127 :param kwargs: Additional parameters (None yet)
128 :return: uuid of the K8s cluster and True if connector has installed some
129 software in the cluster
130 (on error, an exception will be raised)
131 """
133 if reuse_cluster_uuid:
134 cluster_id = reuse_cluster_uuid
135 else:
136 cluster_id = str(uuid4())
138 self.log.debug(
139 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
140 )
142 paths, env = self._init_paths_env(
143 cluster_name=cluster_id, create_if_not_exist=True
144 )
145 mode = stat.S_IRUSR | stat.S_IWUSR
146 with open(paths["kube_config"], "w", mode) as f:
147 f.write(k8s_creds)
148 os.chmod(paths["kube_config"], 0o600)
150 # Code with initialization specific of helm version
151 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
153 # sync fs with local data
154 self.fs.reverse_sync(from_path=cluster_id)
156 self.log.info("Cluster {} initialized".format(cluster_id))
158 return cluster_id, n2vc_installed_sw
160 async def repo_add(
161 self,
162 cluster_uuid: str,
163 name: str,
164 url: str,
165 repo_type: str = "chart",
166 cert: str = None,
167 user: str = None,
168 password: str = None,
169 oci: bool = False,
170 ):
171 self.log.debug(
172 "Cluster {}, adding {} repository {}. URL: {}".format(
173 cluster_uuid, repo_type, name, url
174 )
175 )
177 # init_env
178 paths, env = self._init_paths_env(
179 cluster_name=cluster_uuid, create_if_not_exist=True
180 )
182 # sync local dir
183 self.fs.sync(from_path=cluster_uuid)
185 if oci:
186 if user and password:
187 host_port = urlparse(url).netloc if url.startswith("oci://") else url
188 # helm registry login url
189 command = "env KUBECONFIG={} {} registry login {}".format(
190 paths["kube_config"], self._helm_command, quote(host_port)
191 )
192 else:
193 self.log.debug(
194 "OCI registry login is not needed for repo: {}".format(name)
195 )
196 return
197 else:
198 # helm repo add name url
199 command = "env KUBECONFIG={} {} repo add {} {}".format(
200 paths["kube_config"], self._helm_command, quote(name), quote(url)
201 )
203 if cert:
204 temp_cert_file = os.path.join(
205 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
206 )
207 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
208 with open(temp_cert_file, "w") as the_cert:
209 the_cert.write(cert)
210 command += " --ca-file {}".format(quote(temp_cert_file))
212 if user:
213 command += " --username={}".format(quote(user))
215 if password:
216 command += " --password={}".format(quote(password))
218 self.log.debug("adding repo: {}".format(command))
219 await self._local_async_exec(
220 command=command, raise_exception_on_error=True, env=env
221 )
223 if not oci:
224 # helm repo update
225 command = "env KUBECONFIG={} {} repo update {}".format(
226 paths["kube_config"], self._helm_command, quote(name)
227 )
228 self.log.debug("updating repo: {}".format(command))
229 await self._local_async_exec(
230 command=command, raise_exception_on_error=False, env=env
231 )
233 # sync fs
234 self.fs.reverse_sync(from_path=cluster_uuid)
236 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
237 self.log.debug(
238 "Cluster {}, updating {} repository {}".format(
239 cluster_uuid, repo_type, name
240 )
241 )
243 # init_env
244 paths, env = self._init_paths_env(
245 cluster_name=cluster_uuid, create_if_not_exist=True
246 )
248 # sync local dir
249 self.fs.sync(from_path=cluster_uuid)
251 # helm repo update
252 command = "{} repo update {}".format(self._helm_command, quote(name))
253 self.log.debug("updating repo: {}".format(command))
254 await self._local_async_exec(
255 command=command, raise_exception_on_error=False, env=env
256 )
258 # sync fs
259 self.fs.reverse_sync(from_path=cluster_uuid)
261 async def repo_list(self, cluster_uuid: str) -> list:
262 """
263 Get the list of registered repositories
265 :return: list of registered repositories: [ (name, url) .... ]
266 """
268 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
270 # config filename
271 paths, env = self._init_paths_env(
272 cluster_name=cluster_uuid, create_if_not_exist=True
273 )
275 # sync local dir
276 self.fs.sync(from_path=cluster_uuid)
278 command = "env KUBECONFIG={} {} repo list --output yaml".format(
279 paths["kube_config"], self._helm_command
280 )
282 # Set exception to false because if there are no repos just want an empty list
283 output, _rc = await self._local_async_exec(
284 command=command, raise_exception_on_error=False, env=env
285 )
287 # sync fs
288 self.fs.reverse_sync(from_path=cluster_uuid)
290 if _rc == 0:
291 if output and len(output) > 0:
292 repos = yaml.load(output, Loader=yaml.SafeLoader)
293 # unify format between helm2 and helm3 setting all keys lowercase
294 return self._lower_keys_list(repos)
295 else:
296 return []
297 else:
298 return []
300 async def repo_remove(self, cluster_uuid: str, name: str):
301 self.log.debug(
302 "remove {} repositories for cluster {}".format(name, cluster_uuid)
303 )
305 # init env, paths
306 paths, env = self._init_paths_env(
307 cluster_name=cluster_uuid, create_if_not_exist=True
308 )
310 # sync local dir
311 self.fs.sync(from_path=cluster_uuid)
313 command = "env KUBECONFIG={} {} repo remove {}".format(
314 paths["kube_config"], self._helm_command, quote(name)
315 )
316 await self._local_async_exec(
317 command=command, raise_exception_on_error=True, env=env
318 )
320 # sync fs
321 self.fs.reverse_sync(from_path=cluster_uuid)
323 async def reset(
324 self,
325 cluster_uuid: str,
326 force: bool = False,
327 uninstall_sw: bool = False,
328 **kwargs,
329 ) -> bool:
330 """Reset a cluster
332 Resets the Kubernetes cluster by removing the helm deployment that represents it.
334 :param cluster_uuid: The UUID of the cluster to reset
335 :param force: Boolean to force the reset
336 :param uninstall_sw: Boolean to force the reset
337 :param kwargs: Additional parameters (None yet)
338 :return: Returns True if successful or raises an exception.
339 """
340 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
341 self.log.debug(
342 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
343 cluster_uuid, uninstall_sw
344 )
345 )
347 # sync local dir
348 self.fs.sync(from_path=cluster_uuid)
350 # uninstall releases if needed.
351 if uninstall_sw:
352 releases = await self.instances_list(cluster_uuid=cluster_uuid)
353 if len(releases) > 0:
354 if force:
355 for r in releases:
356 try:
357 kdu_instance = r.get("name")
358 chart = r.get("chart")
359 self.log.debug(
360 "Uninstalling {} -> {}".format(chart, kdu_instance)
361 )
362 await self.uninstall(
363 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
364 )
365 except Exception as e:
366 # will not raise exception as it was found
367 # that in some cases of previously installed helm releases it
368 # raised an error
369 self.log.warn(
370 "Error uninstalling release {}: {}".format(
371 kdu_instance, e
372 )
373 )
374 else:
375 msg = (
376 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
377 ).format(cluster_uuid)
378 self.log.warn(msg)
379 uninstall_sw = (
380 False # Allow to remove k8s cluster without removing Tiller
381 )
383 if uninstall_sw:
384 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
386 # delete cluster directory
387 self.log.debug("Removing directory {}".format(cluster_uuid))
388 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
389 # Remove also local directorio if still exist
390 direct = self.fs.path + "/" + cluster_uuid
391 shutil.rmtree(direct, ignore_errors=True)
393 return True
395 def _is_helm_chart_a_file(self, chart_name: str):
396 return chart_name.count("/") > 1
398 @staticmethod
399 def _is_helm_chart_a_url(chart_name: str):
400 result = urlparse(chart_name)
401 return all([result.scheme, result.netloc])
403 async def _install_impl(
404 self,
405 cluster_id: str,
406 kdu_model: str,
407 paths: dict,
408 env: dict,
409 kdu_instance: str,
410 atomic: bool = True,
411 timeout: float = 300,
412 params: dict = None,
413 db_dict: dict = None,
414 kdu_name: str = None,
415 namespace: str = None,
416 ):
417 # init env, paths
418 paths, env = self._init_paths_env(
419 cluster_name=cluster_id, create_if_not_exist=True
420 )
422 # params to str
423 params_str, file_to_delete = self._params_to_file_option(
424 cluster_id=cluster_id, params=params
425 )
427 kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_id)
429 command = self._get_install_command(
430 kdu_model,
431 kdu_instance,
432 namespace,
433 params_str,
434 version,
435 atomic,
436 timeout,
437 paths["kube_config"],
438 )
440 self.log.debug("installing: {}".format(command))
442 if atomic:
443 # exec helm in a task
444 exec_task = asyncio.ensure_future(
445 coro_or_future=self._local_async_exec(
446 command=command, raise_exception_on_error=False, env=env
447 )
448 )
450 # write status in another task
451 status_task = asyncio.ensure_future(
452 coro_or_future=self._store_status(
453 cluster_id=cluster_id,
454 kdu_instance=kdu_instance,
455 namespace=namespace,
456 db_dict=db_dict,
457 operation="install",
458 )
459 )
461 # wait for execution task
462 await asyncio.wait([exec_task])
464 # cancel status task
465 status_task.cancel()
467 output, rc = exec_task.result()
469 else:
470 output, rc = await self._local_async_exec(
471 command=command, raise_exception_on_error=False, env=env
472 )
474 # remove temporal values yaml file
475 if file_to_delete:
476 os.remove(file_to_delete)
478 # write final status
479 await self._store_status(
480 cluster_id=cluster_id,
481 kdu_instance=kdu_instance,
482 namespace=namespace,
483 db_dict=db_dict,
484 operation="install",
485 )
487 if rc != 0:
488 msg = "Error executing command: {}\nOutput: {}".format(command, output)
489 self.log.error(msg)
490 raise K8sException(msg)
492 async def upgrade(
493 self,
494 cluster_uuid: str,
495 kdu_instance: str,
496 kdu_model: str = None,
497 atomic: bool = True,
498 timeout: float = 300,
499 params: dict = None,
500 db_dict: dict = None,
501 namespace: str = None,
502 reset_values: bool = False,
503 reuse_values: bool = True,
504 reset_then_reuse_values: bool = False,
505 force: bool = False,
506 ):
507 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
509 # sync local dir
510 self.fs.sync(from_path=cluster_uuid)
512 # look for instance to obtain namespace
514 # set namespace
515 if not namespace:
516 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
517 if not instance_info:
518 raise K8sException("kdu_instance {} not found".format(kdu_instance))
519 namespace = instance_info["namespace"]
521 # init env, paths
522 paths, env = self._init_paths_env(
523 cluster_name=cluster_uuid, create_if_not_exist=True
524 )
526 # sync local dir
527 self.fs.sync(from_path=cluster_uuid)
529 # params to str
530 params_str, file_to_delete = self._params_to_file_option(
531 cluster_id=cluster_uuid, params=params
532 )
534 kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)
536 command = self._get_upgrade_command(
537 kdu_model,
538 kdu_instance,
539 namespace,
540 params_str,
541 version,
542 atomic,
543 timeout,
544 paths["kube_config"],
545 reset_values,
546 reuse_values,
547 reset_then_reuse_values,
548 force,
549 )
551 self.log.debug("upgrading: {}".format(command))
553 if atomic:
554 # exec helm in a task
555 exec_task = asyncio.ensure_future(
556 coro_or_future=self._local_async_exec(
557 command=command, raise_exception_on_error=False, env=env
558 )
559 )
560 # write status in another task
561 status_task = asyncio.ensure_future(
562 coro_or_future=self._store_status(
563 cluster_id=cluster_uuid,
564 kdu_instance=kdu_instance,
565 namespace=namespace,
566 db_dict=db_dict,
567 operation="upgrade",
568 )
569 )
571 # wait for execution task
572 await asyncio.wait([exec_task])
574 # cancel status task
575 status_task.cancel()
576 output, rc = exec_task.result()
578 else:
579 output, rc = await self._local_async_exec(
580 command=command, raise_exception_on_error=False, env=env
581 )
583 # remove temporal values yaml file
584 if file_to_delete:
585 os.remove(file_to_delete)
587 # write final status
588 await self._store_status(
589 cluster_id=cluster_uuid,
590 kdu_instance=kdu_instance,
591 namespace=namespace,
592 db_dict=db_dict,
593 operation="upgrade",
594 )
596 if rc != 0:
597 msg = "Error executing command: {}\nOutput: {}".format(command, output)
598 self.log.error(msg)
599 raise K8sException(msg)
601 # sync fs
602 self.fs.reverse_sync(from_path=cluster_uuid)
604 # return new revision number
605 instance = await self.get_instance_info(
606 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
607 )
608 if instance:
609 revision = int(instance.get("revision"))
610 self.log.debug("New revision: {}".format(revision))
611 return revision
612 else:
613 return 0
615 async def scale(
616 self,
617 kdu_instance: str,
618 scale: int,
619 resource_name: str,
620 total_timeout: float = 1800,
621 cluster_uuid: str = None,
622 kdu_model: str = None,
623 atomic: bool = True,
624 db_dict: dict = None,
625 **kwargs,
626 ):
627 """Scale a resource in a Helm Chart.
629 Args:
630 kdu_instance: KDU instance name
631 scale: Scale to which to set the resource
632 resource_name: Resource name
633 total_timeout: The time, in seconds, to wait
634 cluster_uuid: The UUID of the cluster
635 kdu_model: The chart reference
636 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
637 The --wait flag will be set automatically if --atomic is used
638 db_dict: Dictionary for any additional data
639 kwargs: Additional parameters
641 Returns:
642 True if successful, False otherwise
643 """
645 debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
646 if resource_name:
647 debug_mgs = "scaling resource {} in model {} (cluster {})".format(
648 resource_name, kdu_model, cluster_uuid
649 )
651 self.log.debug(debug_mgs)
653 # look for instance to obtain namespace
654 # get_instance_info function calls the sync command
655 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
656 if not instance_info:
657 raise K8sException("kdu_instance {} not found".format(kdu_instance))
659 # init env, paths
660 paths, env = self._init_paths_env(
661 cluster_name=cluster_uuid, create_if_not_exist=True
662 )
664 # version
665 kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)
667 repo_url = await self._find_repo(kdu_model, cluster_uuid)
669 _, replica_str = await self._get_replica_count_url(
670 kdu_model, repo_url, resource_name
671 )
673 command = self._get_upgrade_scale_command(
674 kdu_model,
675 kdu_instance,
676 instance_info["namespace"],
677 scale,
678 version,
679 atomic,
680 replica_str,
681 total_timeout,
682 resource_name,
683 paths["kube_config"],
684 )
686 self.log.debug("scaling: {}".format(command))
688 if atomic:
689 # exec helm in a task
690 exec_task = asyncio.ensure_future(
691 coro_or_future=self._local_async_exec(
692 command=command, raise_exception_on_error=False, env=env
693 )
694 )
695 # write status in another task
696 status_task = asyncio.ensure_future(
697 coro_or_future=self._store_status(
698 cluster_id=cluster_uuid,
699 kdu_instance=kdu_instance,
700 namespace=instance_info["namespace"],
701 db_dict=db_dict,
702 operation="scale",
703 )
704 )
706 # wait for execution task
707 await asyncio.wait([exec_task])
709 # cancel status task
710 status_task.cancel()
711 output, rc = exec_task.result()
713 else:
714 output, rc = await self._local_async_exec(
715 command=command, raise_exception_on_error=False, env=env
716 )
718 # write final status
719 await self._store_status(
720 cluster_id=cluster_uuid,
721 kdu_instance=kdu_instance,
722 namespace=instance_info["namespace"],
723 db_dict=db_dict,
724 operation="scale",
725 )
727 if rc != 0:
728 msg = "Error executing command: {}\nOutput: {}".format(command, output)
729 self.log.error(msg)
730 raise K8sException(msg)
732 # sync fs
733 self.fs.reverse_sync(from_path=cluster_uuid)
735 return True
737 async def get_scale_count(
738 self,
739 resource_name: str,
740 kdu_instance: str,
741 cluster_uuid: str,
742 kdu_model: str,
743 **kwargs,
744 ) -> int:
745 """Get a resource scale count.
747 Args:
748 cluster_uuid: The UUID of the cluster
749 resource_name: Resource name
750 kdu_instance: KDU instance name
751 kdu_model: The name or path of an Helm Chart
752 kwargs: Additional parameters
754 Returns:
755 Resource instance count
756 """
758 self.log.debug(
759 "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
760 )
762 # look for instance to obtain namespace
763 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
764 if not instance_info:
765 raise K8sException("kdu_instance {} not found".format(kdu_instance))
767 # init env, paths
768 paths, _ = self._init_paths_env(
769 cluster_name=cluster_uuid, create_if_not_exist=True
770 )
772 replicas = await self._get_replica_count_instance(
773 kdu_instance=kdu_instance,
774 namespace=instance_info["namespace"],
775 kubeconfig=paths["kube_config"],
776 resource_name=resource_name,
777 )
779 self.log.debug(
780 f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
781 )
783 # Get default value if scale count is not found from provided values
784 # Important note: this piece of code shall only be executed in the first scaling operation,
785 # since it is expected that the _get_replica_count_instance is able to obtain the number of
786 # replicas when a scale operation was already conducted previously for this KDU/resource!
787 if replicas is None:
788 repo_url = await self._find_repo(
789 kdu_model=kdu_model, cluster_uuid=cluster_uuid
790 )
791 replicas, _ = await self._get_replica_count_url(
792 kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
793 )
795 self.log.debug(
796 f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
797 f"{resource_name} obtained: {replicas}"
798 )
800 if replicas is None:
801 msg = "Replica count not found. Cannot be scaled"
802 self.log.error(msg)
803 raise K8sException(msg)
805 return int(replicas)
807 async def rollback(
808 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
809 ):
810 self.log.debug(
811 "rollback kdu_instance {} to revision {} from cluster {}".format(
812 kdu_instance, revision, cluster_uuid
813 )
814 )
816 # sync local dir
817 self.fs.sync(from_path=cluster_uuid)
819 # look for instance to obtain namespace
820 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
821 if not instance_info:
822 raise K8sException("kdu_instance {} not found".format(kdu_instance))
824 # init env, paths
825 paths, env = self._init_paths_env(
826 cluster_name=cluster_uuid, create_if_not_exist=True
827 )
829 # sync local dir
830 self.fs.sync(from_path=cluster_uuid)
832 command = self._get_rollback_command(
833 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
834 )
836 self.log.debug("rolling_back: {}".format(command))
838 # exec helm in a task
839 exec_task = asyncio.ensure_future(
840 coro_or_future=self._local_async_exec(
841 command=command, raise_exception_on_error=False, env=env
842 )
843 )
844 # write status in another task
845 status_task = asyncio.ensure_future(
846 coro_or_future=self._store_status(
847 cluster_id=cluster_uuid,
848 kdu_instance=kdu_instance,
849 namespace=instance_info["namespace"],
850 db_dict=db_dict,
851 operation="rollback",
852 )
853 )
855 # wait for execution task
856 await asyncio.wait([exec_task])
858 # cancel status task
859 status_task.cancel()
861 output, rc = exec_task.result()
863 # write final status
864 await self._store_status(
865 cluster_id=cluster_uuid,
866 kdu_instance=kdu_instance,
867 namespace=instance_info["namespace"],
868 db_dict=db_dict,
869 operation="rollback",
870 )
872 if rc != 0:
873 msg = "Error executing command: {}\nOutput: {}".format(command, output)
874 self.log.error(msg)
875 raise K8sException(msg)
877 # sync fs
878 self.fs.reverse_sync(from_path=cluster_uuid)
880 # return new revision number
881 instance = await self.get_instance_info(
882 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
883 )
884 if instance:
885 revision = int(instance.get("revision"))
886 self.log.debug("New revision: {}".format(revision))
887 return revision
888 else:
889 return 0
891 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
892 """
893 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
894 (this call should happen after all _terminate-config-primitive_ of the VNF
895 are invoked).
897 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
898 :param kdu_instance: unique name for the KDU instance to be deleted
899 :param kwargs: Additional parameters (None yet)
900 :return: True if successful
901 """
903 self.log.debug(
904 "uninstall kdu_instance {} from cluster {}".format(
905 kdu_instance, cluster_uuid
906 )
907 )
909 # sync local dir
910 self.fs.sync(from_path=cluster_uuid)
912 # look for instance to obtain namespace
913 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
914 if not instance_info:
915 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
916 return True
917 # init env, paths
918 paths, env = self._init_paths_env(
919 cluster_name=cluster_uuid, create_if_not_exist=True
920 )
922 # sync local dir
923 self.fs.sync(from_path=cluster_uuid)
925 command = self._get_uninstall_command(
926 kdu_instance, instance_info["namespace"], paths["kube_config"]
927 )
928 output, _rc = await self._local_async_exec(
929 command=command, raise_exception_on_error=True, env=env
930 )
932 # sync fs
933 self.fs.reverse_sync(from_path=cluster_uuid)
935 return self._output_to_table(output)
937 async def instances_list(self, cluster_uuid: str) -> list:
938 """
939 returns a list of deployed releases in a cluster
941 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
942 :return:
943 """
945 self.log.debug("list releases for cluster {}".format(cluster_uuid))
947 # sync local dir
948 self.fs.sync(from_path=cluster_uuid)
950 # execute internal command
951 result = await self._instances_list(cluster_uuid)
953 # sync fs
954 self.fs.reverse_sync(from_path=cluster_uuid)
956 return result
958 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
959 instances = await self.instances_list(cluster_uuid=cluster_uuid)
960 for instance in instances:
961 if instance.get("name") == kdu_instance:
962 return instance
963 self.log.debug("Instance {} not found".format(kdu_instance))
964 return None
966 async def upgrade_charm(
967 self,
968 ee_id: str = None,
969 path: str = None,
970 charm_id: str = None,
971 charm_type: str = None,
972 timeout: float = None,
973 ) -> str:
974 """This method upgrade charms in VNFs
976 Args:
977 ee_id: Execution environment id
978 path: Local path to the charm
979 charm_id: charm-id
980 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
981 timeout: (Float) Timeout for the ns update operation
983 Returns:
984 The output of the update operation if status equals to "completed"
985 """
986 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
988 async def exec_primitive(
989 self,
990 cluster_uuid: str = None,
991 kdu_instance: str = None,
992 primitive_name: str = None,
993 timeout: float = 300,
994 params: dict = None,
995 db_dict: dict = None,
996 **kwargs,
997 ) -> str:
998 """Exec primitive (Juju action)
1000 :param cluster_uuid: The UUID of the cluster or namespace:cluster
1001 :param kdu_instance: The unique name of the KDU instance
1002 :param primitive_name: Name of action that will be executed
1003 :param timeout: Timeout for action execution
1004 :param params: Dictionary of all the parameters needed for the action
1005 :db_dict: Dictionary for any additional data
1006 :param kwargs: Additional parameters (None yet)
1008 :return: Returns the output of the action
1009 """
1010 raise K8sException(
1011 "KDUs deployed with Helm don't support actions "
1012 "different from rollback, upgrade and status"
1013 )
1015 async def get_services(
1016 self, cluster_uuid: str, kdu_instance: str, namespace: str
1017 ) -> list:
1018 """
1019 Returns a list of services defined for the specified kdu instance.
1021 :param cluster_uuid: UUID of a K8s cluster known by OSM
1022 :param kdu_instance: unique name for the KDU instance
1023 :param namespace: K8s namespace used by the KDU instance
1024 :return: If successful, it will return a list of services, Each service
1025 can have the following data:
1026 - `name` of the service
1027 - `type` type of service in the k8 cluster
1028 - `ports` List of ports offered by the service, for each port includes at least
1029 name, port, protocol
1030 - `cluster_ip` Internal ip to be used inside k8s cluster
1031 - `external_ip` List of external ips (in case they are available)
1032 """
1034 self.log.debug(
1035 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1036 cluster_uuid, kdu_instance
1037 )
1038 )
1040 # init env, paths
1041 paths, env = self._init_paths_env(
1042 cluster_name=cluster_uuid, create_if_not_exist=True
1043 )
1045 # sync local dir
1046 self.fs.sync(from_path=cluster_uuid)
1048 # get list of services names for kdu
1049 service_names = await self._get_services(
1050 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
1051 )
1053 service_list = []
1054 for service in service_names:
1055 service = await self._get_service(cluster_uuid, service, namespace)
1056 service_list.append(service)
1058 # sync fs
1059 self.fs.reverse_sync(from_path=cluster_uuid)
1061 return service_list
1063 async def get_service(
1064 self, cluster_uuid: str, service_name: str, namespace: str
1065 ) -> object:
1066 self.log.debug(
1067 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1068 service_name, namespace, cluster_uuid
1069 )
1070 )
1072 # sync local dir
1073 self.fs.sync(from_path=cluster_uuid)
1075 service = await self._get_service(cluster_uuid, service_name, namespace)
1077 # sync fs
1078 self.fs.reverse_sync(from_path=cluster_uuid)
1080 return service
1082 async def status_kdu(
1083 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
1084 ) -> Union[str, dict]:
1085 """
1086 This call would retrieve tha current state of a given KDU instance. It would be
1087 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1088 values_ of the configuration parameters applied to a given instance. This call
1089 would be based on the `status` call.
1091 :param cluster_uuid: UUID of a K8s cluster known by OSM
1092 :param kdu_instance: unique name for the KDU instance
1093 :param kwargs: Additional parameters (None yet)
1094 :param yaml_format: if the return shall be returned as an YAML string or as a
1095 dictionary
1096 :return: If successful, it will return the following vector of arguments:
1097 - K8s `namespace` in the cluster where the KDU lives
1098 - `state` of the KDU instance. It can be:
1099 - UNKNOWN
1100 - DEPLOYED
1101 - DELETED
1102 - SUPERSEDED
1103 - FAILED or
1104 - DELETING
1105 - List of `resources` (objects) that this release consists of, sorted by kind,
1106 and the status of those resources
1107 - Last `deployment_time`.
1109 """
1110 self.log.debug(
1111 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1112 cluster_uuid, kdu_instance
1113 )
1114 )
1116 # sync local dir
1117 self.fs.sync(from_path=cluster_uuid)
1119 # get instance: needed to obtain namespace
1120 instances = await self._instances_list(cluster_id=cluster_uuid)
1121 for instance in instances:
1122 if instance.get("name") == kdu_instance:
1123 break
1124 else:
1125 # instance does not exist
1126 raise K8sException(
1127 "Instance name: {} not found in cluster: {}".format(
1128 kdu_instance, cluster_uuid
1129 )
1130 )
1132 status = await self._status_kdu(
1133 cluster_id=cluster_uuid,
1134 kdu_instance=kdu_instance,
1135 namespace=instance["namespace"],
1136 yaml_format=yaml_format,
1137 show_error_log=True,
1138 )
1140 # sync fs
1141 self.fs.reverse_sync(from_path=cluster_uuid)
1143 return status
1145 async def get_values_kdu(
1146 self, kdu_instance: str, namespace: str, kubeconfig: str
1147 ) -> str:
1148 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1150 return await self._exec_get_command(
1151 get_command="values",
1152 kdu_instance=kdu_instance,
1153 namespace=namespace,
1154 kubeconfig=kubeconfig,
1155 )
1157 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1158 """Method to obtain the Helm Chart package's values
1160 Args:
1161 kdu_model: The name or path of an Helm Chart
1162 repo_url: Helm Chart repository url
1164 Returns:
1165 str: the values of the Helm Chart package
1166 """
1168 self.log.debug(
1169 "inspect kdu_model values {} from (optional) repo: {}".format(
1170 kdu_model, repo_url
1171 )
1172 )
1174 return await self._exec_inspect_command(
1175 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1176 )
1178 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1179 self.log.debug(
1180 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1181 )
1183 return await self._exec_inspect_command(
1184 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1185 )
1187 async def synchronize_repos(self, cluster_uuid: str):
1188 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1189 try:
1190 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1191 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1193 local_repo_list = await self.repo_list(cluster_uuid)
1194 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1196 deleted_repo_list = []
1197 added_repo_dict = {}
1199 # iterate over the list of repos in the database that should be
1200 # added if not present
1201 for repo_name, db_repo in db_repo_dict.items():
1202 try:
1203 # check if it is already present
1204 curr_repo_url = local_repo_dict.get(db_repo["name"])
1205 repo_id = db_repo.get("_id")
1206 if curr_repo_url != db_repo["url"]:
1207 if curr_repo_url:
1208 self.log.debug(
1209 "repo {} url changed, delete and and again".format(
1210 db_repo["url"]
1211 )
1212 )
1213 await self.repo_remove(cluster_uuid, db_repo["name"])
1214 deleted_repo_list.append(repo_id)
1216 # add repo
1217 self.log.debug("add repo {}".format(db_repo["name"]))
1218 await self.repo_add(
1219 cluster_uuid,
1220 db_repo["name"],
1221 db_repo["url"],
1222 cert=db_repo.get("ca_cert"),
1223 user=db_repo.get("user"),
1224 password=db_repo.get("password"),
1225 oci=db_repo.get("oci", False),
1226 )
1227 added_repo_dict[repo_id] = db_repo["name"]
1228 except Exception as e:
1229 raise K8sException(
1230 "Error adding repo id: {}, err_msg: {} ".format(
1231 repo_id, repr(e)
1232 )
1233 )
1235 # Delete repos that are present but not in nbi_list
1236 for repo_name in local_repo_dict:
1237 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1238 self.log.debug("delete repo {}".format(repo_name))
1239 try:
1240 await self.repo_remove(cluster_uuid, repo_name)
1241 deleted_repo_list.append(repo_name)
1242 except Exception as e:
1243 self.warning(
1244 "Error deleting repo, name: {}, err_msg: {}".format(
1245 repo_name, str(e)
1246 )
1247 )
1249 return deleted_repo_list, added_repo_dict
1251 except K8sException:
1252 raise
1253 except Exception as e:
1254 # Do not raise errors synchronizing repos
1255 self.log.error("Error synchronizing repos: {}".format(e))
1256 raise Exception("Error synchronizing repos: {}".format(e))
1258 def _get_db_repos_dict(self, repo_ids: list):
1259 db_repos_dict = {}
1260 for repo_id in repo_ids:
1261 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1262 db_repos_dict[db_repo["name"]] = db_repo
1263 return db_repos_dict
1265 """
1266 ####################################################################################
1267 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1268 ####################################################################################
1269 """
1271 @abc.abstractmethod
1272 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1273 """
1274 Creates and returns base cluster and kube dirs and returns them.
1275 Also created helm3 dirs according to new directory specification, paths are
1276 not returned but assigned to helm environment variables
1278 :param cluster_name: cluster_name
1279 :return: Dictionary with config_paths and dictionary with helm environment variables
1280 """
1282 @abc.abstractmethod
1283 async def _cluster_init(self, cluster_id, namespace, paths, env):
1284 """
1285 Implements the helm version dependent cluster initialization
1286 """
1288 @abc.abstractmethod
1289 async def _instances_list(self, cluster_id):
1290 """
1291 Implements the helm version dependent helm instances list
1292 """
1294 @abc.abstractmethod
1295 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
1296 """
1297 Implements the helm version dependent method to obtain services from a helm instance
1298 """
1300 @abc.abstractmethod
1301 async def _status_kdu(
1302 self,
1303 cluster_id: str,
1304 kdu_instance: str,
1305 namespace: str = None,
1306 yaml_format: bool = False,
1307 show_error_log: bool = False,
1308 ) -> Union[str, dict]:
1309 """
1310 Implements the helm version dependent method to obtain status of a helm instance
1311 """
1313 @abc.abstractmethod
1314 def _get_install_command(
1315 self,
1316 kdu_model,
1317 kdu_instance,
1318 namespace,
1319 params_str,
1320 version,
1321 atomic,
1322 timeout,
1323 kubeconfig,
1324 ) -> str:
1325 """
1326 Obtain command to be executed to delete the indicated instance
1327 """
1329 @abc.abstractmethod
1330 def _get_upgrade_scale_command(
1331 self,
1332 kdu_model,
1333 kdu_instance,
1334 namespace,
1335 count,
1336 version,
1337 atomic,
1338 replicas,
1339 timeout,
1340 resource_name,
1341 kubeconfig,
1342 ) -> str:
1343 """Generates the command to scale a Helm Chart release
1345 Args:
1346 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1347 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1348 namespace (str): Namespace where this KDU instance is deployed
1349 scale (int): Scale count
1350 version (str): Constraint with specific version of the Chart to use
1351 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1352 The --wait flag will be set automatically if --atomic is used
1353 replica_str (str): The key under resource_name key where the scale count is stored
1354 timeout (float): The time, in seconds, to wait
1355 resource_name (str): The KDU's resource to scale
1356 kubeconfig (str): Kubeconfig file path
1358 Returns:
1359 str: command to scale a Helm Chart release
1360 """
1362 @abc.abstractmethod
1363 def _get_upgrade_command(
1364 self,
1365 kdu_model,
1366 kdu_instance,
1367 namespace,
1368 params_str,
1369 version,
1370 atomic,
1371 timeout,
1372 kubeconfig,
1373 reset_values,
1374 reuse_values,
1375 reset_then_reuse_values,
1376 force,
1377 ) -> str:
1378 """Generates the command to upgrade a Helm Chart release
1380 Args:
1381 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1382 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1383 namespace (str): Namespace where this KDU instance is deployed
1384 params_str (str): Params used to upgrade the Helm Chart release
1385 version (str): Constraint with specific version of the Chart to use
1386 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1387 The --wait flag will be set automatically if --atomic is used
1388 timeout (float): The time, in seconds, to wait
1389 kubeconfig (str): Kubeconfig file path
1390 reset_values(bool): If set, helm resets values instead of reusing previous values.
1391 reuse_values(bool): If set, helm reuses previous values.
1392 reset_then_reuse_values(bool): If set, helm resets values, then apply the last release's values
1393 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
1394 Returns:
1395 str: command to upgrade a Helm Chart release
1396 """
1398 @abc.abstractmethod
1399 def _get_rollback_command(
1400 self, kdu_instance, namespace, revision, kubeconfig
1401 ) -> str:
1402 """
1403 Obtain command to be executed to rollback the indicated instance
1404 """
1406 @abc.abstractmethod
1407 def _get_uninstall_command(
1408 self, kdu_instance: str, namespace: str, kubeconfig: str
1409 ) -> str:
1410 """
1411 Obtain command to be executed to delete the indicated instance
1412 """
1414 @abc.abstractmethod
1415 def _get_inspect_command(
1416 self, show_command: str, kdu_model: str, repo_str: str, version: str
1417 ):
1418 """Generates the command to obtain the information about an Helm Chart package
1419 (´helm show ...´ command)
1421 Args:
1422 show_command: the second part of the command (`helm show <show_command>`)
1423 kdu_model: The name or path of an Helm Chart
1424 repo_url: Helm Chart repository url
1425 version: constraint with specific version of the Chart to use
1427 Returns:
1428 str: the generated Helm Chart command
1429 """
1431 @abc.abstractmethod
1432 def _get_get_command(
1433 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1434 ):
1435 """Obtain command to be executed to get information about the kdu instance."""
1437 @abc.abstractmethod
1438 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1439 """
1440 Method call to uninstall cluster software for helm. This method is dependent
1441 of helm version
1442 For Helm v2 it will be called when Tiller must be uninstalled
1443 For Helm v3 it does nothing and does not need to be callled
1444 """
1446 @abc.abstractmethod
1447 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1448 """
1449 Obtains the cluster repos identifiers
1450 """
1452 """
1453 ####################################################################################
1454 ################################### P R I V A T E ##################################
1455 ####################################################################################
1456 """
1458 @staticmethod
1459 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1460 if os.path.exists(filename):
1461 return True
1462 else:
1463 msg = "File {} does not exist".format(filename)
1464 if exception_if_not_exists:
1465 raise K8sException(msg)
1467 @staticmethod
1468 def _remove_multiple_spaces(strobj):
1469 strobj = strobj.strip()
1470 while " " in strobj:
1471 strobj = strobj.replace(" ", " ")
1472 return strobj
1474 @staticmethod
1475 def _output_to_lines(output: str) -> list:
1476 output_lines = list()
1477 lines = output.splitlines(keepends=False)
1478 for line in lines:
1479 line = line.strip()
1480 if len(line) > 0:
1481 output_lines.append(line)
1482 return output_lines
1484 @staticmethod
1485 def _output_to_table(output: str) -> list:
1486 output_table = list()
1487 lines = output.splitlines(keepends=False)
1488 for line in lines:
1489 line = line.replace("\t", " ")
1490 line_list = list()
1491 output_table.append(line_list)
1492 cells = line.split(sep=" ")
1493 for cell in cells:
1494 cell = cell.strip()
1495 if len(cell) > 0:
1496 line_list.append(cell)
1497 return output_table
1499 @staticmethod
1500 def _parse_services(output: str) -> list:
1501 lines = output.splitlines(keepends=False)
1502 services = []
1503 for line in lines:
1504 line = line.replace("\t", " ")
1505 cells = line.split(sep=" ")
1506 if len(cells) > 0 and cells[0].startswith("service/"):
1507 elems = cells[0].split(sep="/")
1508 if len(elems) > 1:
1509 services.append(elems[1])
1510 return services
1512 @staticmethod
1513 def _get_deep(dictionary: dict, members: tuple):
1514 target = dictionary
1515 value = None
1516 try:
1517 for m in members:
1518 value = target.get(m)
1519 if not value:
1520 return None
1521 else:
1522 target = value
1523 except Exception:
1524 pass
1525 return value
1527 # find key:value in several lines
1528 @staticmethod
1529 def _find_in_lines(p_lines: list, p_key: str) -> str:
1530 for line in p_lines:
1531 try:
1532 if line.startswith(p_key + ":"):
1533 parts = line.split(":")
1534 the_value = parts[1].strip()
1535 return the_value
1536 except Exception:
1537 # ignore it
1538 pass
1539 return None
1541 @staticmethod
1542 def _lower_keys_list(input_list: list):
1543 """
1544 Transform the keys in a list of dictionaries to lower case and returns a new list
1545 of dictionaries
1546 """
1547 new_list = []
1548 if input_list:
1549 for dictionary in input_list:
1550 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1551 new_list.append(new_dict)
1552 return new_list
1554 async def _local_async_exec(
1555 self,
1556 command: str,
1557 raise_exception_on_error: bool = False,
1558 show_error_log: bool = True,
1559 encode_utf8: bool = False,
1560 env: dict = None,
1561 ) -> tuple[str, int]:
1562 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1563 self.log.debug(
1564 "Executing async local command: {}, env: {}".format(command, env)
1565 )
1567 # split command
1568 command = shlex.split(command)
1570 environ = os.environ.copy()
1571 if env:
1572 environ.update(env)
1574 try:
1575 async with self.cmd_lock:
1576 process = await asyncio.create_subprocess_exec(
1577 *command,
1578 stdout=asyncio.subprocess.PIPE,
1579 stderr=asyncio.subprocess.PIPE,
1580 env=environ,
1581 )
1583 # wait for command terminate
1584 stdout, stderr = await process.communicate()
1586 return_code = process.returncode
1588 output = ""
1589 if stdout:
1590 output = stdout.decode("utf-8").strip()
1591 # output = stdout.decode()
1592 if stderr:
1593 output = stderr.decode("utf-8").strip()
1594 # output = stderr.decode()
1596 if return_code != 0 and show_error_log:
1597 self.log.debug(
1598 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1599 )
1600 else:
1601 self.log.debug("Return code: {}".format(return_code))
1603 if raise_exception_on_error and return_code != 0:
1604 raise K8sException(output)
1606 if encode_utf8:
1607 output = output.encode("utf-8").strip()
1608 output = str(output).replace("\\n", "\n")
1610 return output, return_code
1612 except asyncio.CancelledError:
1613 # first, kill the process if it is still running
1614 if process.returncode is None:
1615 process.kill()
1616 raise
1617 except K8sException:
1618 raise
1619 except Exception as e:
1620 msg = "Exception executing command: {} -> {}".format(command, e)
1621 self.log.error(msg)
1622 if raise_exception_on_error:
1623 raise K8sException(e) from e
1624 else:
1625 return "", -1
1627 async def _local_async_exec_pipe(
1628 self,
1629 command1: str,
1630 command2: str,
1631 raise_exception_on_error: bool = True,
1632 show_error_log: bool = True,
1633 encode_utf8: bool = False,
1634 env: dict = None,
1635 ):
1636 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1637 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1638 command = "{} | {}".format(command1, command2)
1639 self.log.debug(
1640 "Executing async local command: {}, env: {}".format(command, env)
1641 )
1643 # split command
1644 command1 = shlex.split(command1)
1645 command2 = shlex.split(command2)
1647 environ = os.environ.copy()
1648 if env:
1649 environ.update(env)
1651 try:
1652 async with self.cmd_lock:
1653 read, write = os.pipe()
1654 process_1 = await asyncio.create_subprocess_exec(
1655 *command1, stdout=write, env=environ
1656 )
1657 os.close(write)
1658 process_2 = await asyncio.create_subprocess_exec(
1659 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1660 )
1661 os.close(read)
1662 stdout, stderr = await process_2.communicate()
1664 return_code = process_2.returncode
1666 output = ""
1667 if stdout:
1668 output = stdout.decode("utf-8").strip()
1669 # output = stdout.decode()
1670 if stderr:
1671 output = stderr.decode("utf-8").strip()
1672 # output = stderr.decode()
1674 if return_code != 0 and show_error_log:
1675 self.log.debug(
1676 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1677 )
1678 else:
1679 self.log.debug("Return code: {}".format(return_code))
1681 if raise_exception_on_error and return_code != 0:
1682 raise K8sException(output)
1684 if encode_utf8:
1685 output = output.encode("utf-8").strip()
1686 output = str(output).replace("\\n", "\n")
1688 return output, return_code
1689 except asyncio.CancelledError:
1690 # first, kill the processes if they are still running
1691 for process in (process_1, process_2):
1692 if process.returncode is None:
1693 process.kill()
1694 raise
1695 except K8sException:
1696 raise
1697 except Exception as e:
1698 msg = "Exception executing command: {} -> {}".format(command, e)
1699 self.log.error(msg)
1700 if raise_exception_on_error:
1701 raise K8sException(e) from e
1702 else:
1703 return "", -1
1705 async def _get_service(self, cluster_id, service_name, namespace):
1706 """
1707 Obtains the data of the specified service in the k8cluster.
1709 :param cluster_id: id of a K8s cluster known by OSM
1710 :param service_name: name of the K8s service in the specified namespace
1711 :param namespace: K8s namespace used by the KDU instance
1712 :return: If successful, it will return a service with the following data:
1713 - `name` of the service
1714 - `type` type of service in the k8 cluster
1715 - `ports` List of ports offered by the service, for each port includes at least
1716 name, port, protocol
1717 - `cluster_ip` Internal ip to be used inside k8s cluster
1718 - `external_ip` List of external ips (in case they are available)
1719 """
1721 # init config, env
1722 paths, env = self._init_paths_env(
1723 cluster_name=cluster_id, create_if_not_exist=True
1724 )
1726 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1727 self.kubectl_command,
1728 paths["kube_config"],
1729 quote(namespace),
1730 quote(service_name),
1731 )
1733 output, _rc = await self._local_async_exec(
1734 command=command, raise_exception_on_error=True, env=env
1735 )
1737 data = yaml.load(output, Loader=yaml.SafeLoader)
1739 service = {
1740 "name": service_name,
1741 "type": self._get_deep(data, ("spec", "type")),
1742 "ports": self._get_deep(data, ("spec", "ports")),
1743 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1744 }
1745 if service["type"] == "LoadBalancer":
1746 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1747 ip_list = [elem["ip"] for elem in ip_map_list]
1748 service["external_ip"] = ip_list
1750 return service
1752 async def _exec_get_command(
1753 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1754 ):
1755 """Obtains information about the kdu instance."""
1757 full_command = self._get_get_command(
1758 get_command, kdu_instance, namespace, kubeconfig
1759 )
1761 output, _rc = await self._local_async_exec(command=full_command)
1763 return output
1765 async def _exec_inspect_command(
1766 self, inspect_command: str, kdu_model: str, repo_url: str = None
1767 ):
1768 """Obtains information about an Helm Chart package (´helm show´ command)
1770 Args:
1771 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1772 kdu_model: The name or path of an Helm Chart
1773 repo_url: Helm Chart repository url
1775 Returns:
1776 str: the requested info about the Helm Chart package
1777 """
1779 repo_str = ""
1780 if repo_url:
1781 repo_str = " --repo {}".format(quote(repo_url))
1783 # Obtain the Chart's name and store it in the var kdu_model
1784 kdu_model, _ = self._split_repo(kdu_model=kdu_model)
1786 kdu_model, version = self._split_version(kdu_model)
1787 if version:
1788 version_str = "--version {}".format(quote(version))
1789 else:
1790 version_str = ""
1792 full_command = self._get_inspect_command(
1793 show_command=inspect_command,
1794 kdu_model=quote(kdu_model),
1795 repo_str=repo_str,
1796 version=version_str,
1797 )
1799 output, _ = await self._local_async_exec(command=full_command)
1801 return output
1803 async def _get_replica_count_url(
1804 self,
1805 kdu_model: str,
1806 repo_url: str = None,
1807 resource_name: str = None,
1808 ) -> tuple[int, str]:
1809 """Get the replica count value in the Helm Chart Values.
1811 Args:
1812 kdu_model: The name or path of an Helm Chart
1813 repo_url: Helm Chart repository url
1814 resource_name: Resource name
1816 Returns:
1817 A tuple with:
1818 - The number of replicas of the specific instance; if not found, returns None; and
1819 - The string corresponding to the replica count key in the Helm values
1820 """
1822 kdu_values = yaml.load(
1823 await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
1824 Loader=yaml.SafeLoader,
1825 )
1827 self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")
1829 if not kdu_values:
1830 raise K8sException(
1831 "kdu_values not found for kdu_model {}".format(kdu_model)
1832 )
1834 if resource_name:
1835 kdu_values = kdu_values.get(resource_name, None)
1837 if not kdu_values:
1838 msg = "resource {} not found in the values in model {}".format(
1839 resource_name, kdu_model
1840 )
1841 self.log.error(msg)
1842 raise K8sException(msg)
1844 duplicate_check = False
1846 replica_str = ""
1847 replicas = None
1849 if kdu_values.get("replicaCount") is not None:
1850 replicas = kdu_values["replicaCount"]
1851 replica_str = "replicaCount"
1852 elif kdu_values.get("replicas") is not None:
1853 duplicate_check = True
1854 replicas = kdu_values["replicas"]
1855 replica_str = "replicas"
1856 else:
1857 if resource_name:
1858 msg = (
1859 "replicaCount or replicas not found in the resource"
1860 "{} values in model {}. Cannot be scaled".format(
1861 resource_name, kdu_model
1862 )
1863 )
1864 else:
1865 msg = (
1866 "replicaCount or replicas not found in the values"
1867 "in model {}. Cannot be scaled".format(kdu_model)
1868 )
1869 self.log.error(msg)
1870 raise K8sException(msg)
1872 # Control if replicas and replicaCount exists at the same time
1873 msg = "replicaCount and replicas are exists at the same time"
1874 if duplicate_check:
1875 if "replicaCount" in kdu_values:
1876 self.log.error(msg)
1877 raise K8sException(msg)
1878 else:
1879 if "replicas" in kdu_values:
1880 self.log.error(msg)
1881 raise K8sException(msg)
1883 return replicas, replica_str
1885 async def _get_replica_count_instance(
1886 self,
1887 kdu_instance: str,
1888 namespace: str,
1889 kubeconfig: str,
1890 resource_name: str = None,
1891 ) -> int:
1892 """Get the replica count value in the instance.
1894 Args:
1895 kdu_instance: The name of the KDU instance
1896 namespace: KDU instance namespace
1897 kubeconfig:
1898 resource_name: Resource name
1900 Returns:
1901 The number of replicas of the specific instance; if not found, returns None
1902 """
1904 kdu_values = yaml.load(
1905 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1906 Loader=yaml.SafeLoader,
1907 )
1909 self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")
1911 replicas = None
1913 if kdu_values:
1914 resource_values = (
1915 kdu_values.get(resource_name, None) if resource_name else None
1916 )
1918 for replica_str in ("replicaCount", "replicas"):
1919 if resource_values:
1920 replicas = resource_values.get(replica_str)
1921 else:
1922 replicas = kdu_values.get(replica_str)
1924 if replicas is not None:
1925 break
1927 return replicas
1929 async def _store_status(
1930 self,
1931 cluster_id: str,
1932 operation: str,
1933 kdu_instance: str,
1934 namespace: str = None,
1935 db_dict: dict = None,
1936 ) -> None:
1937 """
1938 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1940 :param cluster_id (str): the cluster where the KDU instance is deployed
1941 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1942 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1943 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1944 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1945 values for the keys:
1946 - "collection": The Mongo DB collection to write to
1947 - "filter": The query filter to use in the update process
1948 - "path": The dot separated keys which targets the object to be updated
1949 Defaults to None.
1950 """
1952 try:
1953 detailed_status = await self._status_kdu(
1954 cluster_id=cluster_id,
1955 kdu_instance=kdu_instance,
1956 yaml_format=False,
1957 namespace=namespace,
1958 )
1960 status = detailed_status.get("info").get("description")
1961 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1963 # write status to db
1964 result = await self.write_app_status_to_db(
1965 db_dict=db_dict,
1966 status=str(status),
1967 detailed_status=str(detailed_status),
1968 operation=operation,
1969 )
1971 if not result:
1972 self.log.info("Error writing in database. Task exiting...")
1974 except asyncio.CancelledError as e:
1975 self.log.warning(
1976 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1977 )
1978 except Exception as e:
1979 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1981 # params for use in -f file
1982 # returns values file option and filename (in order to delete it at the end)
1983 def _params_to_file_option(self, cluster_id: str, params: dict) -> tuple[str, str]:
1984 if params and len(params) > 0:
1985 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1987 def get_random_number():
1988 r = random.SystemRandom().randint(1, 99999999)
1989 s = str(r)
1990 while len(s) < 10:
1991 s = "0" + s
1992 return s
1994 params2 = dict()
1995 for key in params:
1996 value = params.get(key)
1997 if "!!yaml" in str(value):
1998 value = yaml.safe_load(value[7:])
1999 params2[key] = value
2001 values_file = get_random_number() + ".yaml"
2002 with open(values_file, "w") as stream:
2003 yaml.dump(params2, stream, indent=4, default_flow_style=False)
2005 return "-f {}".format(values_file), values_file
2007 return "", None
2009 # params for use in --set option
2010 @staticmethod
2011 def _params_to_set_option(params: dict) -> str:
2012 pairs = [
2013 f"{quote(str(key))}={quote(str(value))}"
2014 for key, value in params.items()
2015 if value is not None
2016 ]
2017 if not pairs:
2018 return ""
2019 return "--set " + ",".join(pairs)
2021 @staticmethod
2022 def generate_kdu_instance_name(**kwargs):
2023 chart_name = kwargs["kdu_model"]
2024 # check embeded chart (file or dir)
2025 if chart_name.startswith("/"):
2026 # extract file or directory name
2027 chart_name = chart_name[chart_name.rfind("/") + 1 :]
2028 # check URL
2029 elif "://" in chart_name:
2030 # extract last portion of URL
2031 chart_name = chart_name[chart_name.rfind("/") + 1 :]
2033 name = ""
2034 for c in chart_name:
2035 if c.isalpha() or c.isnumeric():
2036 name += c
2037 else:
2038 name += "-"
2039 if len(name) > 35:
2040 name = name[0:35]
2042 # if does not start with alpha character, prefix 'a'
2043 if not name[0].isalpha():
2044 name = "a" + name
2046 name += "-"
2048 def get_random_number():
2049 r = random.SystemRandom().randint(1, 99999999)
2050 s = str(r)
2051 s = s.rjust(10, "0")
2052 return s
2054 name = name + get_random_number()
2055 return name.lower()
2057 def _split_version(self, kdu_model: str) -> tuple[str, str]:
2058 version = None
2059 if (
2060 not (
2061 self._is_helm_chart_a_file(kdu_model)
2062 or self._is_helm_chart_a_url(kdu_model)
2063 )
2064 and ":" in kdu_model
2065 ):
2066 parts = kdu_model.split(sep=":")
2067 if len(parts) == 2:
2068 version = str(parts[1])
2069 kdu_model = parts[0]
2070 return kdu_model, version
2072 def _split_repo(self, kdu_model: str) -> tuple[str, str]:
2073 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2075 Args:
2076 kdu_model (str): Associated KDU model
2078 Returns:
2079 (str, str): Tuple with the Chart name in index 0, and the repo name
2080 in index 2; if there was a problem finding them, return None
2081 for both
2082 """
2084 chart_name = None
2085 repo_name = None
2087 idx = kdu_model.find("/")
2088 if not self._is_helm_chart_a_url(kdu_model) and idx >= 0:
2089 chart_name = kdu_model[idx + 1 :]
2090 repo_name = kdu_model[:idx]
2092 return chart_name, repo_name
2094 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
2095 """Obtain the Helm repository for an Helm Chart
2097 Args:
2098 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2099 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2101 Returns:
2102 str: the repository URL; if Helm Chart is a local one, the function returns None
2103 """
2105 _, repo_name = self._split_repo(kdu_model=kdu_model)
2107 repo_url = None
2108 if repo_name:
2109 # Find repository link
2110 local_repo_list = await self.repo_list(cluster_uuid)
2111 for repo in local_repo_list:
2112 if repo["name"] == repo_name:
2113 repo_url = repo["url"]
2114 break # it is not necessary to continue the loop if the repo link was found...
2116 return repo_url
2118 def _repo_to_oci_url(self, repo):
2119 db_repo = self.db.get_one("k8srepos", {"name": repo}, fail_on_empty=False)
2120 if db_repo and "oci" in db_repo:
2121 return db_repo.get("url")
2123 async def _prepare_helm_chart(self, kdu_model, cluster_id):
2124 # e.g.: "stable/openldap", "1.0"
2125 kdu_model, version = self._split_version(kdu_model)
2126 # e.g.: "openldap, stable"
2127 chart_name, repo = self._split_repo(kdu_model)
2128 if repo and chart_name: # repo/chart case
2129 oci_url = self._repo_to_oci_url(repo)
2130 if oci_url: # oci does not require helm repo update
2131 kdu_model = f"{oci_url.rstrip('/')}/{chart_name.lstrip('/')}" # urljoin doesn't work for oci schema
2132 else:
2133 await self.repo_update(cluster_id, repo)
2134 return kdu_model, version
2136 async def create_certificate(
2137 self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
2138 ):
2139 paths, env = self._init_paths_env(
2140 cluster_name=cluster_uuid, create_if_not_exist=True
2141 )
2142 kubectl = Kubectl(config_file=paths["kube_config"])
2143 await kubectl.create_certificate(
2144 namespace=namespace,
2145 name=name,
2146 dns_prefix=dns_prefix,
2147 secret_name=secret_name,
2148 usages=[usage],
2149 issuer_name="ca-issuer",
2150 )
2152 async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
2153 paths, env = self._init_paths_env(
2154 cluster_name=cluster_uuid, create_if_not_exist=True
2155 )
2156 kubectl = Kubectl(config_file=paths["kube_config"])
2157 await kubectl.delete_certificate(namespace, certificate_name)
2159 async def create_namespace(
2160 self,
2161 namespace,
2162 cluster_uuid,
2163 labels,
2164 ):
2165 """
2166 Create a namespace in a specific cluster
2168 :param namespace: Namespace to be created
2169 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2170 :param labels: Dictionary with labels for the new namespace
2171 :returns: None
2172 """
2173 paths, env = self._init_paths_env(
2174 cluster_name=cluster_uuid, create_if_not_exist=True
2175 )
2176 kubectl = Kubectl(config_file=paths["kube_config"])
2177 await kubectl.create_namespace(
2178 name=namespace,
2179 labels=labels,
2180 )
2182 async def delete_namespace(
2183 self,
2184 namespace,
2185 cluster_uuid,
2186 ):
2187 """
2188 Delete a namespace in a specific cluster
2190 :param namespace: namespace to be deleted
2191 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2192 :returns: None
2193 """
2194 paths, env = self._init_paths_env(
2195 cluster_name=cluster_uuid, create_if_not_exist=True
2196 )
2197 kubectl = Kubectl(config_file=paths["kube_config"])
2198 await kubectl.delete_namespace(
2199 name=namespace,
2200 )
2202 async def copy_secret_data(
2203 self,
2204 src_secret: str,
2205 dst_secret: str,
2206 cluster_uuid: str,
2207 data_key: str,
2208 src_namespace: str = "osm",
2209 dst_namespace: str = "osm",
2210 ):
2211 """
2212 Copy a single key and value from an existing secret to a new one
2214 :param src_secret: name of the existing secret
2215 :param dst_secret: name of the new secret
2216 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2217 :param data_key: key of the existing secret to be copied
2218 :param src_namespace: Namespace of the existing secret
2219 :param dst_namespace: Namespace of the new secret
2220 :returns: None
2221 """
2222 paths, env = self._init_paths_env(
2223 cluster_name=cluster_uuid, create_if_not_exist=True
2224 )
2225 kubectl = Kubectl(config_file=paths["kube_config"])
2226 secret_data = await kubectl.get_secret_content(
2227 name=src_secret,
2228 namespace=src_namespace,
2229 )
2230 # Only the corresponding data_key value needs to be copy
2231 data = {data_key: secret_data.get(data_key)}
2232 await kubectl.create_secret(
2233 name=dst_secret,
2234 data=data,
2235 namespace=dst_namespace,
2236 secret_type="Opaque",
2237 )
2239 async def setup_default_rbac(
2240 self,
2241 name,
2242 namespace,
2243 cluster_uuid,
2244 api_groups,
2245 resources,
2246 verbs,
2247 service_account,
2248 ):
2249 """
2250 Create a basic RBAC for a new namespace.
2252 :param name: name of both Role and Role Binding
2253 :param namespace: K8s namespace
2254 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2255 :param api_groups: Api groups to be allowed in Policy Rule
2256 :param resources: Resources to be allowed in Policy Rule
2257 :param verbs: Verbs to be allowed in Policy Rule
2258 :param service_account: Service Account name used to bind the Role
2259 :returns: None
2260 """
2261 paths, env = self._init_paths_env(
2262 cluster_name=cluster_uuid, create_if_not_exist=True
2263 )
2264 kubectl = Kubectl(config_file=paths["kube_config"])
2265 await kubectl.create_role(
2266 name=name,
2267 labels={},
2268 namespace=namespace,
2269 api_groups=api_groups,
2270 resources=resources,
2271 verbs=verbs,
2272 )
2273 await kubectl.create_role_binding(
2274 name=name,
2275 labels={},
2276 namespace=namespace,
2277 role_name=name,
2278 sa_name=service_account,
2279 )