| # Copyright 2020 Canonical Ltd. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
import logging
from typing import Optional

from kubernetes import client, config
from kubernetes.client.rest import ApiException
| |
| |
# Keys under which the Kubernetes API client objects are stored and looked up.
CORE_CLIENT: str = "core_v1"  # core/v1 API group (services, ...)
STORAGE_CLIENT: str = "storage_v1"  # storage.k8s.io/v1 API group (storage classes)
RBAC_CLIENT: str = "rbac_v1"  # rbac.authorization.k8s.io/v1 API group
| |
| |
class Kubectl:
    """Small convenience wrapper around the official Kubernetes Python client.

    On construction the kubeconfig is loaded and one API client object is
    created for each API group this class talks to (core, storage, RBAC).
    The clients are exposed through the ``clients`` mapping, keyed by the
    module-level ``*_CLIENT`` constants.
    """

    # Annotations that mark a storage class as the cluster default. Older
    # clusters still use the beta annotation.
    _DEFAULT_SC_ANNOTATIONS = (
        "storageclass.kubernetes.io/is-default-class",
        "storageclass.beta.kubernetes.io/is-default-class",
    )

    def __init__(self, config_file=None):
        """Load the kubeconfig and instantiate the API clients.

        :param config_file: kubeconfig path, forwarded unchanged to
            ``config.load_kube_config``.
        """
        config.load_kube_config(config_file=config_file)
        # Keyed by the module constants so lookups and registration can
        # never drift apart.
        self._clients = {
            CORE_CLIENT: client.CoreV1Api(),
            STORAGE_CLIENT: client.StorageV1Api(),
            RBAC_CLIENT: client.RbacAuthorizationV1Api(),
        }
        self._configuration = config.kube_config.Configuration()
        self.logger = logging.getLogger("Kubectl")

    @property
    def configuration(self):
        """Configuration object created when this instance was built."""
        return self._configuration

    @property
    def clients(self):
        """Mapping of client key (``*_CLIENT`` constant) to API client."""
        return self._clients

    def get_services(self, field_selector=None, label_selector=None):
        """List the services of all namespaces as plain dictionaries.

        :param field_selector: optional field selector; only forwarded to the
            API when truthy.
        :param label_selector: optional label selector; only forwarded to the
            API when truthy.
        :return: list with one dict per service, holding the keys ``name``,
            ``cluster_ip``, ``type``, ``ports`` (list of dicts, empty when the
            service declares no ports) and ``external_ip`` (list of
            load-balancer ingress IPs, or ``None`` when there is no ingress).
        :raises ApiException: re-raised after being logged when the API call
            fails.
        """
        kwargs = {}
        if field_selector:
            kwargs["field_selector"] = field_selector
        if label_selector:
            kwargs["label_selector"] = label_selector
        # Only the API call itself can raise ApiException, so keep the try
        # body limited to it.
        try:
            result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(
                **kwargs
            )
        except ApiException as e:
            self.logger.error("Error calling get services: %s", e)
            raise
        return [self._service_to_dict(service) for service in result.items]

    @staticmethod
    def _service_to_dict(service):
        """Flatten one service object into the dict returned by get_services."""
        spec = service.spec
        # Tolerate a missing status / load balancer section instead of
        # failing with AttributeError on None.
        status = service.status
        load_balancer = status.load_balancer if status is not None else None
        ingress = load_balancer.ingress if load_balancer is not None else None
        return {
            "name": service.metadata.name,
            "cluster_ip": spec.cluster_ip,
            "type": spec.type,
            "ports": [
                {
                    "name": port.name,
                    "node_port": port.node_port,
                    "port": port.port,
                    "protocol": port.protocol,
                    "target_port": port.target_port,
                }
                for port in spec.ports or []
            ],
            "external_ip": [entry.ip for entry in ingress] if ingress else None,
        }

    def get_default_storage_class(self) -> Optional[str]:
        """
        Default storage class

        :return:    Returns the default storage class name, if it exists.
                    If not, it returns the first storage class.
                    If there are no storage classes, returns None
        """
        storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
        selected_sc = None
        for sc in storage_classes.items:
            if selected_sc is None:
                # Fall back to the first storage class in case none of them
                # is marked as the default class.
                selected_sc = sc.metadata.name
            # A storage class without annotations reports None here, not an
            # empty dict; normalise so the membership test cannot fail.
            annotations = sc.metadata.annotations or {}
            if any(
                annotations.get(key) == "true"
                for key in self._DEFAULT_SC_ANNOTATIONS
            ):
                # Explicit default storage class: it wins over the fallback.
                selected_sc = sc.metadata.name
                break
        return selected_sc