1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from kubernetes
import client
, config
18 from kubernetes
.client
.rest
import ApiException
# Keys of the API-client mapping built in ``__init__``; methods use them to
# pick the client object they need.
CORE_CLIENT = "core_v1"  # -> client.CoreV1Api
STORAGE_CLIENT = "storage_v1"  # -> client.StorageV1Api
RBAC_CLIENT = "rbac_v1"  # -> client.RbacAuthorizationV1Api
def __init__(self, config_file=None):
    """Load the kubeconfig and create the Kubernetes API clients.

    :param config_file: Value forwarded unchanged to
                        ``config.load_kube_config`` (``None`` by default).
    """
    # The kubeconfig is loaded before any API client is instantiated.
    config.load_kube_config(config_file=config_file)

    core_api = client.CoreV1Api()
    storage_api = client.StorageV1Api()
    rbac_api = client.RbacAuthorizationV1Api()

    # NOTE(review): the line binding this mapping was not visible in the
    # reviewed extract. ``_clients`` is assumed (mirroring ``_configuration``)
    # while other methods read ``self.clients`` -- confirm the attribute name.
    self._clients = {
        "core_v1": core_api,
        "storage_v1": storage_api,
        "rbac_v1": rbac_api,
    }

    self._configuration = config.kube_config.Configuration()
    self.logger = logging.getLogger("Kubectl")
def configuration(self):
    """Return the ``Configuration`` object stored on the instance by ``__init__``."""
    kube_configuration = self._configuration
    return kube_configuration
def get_services(self, field_selector=None, label_selector=None):
    """List the Services of all namespaces as plain dictionaries.

    :param field_selector: Optional field selector; forwarded only when truthy.
    :param label_selector: Optional label selector; forwarded only when truthy.
    :return: One dict per Service with the keys ``name``, ``cluster_ip``,
             ``type``, ``ports`` (list of per-port dicts, empty when the
             Service has no ports) and ``external_ip`` (list of load-balancer
             ingress IPs, or ``None`` when there is no ingress).
    :raises ApiException: logged and re-raised when the API call fails.
    """
    # NOTE(review): some lines of this method were not visible in the reviewed
    # extract (selector guards, the "type" entry, the port "name"/"port"
    # entries, the fallbacks and the re-raise); they are reconstructed here --
    # confirm against the complete file.
    selectors = {
        "field_selector": field_selector,
        "label_selector": label_selector,
    }
    kwargs = {key: value for key, value in selectors.items() if value}

    try:
        result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)

        services = []
        for service in result.items:
            ports = [
                {
                    "name": port.name,
                    "node_port": port.node_port,
                    "port": port.port,
                    "protocol": port.protocol,
                    "target_port": port.target_port,
                }
                for port in (service.spec.ports or [])
            ]

            # Services without a load-balancer ingress report no external IPs.
            ingress = service.status.load_balancer.ingress
            external_ip = [entry.ip for entry in ingress] if ingress else None

            services.append(
                {
                    "name": service.metadata.name,
                    "cluster_ip": service.spec.cluster_ip,
                    "type": service.spec.type,
                    "ports": ports,
                    "external_ip": external_ip,
                }
            )
        return services
    except ApiException as e:
        self.logger.error("Error calling get services: {}".format(e))
        raise e
80 def get_default_storage_class(self
) -> str:
84 :return: Returns the default storage class name, if exists.
85 If not, it returns the first storage class.
86 If there are no storage classes, returns None
88 storage_classes
= self
.clients
[STORAGE_CLIENT
].list_storage_class()
90 default_sc_annotations
= {
91 "storageclass.kubernetes.io/is-default-class": "true",
92 # Older clusters still use the beta annotation.
93 "storageclass.beta.kubernetes.io/is-default-class": "true",
95 for sc
in storage_classes
.items
:
97 # Select the first storage class in case there is no default class
98 selected_sc
= sc
.metadata
.name
99 annotations
= sc
.metadata
.annotations
or {}
101 k
in annotations
and annotations
[k
] == v
102 for k
, v
in default_sc_annotations
.items()
105 selected_sc
= sc
.metadata
.name