import typing
import uuid
import json
+import yaml
import tarfile
import io
from time import sleep
self.logger = logging.getLogger("lcm.kubectl")
self._config_file = config_file
self.logger.info(f"Kubectl cfg file: {config_file}")
- # kconfig.load_kube_config(config_file=config_file)
- self._api_client = kconfig.new_client_from_config(config_file=config_file)
+
+ # Create default configuration for API client
+ self._configuration = kclient.Configuration()
+
+ # Get proxy_url
+ proxy_url = None
+ if config_file:
+ with open(config_file, "r", encoding="utf-8") as f:
+ kubeconfig_yaml = f.read()
+ try:
+ kubeconfig_dict = yaml.safe_load(kubeconfig_yaml)
+ except yaml.YAMLError as e:
+ raise e
+ proxy_url = (
+ kubeconfig_dict.get("clusters", [])[0]
+ .get("cluster", {})
+ .get("proxy-url")
+ )
+
+ # If kubeconfig has proxy configured, use it
+ if proxy_url:
+ self._configuration.proxy = proxy_url
+ self.logger.info(f"Using proxy for kubernetes: {proxy_url}")
+
+ # Create API client
+ self._api_client = kconfig.new_client_from_config(
+ config_file=config_file,
+ client_configuration=self._configuration,
+ )
+ # self._configuration = self._api_client.configuration.get_default_copy()
+
+ # Carga la config base
self._clients = {
CORE_CLIENT: kclient.CoreV1Api(api_client=self._api_client),
RBAC_CLIENT: kclient.RbacAuthorizationV1Api(api_client=self._api_client),
STORAGE_CLIENT: kclient.StorageV1Api(api_client=self._api_client),
CUSTOM_OBJECT_CLIENT: kclient.CustomObjectsApi(api_client=self._api_client),
}
- self._configuration = self._api_client.configuration.get_default_copy()
self.logger.info(f"Kubectl cfg file: {config_file}")
@property