1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from typing
import Dict
22 from distutils
.version
import LooseVersion
24 from kubernetes
import client
, config
25 from kubernetes
.client
.api
import VersionApi
26 from kubernetes
.client
.models
import (
40 from kubernetes
.client
.rest
import ApiException
41 from n2vc
.libjuju
import retry_callback
42 from retrying_async
import retry
# Keys looked up in the data of a service account secret.
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
# Keys of the dictionary of Kubernetes API clients (self.clients).
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
CUSTOM_OBJECT_CLIENT = "custom_object"
def __init__(self, config_file=None):
    """
    Load the kube config and build the Kubernetes API clients

    :param: config_file: Value handed to load_kube_config as config_file.
                         Default: None
    """
    config.load_kube_config(config_file=config_file)
    # One API client per module-level *_CLIENT key, created in a fixed order.
    api_clients = {}
    api_clients[CORE_CLIENT] = client.CoreV1Api()
    api_clients[RBAC_CLIENT] = client.RbacAuthorizationV1Api()
    api_clients[STORAGE_CLIENT] = client.StorageV1Api()
    api_clients[CUSTOM_OBJECT_CLIENT] = client.CustomObjectsApi()
    self.clients = api_clients
    # Keep a copy of the default client configuration for later retrieval.
    self._configuration = config.kube_config.Configuration.get_default_copy()
    self.logger = logging.getLogger("Kubectl")
def configuration(self):
    """
    Kubernetes client configuration held by this object

    :return: The object stored in self._configuration
    """
    return self._configuration
76 field_selector
: str = None,
77 label_selector
: str = None,
78 ) -> typing
.List
[typing
.Dict
]:
80 Get Service list from a namespace
82 :param: field_selector: Kubernetes field selector for the namespace
83 :param: label_selector: Kubernetes label selector for the namespace
85 :return: List of the services matching the selectors specified
89 kwargs
["field_selector"] = field_selector
91 kwargs
["label_selector"] = label_selector
93 result
= self
.clients
[CORE_CLIENT
].list_service_for_all_namespaces(**kwargs
)
96 "name": i
.metadata
.name
,
97 "cluster_ip": i
.spec
.cluster_ip
,
102 "node_port": p
.node_port
,
104 "protocol": p
.protocol
,
105 "target_port": p
.target_port
,
107 for p
in i
.spec
.ports
111 "external_ip": [i
.ip
for i
in i
.status
.load_balancer
.ingress
]
112 if i
.status
.load_balancer
.ingress
115 for i
in result
.items
117 except ApiException
as e
:
118 self
.logger
.error("Error calling get services: {}".format(e
))
def get_default_storage_class(self) -> str:
    """
    Pick the storage class to use by default

    :return: Name of the first storage class annotated as default, if any.
             Otherwise, the name of the first storage class listed.
             None when there are no storage classes at all.
    """
    default_sc_annotations = {
        "storageclass.kubernetes.io/is-default-class": "true",
        # Older clusters still use the beta annotation.
        "storageclass.beta.kubernetes.io/is-default-class": "true",
    }
    sc_list = self.clients[STORAGE_CLIENT].list_storage_class()
    selected_sc = None
    for storage_class in sc_list.items:
        if not selected_sc:
            # Fall back to the first storage class when none is the default
            selected_sc = storage_class.metadata.name
        sc_annotations = storage_class.metadata.annotations or {}
        is_default = False
        for key, expected in default_sc_annotations.items():
            if key in sc_annotations and sc_annotations[key] == expected:
                is_default = True
                break
        if is_default:
            # A default storage class wins over the fallback; stop searching
            selected_sc = storage_class.metadata.name
            break
    return selected_sc
def create_cluster_role(
    self,
    name: str,
    labels: Dict[str, str],
    namespace: str = "kube-system",
):
    """
    Create a cluster role whose rules use wildcards for everything

    :param: name: Name of the cluster role
    :param: labels: Labels for cluster role metadata
    :param: namespace: Kubernetes namespace for cluster role metadata
                       Default: kube-system
    :raises: Exception if a cluster role with that name already exists
    """
    rbac_api = self.clients[RBAC_CLIENT]
    name_selector = "metadata.name={}".format(name)
    existing_roles = rbac_api.list_cluster_role(field_selector=name_selector)
    if len(existing_roles.items) > 0:
        raise Exception("Role with metadata.name={} already exists".format(name))

    role_metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
    # One wildcard rule for API resources and one for non-resource URLs.
    wildcard_rules = [
        V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
        V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
    ]
    cluster_role = V1ClusterRole(metadata=role_metadata, rules=wildcard_rules)
    rbac_api.create_cluster_role(cluster_role)
183 async def create_role(
186 labels
: Dict
[str, str],
193 Create a role with one PolicyRule
195 :param: name: Name of the namespaced Role
196 :param: labels: Labels for namespaced Role metadata
197 :param: api_groups: List with api-groups allowed in the policy rule
198 :param: resources: List with resources allowed in the policy rule
199 :param: verbs: List with verbs allowed in the policy rule
200 :param: namespace: Kubernetes namespace for Role metadata
205 roles
= self
.clients
[RBAC_CLIENT
].list_namespaced_role(
206 namespace
, field_selector
="metadata.name={}".format(name
)
209 if len(roles
.items
) > 0:
210 raise Exception("Role with metadata.name={} already exists".format(name
))
212 metadata
= V1ObjectMeta(name
=name
, labels
=labels
, namespace
=namespace
)
217 V1PolicyRule(api_groups
=api_groups
, resources
=resources
, verbs
=verbs
),
221 self
.clients
[RBAC_CLIENT
].create_namespaced_role(namespace
, role
)
def delete_cluster_role(self, name: str):
    """
    Remove a cluster role

    :param: name: Name of the cluster role to delete
    """
    rbac_api = self.clients[RBAC_CLIENT]
    rbac_api.delete_cluster_role(name)
def _get_kubectl_version(self):
    """
    Read the version reported by the Kubernetes version API

    :return: String with the format "<major>.<minor>"
    """
    version_info = VersionApi().get_code()
    major, minor = version_info.major, version_info.minor
    return "{}.{}".format(major, minor)
def _need_to_create_new_secret(self):
    """
    Tell whether the service account secret must be created explicitly

    The version returned by self._get_kubectl_version() is compared with
    1.24 using only the numeric major and minor parts, so suffixes such as
    "24+" or a leading "v" do not break the comparison. This avoids
    distutils' LooseVersion, which is deprecated (PEP 632), removed from the
    standard library in Python 3.12, and raises TypeError when a numeric
    component is compared with a non-numeric one.

    :return: True if the current version is 1.24 or newer, False otherwise
    """
    import re

    def _major_minor(version):
        # Keep the first two groups of digits: "1.24+" -> (1, 24)
        return tuple(int(part) for part in re.findall(r"\d+", str(version))[:2])

    min_k8s_version = "1.24"
    current_k8s_version = self._get_kubectl_version()
    return _major_minor(min_k8s_version) <= _major_minor(current_k8s_version)
def _get_secret_name(self, service_account_name: str):
    """
    Build a name for the token secret of a service account

    :param: service_account_name: Name of the service account
    :return: "<service_account_name>-token-<suffix>", where the suffix is
             the first five characters of a random UUID4
    """
    # The hex form and the str form of a UUID share their first 8 characters.
    suffix = uuid.uuid4().hex[:5]
    return "{}-token-{}".format(service_account_name, suffix)
def _create_service_account_secret(
    self, service_account_name: str, namespace: str, secret_name: str
):
    """
    Create the token secret of a service account. K8s version >= 1.24

    :param: service_account_name: Name of the service account
    :param: namespace: Kubernetes namespace for service account metadata
    :param: secret_name: Name of the secret
    :raises: Exception if a secret with that name already exists
    """
    v1_core = self.clients[CORE_CLIENT]
    name_selector = "metadata.name={}".format(secret_name)
    existing = v1_core.list_namespaced_secret(
        namespace, field_selector=name_selector
    )
    if len(existing.items) > 0:
        raise Exception(
            "Secret with metadata.name={} already exists".format(secret_name)
        )

    # The secret is annotated with the name of the service account.
    secret_metadata = V1ObjectMeta(
        name=secret_name,
        namespace=namespace,
        annotations={"kubernetes.io/service-account.name": service_account_name},
    )
    token_secret = V1Secret(
        metadata=secret_metadata, type="kubernetes.io/service-account-token"
    )
    v1_core.create_namespaced_secret(namespace, token_secret)
def _get_secret_reference_list(self, namespace: str, secret_name: str):
    """
    Wrap a single secret reference in a list

    :param: namespace: Kubernetes namespace for service account metadata
    :param: secret_name: Name of the secret
    :rtype: list[V1SecretReference]
    """
    reference = V1SecretReference(name=secret_name, namespace=namespace)
    return [reference]
def create_service_account(
    self,
    name: str,
    labels: Dict[str, str],
    namespace: str = "kube-system",
):
    """
    Create a service account, plus its token secret when required

    :param: name: Name of the service account
    :param: labels: Labels for service account metadata
    :param: namespace: Kubernetes namespace for service account metadata
                       Default: kube-system
    :raises: Exception if a service account with that name already exists
    """
    v1_core = self.clients[CORE_CLIENT]
    name_selector = "metadata.name={}".format(name)
    existing = v1_core.list_namespaced_service_account(
        namespace, field_selector=name_selector
    )
    if len(existing.items) > 0:
        raise Exception(
            "Service account with metadata.name={} already exists".format(name)
        )

    sa_metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)

    if not self._need_to_create_new_secret():
        # No explicit secret is needed: create the bare service account.
        plain_account = V1ServiceAccount(metadata=sa_metadata)
        v1_core.create_namespaced_service_account(namespace, plain_account)
        return

    # The account references the secret, which is created right after it.
    secret_name = self._get_secret_name(name)
    secret_refs = self._get_secret_reference_list(namespace, secret_name)
    account = V1ServiceAccount(metadata=sa_metadata, secrets=secret_refs)
    v1_core.create_namespaced_service_account(namespace, account)
    self._create_service_account_secret(name, namespace, secret_name)
def delete_service_account(self, name: str, namespace: str = "kube-system"):
    """
    Remove a service account

    :param: name: Name of the service account
    :param: namespace: Kubernetes namespace for service account metadata
                       Default: kube-system
    """
    v1_core = self.clients[CORE_CLIENT]
    v1_core.delete_namespaced_service_account(name, namespace)
def create_cluster_role_binding(
    self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
):
    """
    Bind a cluster role to the service account that has the same name

    :param: name: Name used for the binding, the cluster role it refers to
                  and the service account it is bound to
    :param: labels: Labels for cluster role binding metadata
    :param: namespace: Kubernetes namespace of the service account subject
                       Default: kube-system
    :raises: Exception if a cluster role binding with that name already exists
    """
    rbac_api = self.clients[RBAC_CLIENT]
    name_selector = "metadata.name={}".format(name)
    existing = rbac_api.list_cluster_role_binding(field_selector=name_selector)
    if len(existing.items) > 0:
        raise Exception("Generated rbac id already exists")

    binding_metadata = V1ObjectMeta(name=name, labels=labels)
    cluster_role_ref = V1RoleRef(kind="ClusterRole", name=name, api_group="")
    sa_subject = V1Subject(kind="ServiceAccount", name=name, namespace=namespace)
    role_binding = V1ClusterRoleBinding(
        metadata=binding_metadata,
        role_ref=cluster_role_ref,
        subjects=[sa_subject],
    )
    rbac_api.create_cluster_role_binding(role_binding)
352 async def create_role_binding(
357 labels
: Dict
[str, str],
361 Create a cluster role binding
363 :param: name: Name of the namespaced Role Binding
364 :param: role_name: Name of the namespaced Role to be bound
365 :param: sa_name: Name of the Service Account to be bound
366 :param: labels: Labels for Role Binding metadata
367 :param: namespace: Kubernetes namespace for Role Binding metadata
371 role_bindings
= self
.clients
[RBAC_CLIENT
].list_namespaced_role_binding(
372 namespace
, field_selector
="metadata.name={}".format(name
)
374 if len(role_bindings
.items
) > 0:
376 "Role Binding with metadata.name={} already exists".format(name
)
379 role_binding
= V1RoleBinding(
380 metadata
=V1ObjectMeta(name
=name
, labels
=labels
),
381 role_ref
=V1RoleRef(kind
="Role", name
=role_name
, api_group
=""),
383 V1Subject(kind
="ServiceAccount", name
=sa_name
, namespace
=namespace
)
386 self
.clients
[RBAC_CLIENT
].create_namespaced_role_binding(
387 namespace
, role_binding
def delete_cluster_role_binding(self, name: str):
    """
    Remove a cluster role binding

    :param: name: Name of the cluster role binding to delete
    """
    rbac_api = self.clients[RBAC_CLIENT]
    rbac_api.delete_cluster_role_binding(name)
401 fallback
=Exception("Failed getting the secret from service account"),
402 callback
=retry_callback
,
404 async def get_secret_data(
405 self
, name
: str, namespace
: str = "kube-system"
410 :param: name: Name of the secret data
411 :param: namespace: Name of the namespace where the secret is stored
413 :return: Tuple with the token and client certificate
415 v1_core
= self
.clients
[CORE_CLIENT
]
419 service_accounts
= v1_core
.list_namespaced_service_account(
420 namespace
, field_selector
="metadata.name={}".format(name
)
422 if len(service_accounts
.items
) == 0:
424 "Service account not found with metadata.name={}".format(name
)
426 service_account
= service_accounts
.items
[0]
427 if service_account
.secrets
and len(service_account
.secrets
) > 0:
428 secret_name
= service_account
.secrets
[0].name
431 "Failed getting the secret from service account {}".format(name
)
433 # TODO: refactor to use get_secret_content
434 secret
= v1_core
.list_namespaced_secret(
435 namespace
, field_selector
="metadata.name={}".format(secret_name
)
438 token
= secret
.data
[SERVICE_ACCOUNT_TOKEN_KEY
]
439 client_certificate_data
= secret
.data
[SERVICE_ACCOUNT_ROOT_CA_KEY
]
442 base64
.b64decode(token
).decode("utf-8"),
443 base64
.b64decode(client_certificate_data
).decode("utf-8"),
449 fallback
=Exception("Failed getting data from the secret"),
451 async def get_secret_content(
459 :param: name: Name of the secret
460 :param: namespace: Name of the namespace where the secret is stored
462 :return: Dictionary with secret's data
464 v1_core
= self
.clients
[CORE_CLIENT
]
466 secret
= v1_core
.read_namespaced_secret(name
, namespace
)
473 fallback
=Exception("Failed creating the secret"),
async def create_secret(
    self, name: str, data: dict, namespace: str, secret_type: str
):
    """
    Create a secret holding the given data

    :param: name: Name of the secret
    :param: data: Dict with data content. Values must be already base64 encoded
    :param: namespace: Name of the namespace where the secret will be stored
    :param: secret_type: Type of the secret, e.g., Opaque,
                         kubernetes.io/service-account-token, kubernetes.io/tls
    """
    v1_core = self.clients[CORE_CLIENT]
    secret_body = V1Secret(
        metadata=V1ObjectMeta(name=name, namespace=namespace),
        data=data,
        type=secret_type,
    )
    v1_core.create_namespaced_secret(namespace, secret_body)
493 async def create_certificate(
503 Creates cert-manager certificate object
505 :param: namespace: Name of the namespace where the certificate and secret is stored
506 :param: name: Name of the certificate object
507 :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
508 :param: secret_name: Name of the secret created by cert-manager
509 :param: usages: List of X.509 key usages
510 :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object
514 "apiVersion": "cert-manager.io/v1",
515 "kind": "Certificate",
516 "metadata": {"name": name
, "namespace": namespace
},
518 "secretName": secret_name
,
520 "rotationPolicy": "Always",
521 "algorithm": "ECDSA",
524 "duration": "8760h", # 1 Year
525 "renewBefore": "2208h", # 9 months
526 "subject": {"organizations": ["osm"]},
531 "{}.{}".format(dns_prefix
, namespace
),
532 "{}.{}.svc".format(dns_prefix
, namespace
),
533 "{}.{}.svc.cluster".format(dns_prefix
, namespace
),
534 "{}.{}.svc.cluster.local".format(dns_prefix
, namespace
),
536 "issuerRef": {"name": issuer_name
, "kind": "ClusterIssuer"},
539 client
= self
.clients
[CUSTOM_OBJECT_CLIENT
]
541 client
.create_namespaced_custom_object(
542 group
="cert-manager.io",
543 plural
="certificates",
545 body
=certificate_body
,
548 except ApiException
as e
:
549 info
= json
.loads(e
.body
)
550 if info
.get("reason").lower() == "alreadyexists":
551 self
.logger
.warning("Certificate already exists: {}".format(e
))
555 async def delete_certificate(self
, namespace
, object_name
):
556 client
= self
.clients
[CUSTOM_OBJECT_CLIENT
]
558 client
.delete_namespaced_custom_object(
559 group
="cert-manager.io",
560 plural
="certificates",
565 except ApiException
as e
:
566 info
= json
.loads(e
.body
)
567 if info
.get("reason").lower() == "notfound":
568 self
.logger
.warning("Certificate already deleted: {}".format(e
))
575 fallback
=Exception("Failed creating the namespace"),
577 async def create_namespace(self
, name
: str, labels
: dict = None):
581 :param: name: Name of the namespace to be created
582 :param: labels: Dictionary with labels for the new namespace
585 v1_core
= self
.clients
[CORE_CLIENT
]
586 metadata
= V1ObjectMeta(name
=name
, labels
=labels
)
587 namespace
= V1Namespace(
592 v1_core
.create_namespace(namespace
)
593 self
.logger
.debug("Namespace created: {}".format(name
))
594 except ApiException
as e
:
595 info
= json
.loads(e
.body
)
596 if info
.get("reason").lower() == "alreadyexists":
597 self
.logger
.warning("Namespace already exists: {}".format(e
))
604 fallback
=Exception("Failed deleting the namespace"),
606 async def delete_namespace(self
, name
: str):
610 :param: name: Name of the namespace to be deleted
614 self
.clients
[CORE_CLIENT
].delete_namespace(name
)
615 except ApiException
as e
:
616 if e
.reason
== "Not Found":
617 self
.logger
.warning("Namespace already deleted: {}".format(e
))