1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from typing
import Dict
21 from distutils
.version
import LooseVersion
23 from kubernetes
import client
, config
24 from kubernetes
.client
.api
import VersionApi
25 from kubernetes
.client
.models
import (
36 from kubernetes
.client
.rest
import ApiException
37 from retrying_async
import retry
40 SERVICE_ACCOUNT_TOKEN_KEY
= "token"
41 SERVICE_ACCOUNT_ROOT_CA_KEY
= "ca.crt"
43 CORE_CLIENT
= "core_v1"
44 RBAC_CLIENT
= "rbac_v1"
45 STORAGE_CLIENT
= "storage_v1"
49 def __init__(self
, config_file
=None):
50 config
.load_kube_config(config_file
=config_file
)
52 CORE_CLIENT
: client
.CoreV1Api(),
53 RBAC_CLIENT
: client
.RbacAuthorizationV1Api(),
54 STORAGE_CLIENT
: client
.StorageV1Api(),
56 self
._configuration
= config
.kube_config
.Configuration
.get_default_copy()
57 self
.logger
= logging
.getLogger("Kubectl")
60 def configuration(self
):
61 return self
._configuration
69 field_selector
: str = None,
70 label_selector
: str = None,
71 ) -> typing
.List
[typing
.Dict
]:
73 Get Service list from a namespace
75 :param: field_selector: Kubernetes field selector for the namespace
76 :param: label_selector: Kubernetes label selector for the namespace
78 :return: List of the services matching the selectors specified
82 kwargs
["field_selector"] = field_selector
84 kwargs
["label_selector"] = label_selector
86 result
= self
.clients
[CORE_CLIENT
].list_service_for_all_namespaces(**kwargs
)
89 "name": i
.metadata
.name
,
90 "cluster_ip": i
.spec
.cluster_ip
,
95 "node_port": p
.node_port
,
97 "protocol": p
.protocol
,
98 "target_port": p
.target_port
,
100 for p
in i
.spec
.ports
104 "external_ip": [i
.ip
for i
in i
.status
.load_balancer
.ingress
]
105 if i
.status
.load_balancer
.ingress
108 for i
in result
.items
110 except ApiException
as e
:
111 self
.logger
.error("Error calling get services: {}".format(e
))
114 def get_default_storage_class(self
) -> str:
116 Default storage class
118 :return: Returns the default storage class name, if exists.
119 If not, it returns the first storage class.
120 If there are not storage classes, returns None
122 storage_classes
= self
.clients
[STORAGE_CLIENT
].list_storage_class()
124 default_sc_annotations
= {
125 "storageclass.kubernetes.io/is-default-class": "true",
126 # Older clusters still use the beta annotation.
127 "storageclass.beta.kubernetes.io/is-default-class": "true",
129 for sc
in storage_classes
.items
:
131 # Select the first storage class in case there is no a default-class
132 selected_sc
= sc
.metadata
.name
133 annotations
= sc
.metadata
.annotations
or {}
135 k
in annotations
and annotations
[k
] == v
136 for k
, v
in default_sc_annotations
.items()
139 selected_sc
= sc
.metadata
.name
143 def create_cluster_role(
146 labels
: Dict
[str, str],
147 namespace
: str = "kube-system",
150 Create a cluster role
152 :param: name: Name of the cluster role
153 :param: labels: Labels for cluster role metadata
154 :param: namespace: Kubernetes namespace for cluster role metadata
157 cluster_roles
= self
.clients
[RBAC_CLIENT
].list_cluster_role(
158 field_selector
="metadata.name={}".format(name
)
161 if len(cluster_roles
.items
) > 0:
163 "Cluster role with metadata.name={} already exists".format(name
)
166 metadata
= V1ObjectMeta(name
=name
, labels
=labels
, namespace
=namespace
)
168 cluster_role
= V1ClusterRole(
171 V1PolicyRule(api_groups
=["*"], resources
=["*"], verbs
=["*"]),
172 V1PolicyRule(non_resource_ur_ls
=["*"], verbs
=["*"]),
176 self
.clients
[RBAC_CLIENT
].create_cluster_role(cluster_role
)
178 def delete_cluster_role(self
, name
: str):
180 Delete a cluster role
182 :param: name: Name of the cluster role
184 self
.clients
[RBAC_CLIENT
].delete_cluster_role(name
)
186 def _get_kubectl_version(self
):
187 version
= VersionApi().get_code()
188 return "{}.{}".format(version
.major
, version
.minor
)
190 def _need_to_create_new_secret(self
):
191 min_k8s_version
= "1.24"
192 current_k8s_version
= self
._get
_kubectl
_version
()
193 return LooseVersion(min_k8s_version
) <= LooseVersion(current_k8s_version
)
195 def _get_secret_name(self
, service_account_name
: str):
196 random_alphanum
= str(uuid
.uuid4())[:5]
197 return "{}-token-{}".format(service_account_name
, random_alphanum
)
199 def _create_service_account_secret(
200 self
, service_account_name
: str, namespace
: str, secret_name
: str
203 Create a secret for the service account. K8s version >= 1.24
205 :param: service_account_name: Name of the service account
206 :param: namespace: Kubernetes namespace for service account metadata
207 :param: secret_name: Name of the secret
209 v1_core
= self
.clients
[CORE_CLIENT
]
210 secrets
= v1_core
.list_namespaced_secret(
211 namespace
, field_selector
="metadata.name={}".format(secret_name
)
216 "Secret with metadata.name={} already exists".format(secret_name
)
219 annotations
= {"kubernetes.io/service-account.name": service_account_name
}
220 metadata
= V1ObjectMeta(
221 name
=secret_name
, namespace
=namespace
, annotations
=annotations
223 type = "kubernetes.io/service-account-token"
224 secret
= V1Secret(metadata
=metadata
, type=type)
225 v1_core
.create_namespaced_secret(namespace
, secret
)
227 def _get_secret_reference_list(self
, namespace
: str, secret_name
: str):
229 Return a secret reference list with one secret.
232 :param: namespace: Kubernetes namespace for service account metadata
233 :param: secret_name: Name of the secret
234 :rtype: list[V1SecretReference]
236 return [V1SecretReference(name
=secret_name
, namespace
=namespace
)]
238 def create_service_account(
241 labels
: Dict
[str, str],
242 namespace
: str = "kube-system",
245 Create a service account
247 :param: name: Name of the service account
248 :param: labels: Labels for service account metadata
249 :param: namespace: Kubernetes namespace for service account metadata
252 v1_core
= self
.clients
[CORE_CLIENT
]
253 service_accounts
= v1_core
.list_namespaced_service_account(
254 namespace
, field_selector
="metadata.name={}".format(name
)
256 if len(service_accounts
.items
) > 0:
258 "Service account with metadata.name={} already exists".format(name
)
261 metadata
= V1ObjectMeta(name
=name
, labels
=labels
, namespace
=namespace
)
263 if self
._need
_to
_create
_new
_secret
():
264 secret_name
= self
._get
_secret
_name
(name
)
265 secrets
= self
._get
_secret
_reference
_list
(namespace
, secret_name
)
266 service_account
= V1ServiceAccount(metadata
=metadata
, secrets
=secrets
)
267 v1_core
.create_namespaced_service_account(namespace
, service_account
)
268 self
._create
_service
_account
_secret
(name
, namespace
, secret_name
)
270 service_account
= V1ServiceAccount(metadata
=metadata
)
271 v1_core
.create_namespaced_service_account(namespace
, service_account
)
273 def delete_service_account(self
, name
: str, namespace
: str = "kube-system"):
275 Delete a service account
277 :param: name: Name of the service account
278 :param: namespace: Kubernetes namespace for service account metadata
281 self
.clients
[CORE_CLIENT
].delete_namespaced_service_account(name
, namespace
)
283 def create_cluster_role_binding(
284 self
, name
: str, labels
: Dict
[str, str], namespace
: str = "kube-system"
287 Create a cluster role binding
289 :param: name: Name of the cluster role
290 :param: labels: Labels for cluster role binding metadata
291 :param: namespace: Kubernetes namespace for cluster role binding metadata
294 role_bindings
= self
.clients
[RBAC_CLIENT
].list_cluster_role_binding(
295 field_selector
="metadata.name={}".format(name
)
297 if len(role_bindings
.items
) > 0:
298 raise Exception("Generated rbac id already exists")
300 role_binding
= V1ClusterRoleBinding(
301 metadata
=V1ObjectMeta(name
=name
, labels
=labels
),
302 role_ref
=V1RoleRef(kind
="ClusterRole", name
=name
, api_group
=""),
303 subjects
=[V1Subject(kind
="ServiceAccount", name
=name
, namespace
=namespace
)],
305 self
.clients
[RBAC_CLIENT
].create_cluster_role_binding(role_binding
)
307 def delete_cluster_role_binding(self
, name
: str):
309 Delete a cluster role binding
311 :param: name: Name of the cluster role binding
313 self
.clients
[RBAC_CLIENT
].delete_cluster_role_binding(name
)
318 fallback
=Exception("Failed getting the secret from service account"),
320 async def get_secret_data(
321 self
, name
: str, namespace
: str = "kube-system"
326 :param: name: Name of the secret data
327 :param: namespace: Name of the namespace where the secret is stored
329 :return: Tuple with the token and client certificate
331 v1_core
= self
.clients
[CORE_CLIENT
]
335 service_accounts
= v1_core
.list_namespaced_service_account(
336 namespace
, field_selector
="metadata.name={}".format(name
)
338 if len(service_accounts
.items
) == 0:
340 "Service account not found with metadata.name={}".format(name
)
342 service_account
= service_accounts
.items
[0]
343 if service_account
.secrets
and len(service_account
.secrets
) > 0:
344 secret_name
= service_account
.secrets
[0].name
347 "Failed getting the secret from service account {}".format(name
)
349 secret
= v1_core
.list_namespaced_secret(
350 namespace
, field_selector
="metadata.name={}".format(secret_name
)
353 token
= secret
.data
[SERVICE_ACCOUNT_TOKEN_KEY
]
354 client_certificate_data
= secret
.data
[SERVICE_ACCOUNT_ROOT_CA_KEY
]
357 base64
.b64decode(token
).decode("utf-8"),
358 base64
.b64decode(client_certificate_data
).decode("utf-8"),