1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 from kubernetes
import client
, config
16 from kubernetes
.client
.rest
import ApiException
21 def __init__(self
, config_file
=None):
22 config
.load_kube_config(config_file
=config_file
)
23 self
.logger
= logging
.getLogger("Kubectl")
25 def get_configuration(self
):
26 return config
.kube_config
.Configuration()
28 def get_services(self
, field_selector
=None, label_selector
=None):
31 kwargs
["field_selector"] = field_selector
33 kwargs
["label_selector"] = label_selector
36 v1
= client
.CoreV1Api()
37 result
= v1
.list_service_for_all_namespaces(**kwargs
)
40 "name": i
.metadata
.name
,
41 "cluster_ip": i
.spec
.cluster_ip
,
46 "node_port": p
.node_port
,
48 "protocol": p
.protocol
,
49 "target_port": p
.target_port
,
55 "external_ip": [i
.ip
for i
in i
.status
.load_balancer
.ingress
]
56 if i
.status
.load_balancer
.ingress
61 except ApiException
as e
:
62 self
.logger
.error("Error calling get services: {}".format(e
))
65 def get_default_storage_class(self
) -> str:
69 :return: Returns the default storage class name, if exists.
70 If not, it returns the first storage class.
71 If there are not storage classes, returns None
74 storagev1
= client
.StorageV1Api()
75 storage_classes
= storagev1
.list_storage_class()
77 default_sc_annotations
= {
78 "storageclass.kubernetes.io/is-default-class": "true",
79 # Older clusters still use the beta annotation.
80 "storageclass.beta.kubernetes.io/is-default-class": "true",
82 for sc
in storage_classes
.items
:
84 # Select the first storage class in case there is no a default-class
85 selected_sc
= sc
.metadata
.name
86 annotations
= sc
.metadata
.annotations
88 k
in annotations
and annotations
[k
] == v
89 for k
, v
in default_sc_annotations
.items()
92 selected_sc
= sc
.metadata
.name