1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from typing
import Dict
22 from distutils
.version
import LooseVersion
24 from kubernetes
import client
, config
25 from kubernetes
.client
.api
import VersionApi
26 from kubernetes
.client
.models
import (
37 from kubernetes
.client
.rest
import ApiException
38 from n2vc
.libjuju
import retry_callback
39 from retrying_async
import retry
42 SERVICE_ACCOUNT_TOKEN_KEY
= "token"
43 SERVICE_ACCOUNT_ROOT_CA_KEY
= "ca.crt"
45 CORE_CLIENT
= "core_v1"
46 RBAC_CLIENT
= "rbac_v1"
47 STORAGE_CLIENT
= "storage_v1"
48 CUSTOM_OBJECT_CLIENT
= "custom_object"
52 def __init__(self
, config_file
=None):
53 config
.load_kube_config(config_file
=config_file
)
55 CORE_CLIENT
: client
.CoreV1Api(),
56 RBAC_CLIENT
: client
.RbacAuthorizationV1Api(),
57 STORAGE_CLIENT
: client
.StorageV1Api(),
58 CUSTOM_OBJECT_CLIENT
: client
.CustomObjectsApi(),
60 self
._configuration
= config
.kube_config
.Configuration
.get_default_copy()
61 self
.logger
= logging
.getLogger("Kubectl")
64 def configuration(self
):
65 return self
._configuration
73 field_selector
: str = None,
74 label_selector
: str = None,
75 ) -> typing
.List
[typing
.Dict
]:
77 Get Service list from a namespace
79 :param: field_selector: Kubernetes field selector for the namespace
80 :param: label_selector: Kubernetes label selector for the namespace
82 :return: List of the services matching the selectors specified
86 kwargs
["field_selector"] = field_selector
88 kwargs
["label_selector"] = label_selector
90 result
= self
.clients
[CORE_CLIENT
].list_service_for_all_namespaces(**kwargs
)
93 "name": i
.metadata
.name
,
94 "cluster_ip": i
.spec
.cluster_ip
,
99 "node_port": p
.node_port
,
101 "protocol": p
.protocol
,
102 "target_port": p
.target_port
,
104 for p
in i
.spec
.ports
108 "external_ip": [i
.ip
for i
in i
.status
.load_balancer
.ingress
]
109 if i
.status
.load_balancer
.ingress
112 for i
in result
.items
114 except ApiException
as e
:
115 self
.logger
.error("Error calling get services: {}".format(e
))
118 def get_default_storage_class(self
) -> str:
120 Default storage class
122 :return: Returns the default storage class name, if exists.
123 If not, it returns the first storage class.
124 If there are not storage classes, returns None
126 storage_classes
= self
.clients
[STORAGE_CLIENT
].list_storage_class()
128 default_sc_annotations
= {
129 "storageclass.kubernetes.io/is-default-class": "true",
130 # Older clusters still use the beta annotation.
131 "storageclass.beta.kubernetes.io/is-default-class": "true",
133 for sc
in storage_classes
.items
:
135 # Select the first storage class in case there is no a default-class
136 selected_sc
= sc
.metadata
.name
137 annotations
= sc
.metadata
.annotations
or {}
139 k
in annotations
and annotations
[k
] == v
140 for k
, v
in default_sc_annotations
.items()
143 selected_sc
= sc
.metadata
.name
147 def create_cluster_role(
150 labels
: Dict
[str, str],
151 namespace
: str = "kube-system",
154 Create a cluster role
156 :param: name: Name of the cluster role
157 :param: labels: Labels for cluster role metadata
158 :param: namespace: Kubernetes namespace for cluster role metadata
161 cluster_roles
= self
.clients
[RBAC_CLIENT
].list_cluster_role(
162 field_selector
="metadata.name={}".format(name
)
165 if len(cluster_roles
.items
) > 0:
167 "Cluster role with metadata.name={} already exists".format(name
)
170 metadata
= V1ObjectMeta(name
=name
, labels
=labels
, namespace
=namespace
)
172 cluster_role
= V1ClusterRole(
175 V1PolicyRule(api_groups
=["*"], resources
=["*"], verbs
=["*"]),
176 V1PolicyRule(non_resource_ur_ls
=["*"], verbs
=["*"]),
180 self
.clients
[RBAC_CLIENT
].create_cluster_role(cluster_role
)
182 def delete_cluster_role(self
, name
: str):
184 Delete a cluster role
186 :param: name: Name of the cluster role
188 self
.clients
[RBAC_CLIENT
].delete_cluster_role(name
)
190 def _get_kubectl_version(self
):
191 version
= VersionApi().get_code()
192 return "{}.{}".format(version
.major
, version
.minor
)
194 def _need_to_create_new_secret(self
):
195 min_k8s_version
= "1.24"
196 current_k8s_version
= self
._get
_kubectl
_version
()
197 return LooseVersion(min_k8s_version
) <= LooseVersion(current_k8s_version
)
199 def _get_secret_name(self
, service_account_name
: str):
200 random_alphanum
= str(uuid
.uuid4())[:5]
201 return "{}-token-{}".format(service_account_name
, random_alphanum
)
203 def _create_service_account_secret(
204 self
, service_account_name
: str, namespace
: str, secret_name
: str
207 Create a secret for the service account. K8s version >= 1.24
209 :param: service_account_name: Name of the service account
210 :param: namespace: Kubernetes namespace for service account metadata
211 :param: secret_name: Name of the secret
213 v1_core
= self
.clients
[CORE_CLIENT
]
214 secrets
= v1_core
.list_namespaced_secret(
215 namespace
, field_selector
="metadata.name={}".format(secret_name
)
220 "Secret with metadata.name={} already exists".format(secret_name
)
223 annotations
= {"kubernetes.io/service-account.name": service_account_name
}
224 metadata
= V1ObjectMeta(
225 name
=secret_name
, namespace
=namespace
, annotations
=annotations
227 type = "kubernetes.io/service-account-token"
228 secret
= V1Secret(metadata
=metadata
, type=type)
229 v1_core
.create_namespaced_secret(namespace
, secret
)
231 def _get_secret_reference_list(self
, namespace
: str, secret_name
: str):
233 Return a secret reference list with one secret.
236 :param: namespace: Kubernetes namespace for service account metadata
237 :param: secret_name: Name of the secret
238 :rtype: list[V1SecretReference]
240 return [V1SecretReference(name
=secret_name
, namespace
=namespace
)]
242 def create_service_account(
245 labels
: Dict
[str, str],
246 namespace
: str = "kube-system",
249 Create a service account
251 :param: name: Name of the service account
252 :param: labels: Labels for service account metadata
253 :param: namespace: Kubernetes namespace for service account metadata
256 v1_core
= self
.clients
[CORE_CLIENT
]
257 service_accounts
= v1_core
.list_namespaced_service_account(
258 namespace
, field_selector
="metadata.name={}".format(name
)
260 if len(service_accounts
.items
) > 0:
262 "Service account with metadata.name={} already exists".format(name
)
265 metadata
= V1ObjectMeta(name
=name
, labels
=labels
, namespace
=namespace
)
267 if self
._need
_to
_create
_new
_secret
():
268 secret_name
= self
._get
_secret
_name
(name
)
269 secrets
= self
._get
_secret
_reference
_list
(namespace
, secret_name
)
270 service_account
= V1ServiceAccount(metadata
=metadata
, secrets
=secrets
)
271 v1_core
.create_namespaced_service_account(namespace
, service_account
)
272 self
._create
_service
_account
_secret
(name
, namespace
, secret_name
)
274 service_account
= V1ServiceAccount(metadata
=metadata
)
275 v1_core
.create_namespaced_service_account(namespace
, service_account
)
277 def delete_service_account(self
, name
: str, namespace
: str = "kube-system"):
279 Delete a service account
281 :param: name: Name of the service account
282 :param: namespace: Kubernetes namespace for service account metadata
285 self
.clients
[CORE_CLIENT
].delete_namespaced_service_account(name
, namespace
)
287 def create_cluster_role_binding(
288 self
, name
: str, labels
: Dict
[str, str], namespace
: str = "kube-system"
291 Create a cluster role binding
293 :param: name: Name of the cluster role
294 :param: labels: Labels for cluster role binding metadata
295 :param: namespace: Kubernetes namespace for cluster role binding metadata
298 role_bindings
= self
.clients
[RBAC_CLIENT
].list_cluster_role_binding(
299 field_selector
="metadata.name={}".format(name
)
301 if len(role_bindings
.items
) > 0:
302 raise Exception("Generated rbac id already exists")
304 role_binding
= V1ClusterRoleBinding(
305 metadata
=V1ObjectMeta(name
=name
, labels
=labels
),
306 role_ref
=V1RoleRef(kind
="ClusterRole", name
=name
, api_group
=""),
307 subjects
=[V1Subject(kind
="ServiceAccount", name
=name
, namespace
=namespace
)],
309 self
.clients
[RBAC_CLIENT
].create_cluster_role_binding(role_binding
)
311 def delete_cluster_role_binding(self
, name
: str):
313 Delete a cluster role binding
315 :param: name: Name of the cluster role binding
317 self
.clients
[RBAC_CLIENT
].delete_cluster_role_binding(name
)
322 fallback
=Exception("Failed getting the secret from service account"),
323 callback
=retry_callback
,
325 async def get_secret_data(
326 self
, name
: str, namespace
: str = "kube-system"
331 :param: name: Name of the secret data
332 :param: namespace: Name of the namespace where the secret is stored
334 :return: Tuple with the token and client certificate
336 v1_core
= self
.clients
[CORE_CLIENT
]
340 service_accounts
= v1_core
.list_namespaced_service_account(
341 namespace
, field_selector
="metadata.name={}".format(name
)
343 if len(service_accounts
.items
) == 0:
345 "Service account not found with metadata.name={}".format(name
)
347 service_account
= service_accounts
.items
[0]
348 if service_account
.secrets
and len(service_account
.secrets
) > 0:
349 secret_name
= service_account
.secrets
[0].name
352 "Failed getting the secret from service account {}".format(name
)
354 secret
= v1_core
.list_namespaced_secret(
355 namespace
, field_selector
="metadata.name={}".format(secret_name
)
358 token
= secret
.data
[SERVICE_ACCOUNT_TOKEN_KEY
]
359 client_certificate_data
= secret
.data
[SERVICE_ACCOUNT_ROOT_CA_KEY
]
362 base64
.b64decode(token
).decode("utf-8"),
363 base64
.b64decode(client_certificate_data
).decode("utf-8"),
366 async def create_certificate(
376 Creates cert-manager certificate object
378 :param: namespace: Name of the namespace where the certificate and secret is stored
379 :param: name: Name of the certificate object
380 :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
381 :param: secret_name: Name of the secret created by cert-manager
382 :param: usages: List of X.509 key usages
383 :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object
387 "apiVersion": "cert-manager.io/v1",
388 "kind": "Certificate",
389 "metadata": {"name": name
, "namespace": namespace
},
391 "secretName": secret_name
,
393 "rotationPolicy": "Always",
394 "algorithm": "ECDSA",
397 "duration": "8760h", # 1 Year
398 "renewBefore": "2208h", # 9 months
399 "subject": {"organizations": ["osm"]},
404 "{}.{}".format(dns_prefix
, namespace
),
405 "{}.{}.svc".format(dns_prefix
, namespace
),
406 "{}.{}.svc.cluster".format(dns_prefix
, namespace
),
407 "{}.{}.svc.cluster.local".format(dns_prefix
, namespace
),
409 "issuerRef": {"name": issuer_name
, "kind": "ClusterIssuer"},
412 client
= self
.clients
[CUSTOM_OBJECT_CLIENT
]
414 client
.create_namespaced_custom_object(
415 group
="cert-manager.io",
416 plural
="certificates",
418 body
=certificate_body
,
421 except ApiException
as e
:
422 info
= json
.loads(e
.body
)
423 if info
.get("reason").lower() == "alreadyexists":
424 self
.logger
.warning("Certificate already exists: {}".format(e
))
428 async def delete_certificate(self
, namespace
, object_name
):
429 client
= self
.clients
[CUSTOM_OBJECT_CLIENT
]
431 client
.delete_namespaced_custom_object(
432 group
="cert-manager.io",
433 plural
="certificates",
438 except ApiException
as e
:
439 info
= json
.loads(e
.body
)
440 if info
.get("reason").lower() == "notfound":
441 self
.logger
.warning("Certificate already deleted: {}".format(e
))