1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from typing
import Dict
21 from kubernetes
import client
, config
22 from kubernetes
.client
.models
import (
31 from kubernetes
.client
.rest
import ApiException
32 from retrying_async
import retry
# Keys looked up in a service-account secret's ``data`` mapping.
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"

# Keys that identify each Kubernetes API client in the clients mapping.
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
# Constructor.
#
# Loads the kube config via config.load_kube_config(config_file=config_file),
# then builds one API client per key: CORE_CLIENT -> CoreV1Api,
# RBAC_CLIENT -> RbacAuthorizationV1Api, STORAGE_CLIENT -> StorageV1Api.
# Finally stores config.kube_config.Configuration.get_default_copy() in
# self._configuration and a logger named "Kubectl" in self.logger.
#
# NOTE(review): the statement that opens the client mapping and binds it to
# a name is not visible in this view. Other methods read it as
# self.clients[...]; confirm the attribute/property name before editing.
44 def __init__(self
, config_file
=None):
45 config
.load_kube_config(config_file
=config_file
)
47 CORE_CLIENT
: client
.CoreV1Api(),
48 RBAC_CLIENT
: client
.RbacAuthorizationV1Api(),
49 STORAGE_CLIENT
: client
.StorageV1Api(),
51 self
._configuration
= config
.kube_config
.Configuration
.get_default_copy()
52 self
.logger
= logging
.getLogger("Kubectl")
def configuration(self):
    """Return the client configuration stored in ``self._configuration``."""
    stored_configuration = self._configuration
    return stored_configuration
# Service listing (presumably `get_services`; the `def` line is not visible
# in this view, so the code below is left untouched).
#
# What the visible code shows:
#   * field_selector / label_selector are copied into a kwargs mapping that
#     is forwarded to CoreV1Api.list_service_for_all_namespaces(**kwargs).
#   * every item of result.items is turned into a dict; the visible keys are
#     "name", "cluster_ip", per-port "node_port" / "protocol" /
#     "target_port", and "external_ip" (the ingress IPs, taken only when
#     status.load_balancer.ingress is truthy).
#   * an ApiException is logged through self.logger.error.
#
# NOTE(review): the docstring says "from a namespace", but the call made is
# list_service_for_all_namespaces and no namespace argument is visible.
# NOTE(review): the "external_ip" comprehension reuses `i` as its loop
# variable while iterating `i.status.load_balancer.ingress`. It works
# because the iterable is evaluated before the inner name is bound, but a
# distinct name (e.g. `ingress`) would be clearer.
64 field_selector
: str = None,
65 label_selector
: str = None,
66 ) -> typing
.List
[typing
.Dict
]:
68 Get Service list from a namespace
70 :param: field_selector: Kubernetes field selector for the namespace
71 :param: label_selector: Kubernetes label selector for the namespace
73 :return: List of the services matching the selectors specified
77 kwargs
["field_selector"] = field_selector
79 kwargs
["label_selector"] = label_selector
81 result
= self
.clients
[CORE_CLIENT
].list_service_for_all_namespaces(**kwargs
)
84 "name": i
.metadata
.name
,
85 "cluster_ip": i
.spec
.cluster_ip
,
90 "node_port": p
.node_port
,
92 "protocol": p
.protocol
,
93 "target_port": p
.target_port
,
99 "external_ip": [i
.ip
for i
in i
.status
.load_balancer
.ingress
]
100 if i
.status
.load_balancer
.ingress
103 for i
in result
.items
105 except ApiException
as e
:
106 self
.logger
.error("Error calling get services: {}".format(e
))
def get_default_storage_class(self) -> str:
    """
    Pick the storage class to use when none is requested explicitly

    :return: Name of the first storage class annotated as default.
             If none is annotated, the name of the first storage class listed.
             If there are no storage classes at all, None
    """
    default_markers = (
        "storageclass.kubernetes.io/is-default-class",
        # Older clusters still use the beta annotation.
        "storageclass.beta.kubernetes.io/is-default-class",
    )
    listed = self.clients[STORAGE_CLIENT].list_storage_class()

    first_seen = None
    for storage_class in listed.items:
        class_name = storage_class.metadata.name
        if not first_seen:
            # Remember the first class as the fallback when no default exists
            first_seen = class_name
        # A storage class may carry no annotations at all.
        annotations = storage_class.metadata.annotations or {}
        if any(annotations.get(marker) == "true" for marker in default_markers):
            return class_name
    return first_seen
def create_cluster_role(
    self,
    name: str,
    labels: Dict[str, str],
    namespace: str = "kube-system",
):
    """
    Create a cluster role allowing every verb on every resource and
    every non-resource URL

    :param: name: Name of the cluster role
    :param: labels: Labels for cluster role metadata
    :param: namespace: Kubernetes namespace for cluster role metadata

    :raises: Exception if a cluster role with that name already exists
    """
    rbac_api = self.clients[RBAC_CLIENT]

    # Look the name up first so an existing role is never overwritten.
    name_filter = "metadata.name={}".format(name)
    existing = rbac_api.list_cluster_role(field_selector=name_filter)
    if len(existing.items) > 0:
        raise Exception(
            "Cluster role with metadata.name={} already exists".format(name)
        )

    role_metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
    role_rules = [
        V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
        V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
    ]
    new_role = V1ClusterRole(metadata=role_metadata, rules=role_rules)

    rbac_api.create_cluster_role(new_role)
def delete_cluster_role(self, name: str):
    """
    Remove a cluster role

    :param: name: Name of the cluster role to delete
    """
    rbac_api = self.clients[RBAC_CLIENT]
    rbac_api.delete_cluster_role(name)
def create_service_account(
    self,
    name: str,
    labels: Dict[str, str],
    namespace: str = "kube-system",
):
    """
    Create a service account in the given namespace

    :param: name: Name of the service account
    :param: labels: Labels for service account metadata
    :param: namespace: Kubernetes namespace for service account metadata

    :raises: Exception if a service account with that name already exists
             in the namespace
    """
    core_api = self.clients[CORE_CLIENT]

    # Look the name up first so an existing account is never overwritten.
    name_filter = "metadata.name={}".format(name)
    existing = core_api.list_namespaced_service_account(
        namespace, field_selector=name_filter
    )
    if len(existing.items) > 0:
        raise Exception(
            "Service account with metadata.name={} already exists".format(name)
        )

    account_metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
    new_account = V1ServiceAccount(metadata=account_metadata)

    core_api.create_namespaced_service_account(namespace, new_account)
def delete_service_account(self, name: str, namespace: str = "kube-system"):
    """
    Remove a service account

    :param: name: Name of the service account to delete
    :param: namespace: Kubernetes namespace the service account lives in
    """
    core_api = self.clients[CORE_CLIENT]
    core_api.delete_namespaced_service_account(name, namespace)
def create_cluster_role_binding(
    self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
):
    """
    Create a cluster role binding that ties the cluster role called
    ``name`` to the service account called ``name``

    :param: name: Name shared by the binding, the cluster role and the
                  service account
    :param: labels: Labels for cluster role binding metadata
    :param: namespace: Kubernetes namespace of the service account subject

    :raises: Exception if a cluster role binding with that name already exists
    """
    rbac_api = self.clients[RBAC_CLIENT]

    # Look the name up first so an existing binding is never overwritten.
    name_filter = "metadata.name={}".format(name)
    existing = rbac_api.list_cluster_role_binding(field_selector=name_filter)
    if len(existing.items) > 0:
        raise Exception("Generated rbac id already exists")

    binding_metadata = V1ObjectMeta(name=name, labels=labels)
    bound_role = V1RoleRef(kind="ClusterRole", name=name, api_group="")
    bound_account = V1Subject(kind="ServiceAccount", name=name, namespace=namespace)
    new_binding = V1ClusterRoleBinding(
        metadata=binding_metadata,
        role_ref=bound_role,
        subjects=[bound_account],
    )

    rbac_api.create_cluster_role_binding(new_binding)
def delete_cluster_role_binding(self, name: str):
    """
    Remove a cluster role binding

    :param: name: Name of the cluster role binding to delete
    """
    rbac_api = self.clients[RBAC_CLIENT]
    rbac_api.delete_cluster_role_binding(name)
255 fallback
=Exception("Failed getting the secret from service account"),
async def get_secret_data(self, name: str, namespace: str = "kube-system"):
    """
    Read the token and CA certificate from the first secret attached to
    a service account

    :param: name: Name of the service account that owns the secret
    :param: namespace: Name of the namespace where the secret is stored

    :return: Tuple with the token and client certificate, both base64-decoded
             to UTF-8 text

    :raises: Exception if the service account is not found or has no secret
    """
    core_api = self.clients[CORE_CLIENT]

    def _decode(encoded):
        # Values in the secret's data mapping are base64-encoded.
        return base64.b64decode(encoded).decode("utf-8")

    accounts = core_api.list_namespaced_service_account(
        namespace, field_selector="metadata.name={}".format(name)
    )
    if len(accounts.items) == 0:
        raise Exception(
            "Service account not found with metadata.name={}".format(name)
        )

    account = accounts.items[0]
    if not account.secrets:
        raise Exception(
            "Failed getting the secret from service account {}".format(name)
        )
    secret_name = account.secrets[0].name

    matching_secrets = core_api.list_namespaced_secret(
        namespace, field_selector="metadata.name={}".format(secret_name)
    )
    secret = matching_secrets.items[0]

    # Read both entries before decoding, so a missing key fails first.
    encoded_token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
    encoded_ca_cert = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]

    return (_decode(encoded_token), _decode(encoded_ca_cert))