config.load_kube_config(config_file=config_file)
self.logger = logging.getLogger("Kubectl")
+ def get_configuration(self):
+ return config.kube_config.Configuration()
+
def get_services(self, field_selector=None, label_selector=None):
kwargs = {}
if field_selector:
"name": i.metadata.name,
"cluster_ip": i.spec.cluster_ip,
"type": i.spec.type,
- "ports": i.spec.ports,
+ "ports": [
+ {
+ "name": p.name,
+ "node_port": p.node_port,
+ "port": p.port,
+ "protocol": p.protocol,
+ "target_port": p.target_port,
+ }
+ for p in i.spec.ports
+ ]
+ if i.spec.ports
+ else [],
"external_ip": [i.ip for i in i.status.load_balancer.ingress]
if i.status.load_balancer.ingress
else None,
except ApiException as e:
self.logger.error("Error calling get services: {}".format(e))
raise e
+
+ def get_default_storage_class(self) -> str:
+ """
+ Default storage class
+
+ :return: Returns the default storage class name, if exists.
+ If not, it returns the first storage class.
+ If there are not storage classes, returns None
+ """
+
+ storagev1 = client.StorageV1Api()
+ storage_classes = storagev1.list_storage_class()
+ selected_sc = None
+ default_sc_annotations = {
+ "storageclass.kubernetes.io/is-default-class": "true",
+ # Older clusters still use the beta annotation.
+ "storageclass.beta.kubernetes.io/is-default-class": "true",
+ }
+ for sc in storage_classes.items:
+ if not selected_sc:
+ # Select the first storage class in case there is no a default-class
+ selected_sc = sc.metadata.name
+ annotations = sc.metadata.annotations
+ if any(
+ k in annotations and annotations[k] == v
+ for k, v in default_sc_annotations.items()
+ ):
+ # Default storage
+ selected_sc = sc.metadata.name
+ break
+ return selected_sc