1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from typing
import Dict
22 from distutils
.version
import LooseVersion
24 from kubernetes
import client
, config
25 from kubernetes
.client
.api
import VersionApi
26 from kubernetes
.client
.models
import (
37 from kubernetes
.client
.rest
import ApiException
38 from retrying_async
import retry
# Keys read from a service account secret's ``data`` mapping.
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"

# Keys under which the Kubernetes API client objects are looked up.
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
CUSTOM_OBJECT_CLIENT = "custom_object"
51 def __init__(self
, config_file
=None):
52 config
.load_kube_config(config_file
=config_file
)
54 CORE_CLIENT
: client
.CoreV1Api(),
55 RBAC_CLIENT
: client
.RbacAuthorizationV1Api(),
56 STORAGE_CLIENT
: client
.StorageV1Api(),
57 CUSTOM_OBJECT_CLIENT
: client
.CustomObjectsApi(),
59 self
._configuration
= config
.kube_config
.Configuration
.get_default_copy()
60 self
.logger
= logging
.getLogger("Kubectl")
def configuration(self):
    """
    Kubernetes client configuration held by this object

    :return: The object stored in ``self._configuration``
    """
    return self._configuration
72 field_selector
: str = None,
73 label_selector
: str = None,
74 ) -> typing
.List
[typing
.Dict
]:
76 Get Service list from a namespace
78 :param: field_selector: Kubernetes field selector for the namespace
79 :param: label_selector: Kubernetes label selector for the namespace
81 :return: List of the services matching the selectors specified
85 kwargs
["field_selector"] = field_selector
87 kwargs
["label_selector"] = label_selector
89 result
= self
.clients
[CORE_CLIENT
].list_service_for_all_namespaces(**kwargs
)
92 "name": i
.metadata
.name
,
93 "cluster_ip": i
.spec
.cluster_ip
,
98 "node_port": p
.node_port
,
100 "protocol": p
.protocol
,
101 "target_port": p
.target_port
,
103 for p
in i
.spec
.ports
107 "external_ip": [i
.ip
for i
in i
.status
.load_balancer
.ingress
]
108 if i
.status
.load_balancer
.ingress
111 for i
in result
.items
113 except ApiException
as e
:
114 self
.logger
.error("Error calling get services: {}".format(e
))
def get_default_storage_class(self) -> str:
    """
    Default storage class

    :return:    Returns the default storage class name, if exists.
                If not, it returns the first storage class.
                If there are not storage classes, returns None
    """
    storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
    default_sc_annotations = {
        "storageclass.kubernetes.io/is-default-class": "true",
        # Older clusters still use the beta annotation.
        "storageclass.beta.kubernetes.io/is-default-class": "true",
    }

    # NOTE(review): the initial None, the first-item guard, the any() test,
    # the break and the return implement the contract stated in the
    # docstring above -- confirm against the upstream file.
    selected_sc = None
    for sc in storage_classes.items:
        if selected_sc is None:
            # Fall back to the first storage class when none is marked default.
            selected_sc = sc.metadata.name
        # metadata.annotations may be None when the object has no annotations.
        annotations = sc.metadata.annotations or {}
        if any(annotations.get(k) == v for k, v in default_sc_annotations.items()):
            # This class is annotated as the default one: it wins.
            selected_sc = sc.metadata.name
            break
    return selected_sc
def create_cluster_role(
    self,
    name: str,
    labels: Dict[str, str],
    namespace: str = "kube-system",
):
    """
    Create a cluster role

    :param: name:       Name of the cluster role
    :param: labels:     Labels for cluster role metadata
    :param: namespace:  Kubernetes namespace for cluster role metadata
                        Default: kube-system
    :raises Exception:  If a cluster role with that name already exists
    """
    # NOTE(review): the leading ``self, name`` parameters, the ``raise`` and
    # the ``metadata=`` / ``rules=`` keywords are inferred from how the
    # visible code uses them -- confirm against the upstream file.
    rbac = self.clients[RBAC_CLIENT]
    cluster_roles = rbac.list_cluster_role(
        field_selector="metadata.name={}".format(name)
    )

    # Truthiness also covers a response whose ``items`` is None.
    if cluster_roles.items:
        raise Exception(
            "Cluster role with metadata.name={} already exists".format(name)
        )

    metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
    # Two wildcard rules: every verb on every resource of every API group,
    # and every verb on every non-resource URL.
    cluster_role = V1ClusterRole(
        metadata=metadata,
        rules=[
            V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
            V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
        ],
    )

    rbac.create_cluster_role(cluster_role)
def delete_cluster_role(self, name: str):
    """
    Delete a cluster role

    :param: name:       Name of the cluster role
    """
    self.clients[RBAC_CLIENT].delete_cluster_role(name)
def _get_kubectl_version(self):
    """
    Version string built from the Kubernetes version API

    :return: "<major>.<minor>", using the ``major`` and ``minor`` fields of
             the object returned by ``VersionApi().get_code()``
    """
    version = VersionApi().get_code()
    return "{}.{}".format(version.major, version.minor)
def _need_to_create_new_secret(self):
    """
    Tell whether the service account secret has to be created explicitly

    :return: True if the version returned by ``_get_kubectl_version()`` is
             1.24 or newer, False otherwise
    """

    def numeric_parts(version: str) -> tuple:
        # Keep only the leading decimal digits of each dot-separated piece so
        # a minor version such as "24+" is read as 24; a piece without
        # leading digits counts as 0. This avoids distutils' LooseVersion,
        # which is gone from Python 3.12 and cannot order int against str.
        parts = []
        for piece in version.split("."):
            digits = ""
            for char in piece:
                if char not in "0123456789":
                    break
                digits += char
            parts.append(int(digits) if digits else 0)
        return tuple(parts)

    min_k8s_version = "1.24"
    current_k8s_version = self._get_kubectl_version()
    return numeric_parts(min_k8s_version) <= numeric_parts(current_k8s_version)
def _get_secret_name(self, service_account_name: str) -> str:
    """
    Build a secret name for a service account

    :param: service_account_name: Name of the service account
    :return: "<service_account_name>-token-<suffix>", where the suffix is
             the first five characters of a freshly generated UUID4 string
    """
    random_alphanum = str(uuid.uuid4())[:5]
    return "{}-token-{}".format(service_account_name, random_alphanum)
def _create_service_account_secret(
    self, service_account_name: str, namespace: str, secret_name: str
):
    """
    Create a secret for the service account. K8s version >= 1.24

    :param: service_account_name: Name of the service account
    :param: namespace:  Kubernetes namespace for service account metadata
    :param: secret_name: Name of the secret
    :raises Exception:  If a secret with that name already exists in the
                        namespace
    """
    v1_core = self.clients[CORE_CLIENT]
    secrets = v1_core.list_namespaced_secret(
        namespace, field_selector="metadata.name={}".format(secret_name)
    )

    # NOTE(review): the existence check and ``raise`` are inferred from the
    # error message and from the sibling create_* methods -- confirm.
    if secrets.items:
        raise Exception(
            "Secret with metadata.name={} already exists".format(secret_name)
        )

    # This annotation names the service account the secret belongs to.
    annotations = {"kubernetes.io/service-account.name": service_account_name}
    metadata = V1ObjectMeta(
        name=secret_name, namespace=namespace, annotations=annotations
    )
    # Local renamed from ``type`` so the builtin is not shadowed.
    secret_type = "kubernetes.io/service-account-token"
    secret = V1Secret(metadata=metadata, type=secret_type)
    v1_core.create_namespaced_secret(namespace, secret)
def _get_secret_reference_list(self, namespace: str, secret_name: str):
    """
    Return a secret reference list with one secret.

    :param: namespace:  Kubernetes namespace for service account metadata
    :param: secret_name: Name of the secret
    :rtype: list[V1SecretReference]
    """
    return [V1SecretReference(name=secret_name, namespace=namespace)]
def create_service_account(
    self,
    name: str,
    labels: Dict[str, str],
    namespace: str = "kube-system",
):
    """
    Create a service account

    :param: name:       Name of the service account
    :param: labels:     Labels for service account metadata
    :param: namespace:  Kubernetes namespace for service account metadata
                        Default: kube-system
    :raises Exception:  If a service account with that name already exists
                        in the namespace
    """
    # NOTE(review): the leading ``self, name`` parameters, the ``raise`` and
    # the ``else`` branch are inferred from the visible statements -- confirm
    # against the upstream file.
    v1_core = self.clients[CORE_CLIENT]
    service_accounts = v1_core.list_namespaced_service_account(
        namespace, field_selector="metadata.name={}".format(name)
    )
    # Truthiness also covers a response whose ``items`` is None.
    if service_accounts.items:
        raise Exception(
            "Service account with metadata.name={} already exists".format(name)
        )

    metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)

    if self._need_to_create_new_secret():
        # Reference the secret from the account first, then create the
        # secret itself once the account exists.
        secret_name = self._get_secret_name(name)
        secrets = self._get_secret_reference_list(namespace, secret_name)
        service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
        v1_core.create_namespaced_service_account(namespace, service_account)
        self._create_service_account_secret(name, namespace, secret_name)
    else:
        service_account = V1ServiceAccount(metadata=metadata)
        v1_core.create_namespaced_service_account(namespace, service_account)
def delete_service_account(self, name: str, namespace: str = "kube-system"):
    """
    Delete a service account

    :param: name:       Name of the service account
    :param: namespace:  Kubernetes namespace for service account metadata
                        Default: kube-system
    """
    self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
def create_cluster_role_binding(
    self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
):
    """
    Create a cluster role binding

    :param: name:       Name of the cluster role
    :param: labels:     Labels for cluster role binding metadata
    :param: namespace:  Kubernetes namespace for cluster role binding metadata
                        Default: kube-system
    :raises Exception:  If a cluster role binding with that name already
                        exists
    """
    rbac = self.clients[RBAC_CLIENT]
    role_bindings = rbac.list_cluster_role_binding(
        field_selector="metadata.name={}".format(name)
    )
    # Truthiness also covers a response whose ``items`` is None.
    if role_bindings.items:
        raise Exception("Generated rbac id already exists")

    # The same ``name`` is used for the binding, for the referenced cluster
    # role and for the service account subject.
    role_binding = V1ClusterRoleBinding(
        metadata=V1ObjectMeta(name=name, labels=labels),
        role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
        subjects=[V1Subject(kind="ServiceAccount", name=name, namespace=namespace)],
    )
    rbac.create_cluster_role_binding(role_binding)
def delete_cluster_role_binding(self, name: str):
    """
    Delete a cluster role binding

    :param: name:       Name of the cluster role binding
    """
    self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
321 fallback
=Exception("Failed getting the secret from service account"),
323 async def get_secret_data(
324 self
, name
: str, namespace
: str = "kube-system"
329 :param: name: Name of the secret data
330 :param: namespace: Name of the namespace where the secret is stored
332 :return: Tuple with the token and client certificate
334 v1_core
= self
.clients
[CORE_CLIENT
]
338 service_accounts
= v1_core
.list_namespaced_service_account(
339 namespace
, field_selector
="metadata.name={}".format(name
)
341 if len(service_accounts
.items
) == 0:
343 "Service account not found with metadata.name={}".format(name
)
345 service_account
= service_accounts
.items
[0]
346 if service_account
.secrets
and len(service_account
.secrets
) > 0:
347 secret_name
= service_account
.secrets
[0].name
350 "Failed getting the secret from service account {}".format(name
)
352 secret
= v1_core
.list_namespaced_secret(
353 namespace
, field_selector
="metadata.name={}".format(secret_name
)
356 token
= secret
.data
[SERVICE_ACCOUNT_TOKEN_KEY
]
357 client_certificate_data
= secret
.data
[SERVICE_ACCOUNT_ROOT_CA_KEY
]
360 base64
.b64decode(token
).decode("utf-8"),
361 base64
.b64decode(client_certificate_data
).decode("utf-8"),
364 async def create_certificate(
374 Creates cert-manager certificate object
376 :param: namespace: Name of the namespace where the certificate and secret is stored
377 :param: name: Name of the certificate object
378 :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
379 :param: secret_name: Name of the secret created by cert-manager
380 :param: usages: List of X.509 key usages
381 :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object
385 "apiVersion": "cert-manager.io/v1",
386 "kind": "Certificate",
387 "metadata": {"name": name
, "namespace": namespace
},
389 "secretName": secret_name
,
391 "rotationPolicy": "Always",
392 "algorithm": "ECDSA",
395 "duration": "8760h", # 1 Year
396 "renewBefore": "2208h", # 9 months
397 "subject": {"organizations": ["osm"]},
402 "{}.{}".format(dns_prefix
, namespace
),
403 "{}.{}.svc".format(dns_prefix
, namespace
),
404 "{}.{}.svc.cluster".format(dns_prefix
, namespace
),
405 "{}.{}.svc.cluster.local".format(dns_prefix
, namespace
),
407 "issuerRef": {"name": issuer_name
, "kind": "ClusterIssuer"},
410 client
= self
.clients
[CUSTOM_OBJECT_CLIENT
]
412 client
.create_namespaced_custom_object(
413 group
="cert-manager.io",
414 plural
="certificates",
416 body
=certificate_body
,
419 except ApiException
as e
:
420 info
= json
.loads(e
.body
)
421 if info
.get("reason").lower() == "alreadyexists":
422 self
.logger
.warning("Certificate already exists: {}".format(e
))
426 async def delete_certificate(self
, namespace
, object_name
):
427 client
= self
.clients
[CUSTOM_OBJECT_CLIENT
]
429 client
.delete_namespaced_custom_object(
430 group
="cert-manager.io",
431 plural
="certificates",
436 except ApiException
as e
:
437 info
= json
.loads(e
.body
)
438 if info
.get("reason").lower() == "notfound":
439 self
.logger
.warning("Certificate already deleted: {}".format(e
))