Coverage for n2vc/kubectl.py: 78%

189 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-29 09:03 +0000

# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

14 

15import base64 

16import logging 

17from typing import Dict 

18import typing 

19import uuid 

20import json 

21 

22from distutils.version import LooseVersion 

23 

24from kubernetes import client, config 

25from kubernetes.client.api import VersionApi 

26from kubernetes.client.models import ( 

27 V1ClusterRole, 

28 V1Role, 

29 V1ObjectMeta, 

30 V1PolicyRule, 

31 V1ServiceAccount, 

32 V1ClusterRoleBinding, 

33 V1RoleBinding, 

34 V1RoleRef, 

35 V1Subject, 

36 V1Secret, 

37 V1SecretReference, 

38 V1Namespace, 

39) 

40from kubernetes.client.rest import ApiException 

41from n2vc.libjuju import retry_callback 

42from retrying_async import retry 

43 

44 

# Keys read from the data of a service account secret
SERVICE_ACCOUNT_TOKEN_KEY = "token"
SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"

# Keys of the API clients stored in Kubectl.clients
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
CUSTOM_OBJECT_CLIENT = "custom_object"

52 

53 

54class Kubectl: 

def __init__(self, config_file=None):
    """
    Load the kubeconfig and build the Kubernetes API clients

    :param: config_file: Value forwarded to config.load_kube_config as its
                         config_file argument. Default: None
    """
    self.logger = logging.getLogger("Kubectl")

    # The API clients are instantiated only after the kubeconfig is loaded.
    config.load_kube_config(config_file=config_file)
    client_factories = (
        (CORE_CLIENT, client.CoreV1Api),
        (RBAC_CLIENT, client.RbacAuthorizationV1Api),
        (STORAGE_CLIENT, client.StorageV1Api),
        (CUSTOM_OBJECT_CLIENT, client.CustomObjectsApi),
    )
    self._clients = {key: factory() for key, factory in client_factories}
    self._configuration = config.kube_config.Configuration.get_default_copy()

65 

@property
def configuration(self):
    """Configuration copy stored by __init__ (read-only accessor)"""
    stored_configuration = self._configuration
    return stored_configuration

69 

@property
def clients(self):
    """Dictionary of API clients built by __init__, keyed by the *_CLIENT constants"""
    api_clients = self._clients
    return api_clients

73 

def get_services(
    self,
    field_selector: str = None,
    label_selector: str = None,
) -> typing.List[typing.Dict]:
    """
    Get the list of services, looked up across all namespaces

    :param: field_selector: Kubernetes field selector used to filter the services
    :param: label_selector: Kubernetes label selector used to filter the services

    :return: List with one dictionary per matching service. Keys: name,
             cluster_ip, type, ports (list of dictionaries, empty when the
             service has no ports) and external_ip (list of ingress IPs, or
             None when the load balancer has no ingress)

    :raises: ApiException: Logged and raised again when the listing call fails
    """
    selectors = {
        "field_selector": field_selector,
        "label_selector": label_selector,
    }
    # Empty selectors are not passed to the API call at all.
    query = {key: value for key, value in selectors.items() if value}
    try:
        listing = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**query)
        services = []
        for service in listing.items:
            ports = []
            if service.spec.ports:
                for port in service.spec.ports:
                    ports.append(
                        {
                            "name": port.name,
                            "node_port": port.node_port,
                            "port": port.port,
                            "protocol": port.protocol,
                            "target_port": port.target_port,
                        }
                    )

            ingress = service.status.load_balancer.ingress
            external_ip = [entry.ip for entry in ingress] if ingress else None

            services.append(
                {
                    "name": service.metadata.name,
                    "cluster_ip": service.spec.cluster_ip,
                    "type": service.spec.type,
                    "ports": ports,
                    "external_ip": external_ip,
                }
            )
        return services
    except ApiException as e:
        self.logger.error("Error calling get services: {}".format(e))
        raise e

120 

def get_default_storage_class(self) -> str:
    """
    Pick the storage class to be used by default

    :return: Name of the first storage class carrying a default-class
             annotation set to "true". If there is none, the name of the
             first storage class listed. None when no storage class exists.
    """
    default_markers = (
        "storageclass.kubernetes.io/is-default-class",
        # Older clusters still use the beta annotation.
        "storageclass.beta.kubernetes.io/is-default-class",
    )
    first_seen = None
    for storage_class in self.clients[STORAGE_CLIENT].list_storage_class().items:
        class_name = storage_class.metadata.name
        if not first_seen:
            # Fallback used when no storage class is flagged as default
            first_seen = class_name
        annotations = storage_class.metadata.annotations or {}
        if any(annotations.get(marker) == "true" for marker in default_markers):
            return class_name
    return first_seen

149 

def create_cluster_role(
    self,
    name: str,
    labels: Dict[str, str],
    namespace: str = "kube-system",
):
    """
    Create a cluster role allowing every verb on every resource and on
    every non-resource URL

    :param: name: Name of the cluster role
    :param: labels: Labels for cluster role metadata
    :param: namespace: Kubernetes namespace for cluster role metadata
                       Default: kube-system

    :raises: Exception: If a cluster role with the same name is already listed
    """
    rbac_api = self.clients[RBAC_CLIENT]
    name_filter = "metadata.name={}".format(name)
    existing = rbac_api.list_cluster_role(field_selector=name_filter).items
    if len(existing) > 0:
        raise Exception("Role with metadata.name={} already exists".format(name))

    all_resources_rule = V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"])
    all_urls_rule = V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"])
    cluster_role = V1ClusterRole(
        metadata=V1ObjectMeta(name=name, labels=labels, namespace=namespace),
        rules=[all_resources_rule, all_urls_rule],
    )
    rbac_api.create_cluster_role(cluster_role)

182 

async def create_role(
    self,
    name: str,
    labels: Dict[str, str],
    api_groups: list,
    resources: list,
    verbs: list,
    namespace: str,
):
    """
    Create a namespaced Role holding a single PolicyRule

    :param: name: Name of the namespaced Role
    :param: labels: Labels for namespaced Role metadata
    :param: api_groups: List with api-groups allowed in the policy rule
    :param: resources: List with resources allowed in the policy rule
    :param: verbs: List with verbs allowed in the policy rule
    :param: namespace: Kubernetes namespace for Role metadata

    :return: None

    :raises: Exception: If a Role with the same name is already listed in
                        the namespace
    """
    rbac_api = self.clients[RBAC_CLIENT]
    name_filter = "metadata.name={}".format(name)
    existing = rbac_api.list_namespaced_role(namespace, field_selector=name_filter)
    if len(existing.items) > 0:
        raise Exception("Role with metadata.name={} already exists".format(name))

    single_rule = V1PolicyRule(
        api_groups=api_groups, resources=resources, verbs=verbs
    )
    role = V1Role(
        metadata=V1ObjectMeta(name=name, labels=labels, namespace=namespace),
        rules=[single_rule],
    )
    rbac_api.create_namespaced_role(namespace, role)

222 

def delete_cluster_role(self, name: str):
    """
    Delete a cluster role through the RBAC client

    :param: name: Name of the cluster role to delete
    """
    rbac_api = self.clients[RBAC_CLIENT]
    rbac_api.delete_cluster_role(name)

230 

def _get_kubectl_version(self):
    """
    Build a "<major>.<minor>" string from the answer of VersionApi().get_code()

    :return: String joining the major and minor fields with a dot
    """
    version_info = VersionApi().get_code()
    major, minor = version_info.major, version_info.minor
    return "{}.{}".format(major, minor)

234 

235 def _need_to_create_new_secret(self): 

236 min_k8s_version = "1.24" 

237 current_k8s_version = self._get_kubectl_version() 

238 return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version) 

239 

240 def _get_secret_name(self, service_account_name: str): 

241 random_alphanum = str(uuid.uuid4())[:5] 

242 return "{}-token-{}".format(service_account_name, random_alphanum) 

243 

def _create_service_account_secret(
    self, service_account_name: str, namespace: str, secret_name: str
):
    """
    Create a secret of type kubernetes.io/service-account-token annotated
    with the service account name. K8s version >= 1.24

    :param: service_account_name: Name of the service account
    :param: namespace: Kubernetes namespace for service account metadata
    :param: secret_name: Name of the secret

    :raises: Exception: If a secret with the same name is already listed in
                        the namespace
    """
    core_api = self.clients[CORE_CLIENT]
    name_filter = "metadata.name={}".format(secret_name)
    existing = core_api.list_namespaced_secret(
        namespace, field_selector=name_filter
    ).items
    if len(existing) > 0:
        raise Exception(
            "Secret with metadata.name={} already exists".format(secret_name)
        )

    token_secret = V1Secret(
        metadata=V1ObjectMeta(
            name=secret_name,
            namespace=namespace,
            annotations={"kubernetes.io/service-account.name": service_account_name},
        ),
        type="kubernetes.io/service-account-token",
    )
    core_api.create_namespaced_secret(namespace, token_secret)

271 

def _get_secret_reference_list(self, namespace: str, secret_name: str):
    """
    Wrap a single secret reference in a list.
    K8s version >= 1.24

    :param: namespace: Kubernetes namespace for service account metadata
    :param: secret_name: Name of the secret
    :rtype: list[V1SecretReference]
    """
    reference = V1SecretReference(name=secret_name, namespace=namespace)
    return [reference]

282 

def create_service_account(
    self,
    name: str,
    labels: Dict[str, str],
    namespace: str = "kube-system",
):
    """
    Create a service account. When _need_to_create_new_secret() is true,
    the account references a newly named secret, which is created right
    after the account.

    :param: name: Name of the service account
    :param: labels: Labels for service account metadata
    :param: namespace: Kubernetes namespace for service account metadata
                       Default: kube-system

    :raises: Exception: If a service account with the same name is already
                        listed in the namespace
    """
    core_api = self.clients[CORE_CLIENT]
    existing = core_api.list_namespaced_service_account(
        namespace, field_selector="metadata.name={}".format(name)
    ).items
    if len(existing) > 0:
        raise Exception(
            "Service account with metadata.name={} already exists".format(name)
        )

    metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)

    if not self._need_to_create_new_secret():
        core_api.create_namespaced_service_account(
            namespace, V1ServiceAccount(metadata=metadata)
        )
        return

    secret_name = self._get_secret_name(name)
    account = V1ServiceAccount(
        metadata=metadata,
        secrets=self._get_secret_reference_list(namespace, secret_name),
    )
    core_api.create_namespaced_service_account(namespace, account)
    self._create_service_account_secret(name, namespace, secret_name)

317 

def delete_service_account(self, name: str, namespace: str = "kube-system"):
    """
    Delete a service account through the core client

    :param: name: Name of the service account
    :param: namespace: Kubernetes namespace for service account metadata
                       Default: kube-system
    """
    core_api = self.clients[CORE_CLIENT]
    core_api.delete_namespaced_service_account(name, namespace)

327 

def create_cluster_role_binding(
    self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
):
    """
    Create a cluster role binding. The same name is used for the binding,
    for the ClusterRole it refers to and for the ServiceAccount subject.

    :param: name: Name of the cluster role
    :param: labels: Labels for cluster role binding metadata
    :param: namespace: Kubernetes namespace of the ServiceAccount subject
                       Default: kube-system

    :raises: Exception: If a cluster role binding with the same name is
                        already listed
    """
    rbac_api = self.clients[RBAC_CLIENT]
    existing = rbac_api.list_cluster_role_binding(
        field_selector="metadata.name={}".format(name)
    ).items
    if len(existing) > 0:
        raise Exception("Generated rbac id already exists")

    subject = V1Subject(kind="ServiceAccount", name=name, namespace=namespace)
    binding = V1ClusterRoleBinding(
        metadata=V1ObjectMeta(name=name, labels=labels),
        role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
        subjects=[subject],
    )
    rbac_api.create_cluster_role_binding(binding)

351 

async def create_role_binding(
    self,
    name: str,
    role_name: str,
    sa_name: str,
    labels: Dict[str, str],
    namespace: str,
):
    """
    Create a namespaced role binding between a Role and a Service Account

    :param: name: Name of the namespaced Role Binding
    :param: role_name: Name of the namespaced Role to be bound
    :param: sa_name: Name of the Service Account to be bound
    :param: labels: Labels for Role Binding metadata
    :param: namespace: Kubernetes namespace for Role Binding metadata

    :return: None

    :raises: Exception: If a Role Binding with the same name is already
                        listed in the namespace
    """
    rbac_api = self.clients[RBAC_CLIENT]
    existing = rbac_api.list_namespaced_role_binding(
        namespace, field_selector="metadata.name={}".format(name)
    ).items
    if len(existing) > 0:
        raise Exception(
            "Role Binding with metadata.name={} already exists".format(name)
        )

    subject = V1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
    binding = V1RoleBinding(
        metadata=V1ObjectMeta(name=name, labels=labels),
        role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
        subjects=[subject],
    )
    rbac_api.create_namespaced_role_binding(namespace, binding)

389 

def delete_cluster_role_binding(self, name: str):
    """
    Delete a cluster role binding through the RBAC client

    :param: name: Name of the cluster role binding
    """
    rbac_api = self.clients[RBAC_CLIENT]
    rbac_api.delete_cluster_role_binding(name)

397 

@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed getting the secret from service account"),
    callback=retry_callback,
)
async def get_secret_data(
    self, name: str, namespace: str = "kube-system"
) -> (str, str):
    """
    Get the token and the CA certificate stored in the first secret
    referenced by a service account

    :param: name: Name of the service account owning the secret
    :param: namespace: Name of the namespace where the secret is stored

    :return: Tuple with the values stored under the "token" and "ca.crt"
             keys of the secret, base64-decoded into UTF-8 strings

    :raises: Exception: If the service account is not found or references
                        no secret
    """
    core_api = self.clients[CORE_CLIENT]

    accounts = core_api.list_namespaced_service_account(
        namespace, field_selector="metadata.name={}".format(name)
    ).items
    if len(accounts) == 0:
        raise Exception(
            "Service account not found with metadata.name={}".format(name)
        )

    referenced_secrets = accounts[0].secrets
    secret_name = None
    if referenced_secrets and len(referenced_secrets) > 0:
        secret_name = referenced_secrets[0].name
    if not secret_name:
        raise Exception(
            "Failed getting the secret from service account {}".format(name)
        )

    # TODO: refactor to use get_secret_content
    matching_secrets = core_api.list_namespaced_secret(
        namespace, field_selector="metadata.name={}".format(secret_name)
    ).items
    secret_payload = matching_secrets[0].data

    # Both entries are looked up before any of them is decoded.
    encoded_token = secret_payload[SERVICE_ACCOUNT_TOKEN_KEY]
    encoded_ca = secret_payload[SERVICE_ACCOUNT_ROOT_CA_KEY]
    return tuple(
        base64.b64decode(encoded).decode("utf-8")
        for encoded in (encoded_token, encoded_ca)
    )

445 

@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed getting data from the secret"),
)
async def get_secret_content(
    self,
    name: str,
    namespace: str,
) -> dict:
    """
    Read a secret and return its data

    :param: name: Name of the secret
    :param: namespace: Name of the namespace where the secret is stored

    :return: The data attribute of the secret read from the cluster
    """
    core_api = self.clients[CORE_CLIENT]
    return core_api.read_namespaced_secret(name, namespace).data

469 

@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed creating the secret"),
)
async def create_secret(
    self, name: str, data: dict, namespace: str, secret_type: str
):
    """
    Create a secret

    :param: name: Name of the secret
    :param: data: Dict with data content. Values must be already base64 encoded
    :param: namespace: Name of the namespace where the secret will be stored
    :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls

    :return: None
    """
    new_secret = V1Secret(
        metadata=V1ObjectMeta(name=name, namespace=namespace),
        data=data,
        type=secret_type,
    )
    self.clients[CORE_CLIENT].create_namespaced_secret(namespace, new_secret)

492 

async def create_certificate(
    self,
    namespace: str,
    name: str,
    dns_prefix: str,
    secret_name: str,
    usages: list,
    issuer_name: str,
):
    """
    Creates cert-manager certificate object

    :param: namespace: Name of the namespace where the certificate and secret is stored
    :param: name: Name of the certificate object
    :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
    :param: secret_name: Name of the secret created by cert-manager
    :param: usages: List of X.509 key usages
    :param: issuer_name: Name of the cert-manager's ClusterIssuer object

    :return: None

    :raises: ApiException: If the creation fails for any reason other than
                           "AlreadyExists", which is only logged as a warning
    """
    dns_suffixes = ("", ".svc", ".svc.cluster", ".svc.cluster.local")
    certificate_body = {
        "apiVersion": "cert-manager.io/v1",
        "kind": "Certificate",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "secretName": secret_name,
            "privateKey": {
                "rotationPolicy": "Always",
                "algorithm": "ECDSA",
                "size": 256,
            },
            "duration": "8760h",  # 365 days
            "renewBefore": "2208h",  # 92 days before the end of the duration
            "subject": {"organizations": ["osm"]},
            "commonName": "osm",
            "isCA": False,
            "usages": usages,
            "dnsNames": [
                "{}.{}{}".format(dns_prefix, namespace, suffix)
                for suffix in dns_suffixes
            ],
            "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
        },
    }
    custom_objects_api = self.clients[CUSTOM_OBJECT_CLIENT]
    try:
        custom_objects_api.create_namespaced_custom_object(
            group="cert-manager.io",
            plural="certificates",
            version="v1",
            body=certificate_body,
            namespace=namespace,
        )
    except ApiException as e:
        # The body may be missing, not JSON, or carry no reason. Reading it
        # must never hide the original API error.
        try:
            info = json.loads(e.body)
        except (TypeError, ValueError):
            info = None
        reason = info.get("reason") if isinstance(info, dict) else None
        if isinstance(reason, str) and reason.lower() == "alreadyexists":
            self.logger.warning("Certificate already exists: {}".format(e))
        else:
            raise

554 

async def delete_certificate(self, namespace, object_name):
    """
    Deletes cert-manager certificate object

    :param: namespace: Name of the namespace where the certificate is stored
    :param: object_name: Name of the certificate object

    :return: None

    :raises: ApiException: If the deletion fails for any reason other than
                           "NotFound", which is only logged as a warning
    """
    custom_objects_api = self.clients[CUSTOM_OBJECT_CLIENT]
    try:
        custom_objects_api.delete_namespaced_custom_object(
            group="cert-manager.io",
            plural="certificates",
            version="v1",
            name=object_name,
            namespace=namespace,
        )
    except ApiException as e:
        # The body may be missing, not JSON, or carry no reason. Reading it
        # must never hide the original API error.
        try:
            info = json.loads(e.body)
        except (TypeError, ValueError):
            info = None
        reason = info.get("reason") if isinstance(info, dict) else None
        if isinstance(reason, str) and reason.lower() == "notfound":
            self.logger.warning("Certificate already deleted: {}".format(e))
        else:
            raise

571 

@retry(
    attempts=10,
    delay=1,
    fallback=Exception("Failed creating the namespace"),
)
async def create_namespace(self, name: str, labels: dict = None):
    """
    Create a namespace

    :param: name: Name of the namespace to be created
    :param: labels: Dictionary with labels for the new namespace

    :return: None

    :raises: ApiException: If the creation fails for any reason other than
                           "AlreadyExists", which is only logged as a warning
    """
    core_api = self.clients[CORE_CLIENT]
    namespace = V1Namespace(metadata=V1ObjectMeta(name=name, labels=labels))

    try:
        core_api.create_namespace(namespace)
        self.logger.debug("Namespace created: {}".format(name))
    except ApiException as e:
        # The body may be missing, not JSON, or carry no reason. Reading it
        # must never hide the original API error.
        try:
            info = json.loads(e.body)
        except (TypeError, ValueError):
            info = None
        reason = info.get("reason") if isinstance(info, dict) else None
        if isinstance(reason, str) and reason.lower() == "alreadyexists":
            self.logger.warning("Namespace already exists: {}".format(e))
        else:
            raise

600 

601 @retry( 

602 attempts=10, 

603 delay=1, 

604 fallback=Exception("Failed deleting the namespace"), 

605 ) 

606 async def delete_namespace(self, name: str): 

607 """ 

608 Delete a namespace 

609 

610 :param: name: Name of the namespace to be deleted 

611 

612 """ 

613 try: 

614 self.clients[CORE_CLIENT].delete_namespace(name) 

615 except ApiException as e: 

616 if e.reason == "Not Found": 

617 self.logger.warning("Namespace already deleted: {}".format(e))