# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+
from kubernetes import client, config
from kubernetes.client.rest import ApiException
-import logging
+
+
+CORE_CLIENT = "core_v1"
+STORAGE_CLIENT = "storage_v1"
+RBAC_CLIENT = "rbac_v1"
class Kubectl:
def __init__(self, config_file=None):
config.load_kube_config(config_file=config_file)
+ self._clients = {
+ "core_v1": client.CoreV1Api(),
+ "storage_v1": client.StorageV1Api(),
+ "rbac_v1": client.RbacAuthorizationV1Api(),
+ }
+ self._configuration = config.kube_config.Configuration()
self.logger = logging.getLogger("Kubectl")
- def get_configuration(self):
- return config.kube_config.Configuration()
+ @property
+ def configuration(self):
+ return self._configuration
+
+ @property
+ def clients(self):
+ return self._clients
def get_services(self, field_selector=None, label_selector=None):
kwargs = {}
kwargs["field_selector"] = field_selector
if label_selector:
kwargs["label_selector"] = label_selector
-
try:
- v1 = client.CoreV1Api()
- result = v1.list_service_for_all_namespaces(**kwargs)
+ result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
return [
{
"name": i.metadata.name,
If not, it returns the first storage class.
If there are not storage classes, returns None
"""
-
- storagev1 = client.StorageV1Api()
- storage_classes = storagev1.list_storage_class()
+ storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
selected_sc = None
default_sc_annotations = {
"storageclass.kubernetes.io/is-default-class": "true",
if not selected_sc:
# Select the first storage class in case there is no a default-class
selected_sc = sc.metadata.name
- annotations = sc.metadata.annotations
+ annotations = sc.metadata.annotations or {}
if any(
k in annotations and annotations[k] == v
for k, v in default_sc_annotations.items()