Coverage for n2vc/k8s_helm_base_conn.py: 58%

772 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-29 09:03 +0000

1## 

2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U. 

3# This file is part of OSM 

4# All Rights Reserved. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

15# implied. 

16# See the License for the specific language governing permissions and 

17# limitations under the License. 

18# 

19# For those usages not covered by the Apache License, Version 2.0 please 

20# contact with: nfvlabs@tid.es 

21## 

22import abc 

23import asyncio 

24from typing import Union 

25from shlex import quote 

26import random 

27import time 

28import shlex 

29import shutil 

30import stat 

31import os 

32import yaml 

33from uuid import uuid4 

34from urllib.parse import urlparse 

35 

36from n2vc.config import EnvironConfig 

37from n2vc.exceptions import K8sException 

38from n2vc.k8s_conn import K8sConnector 

39from n2vc.kubectl import Kubectl 

40 

41 

42class K8sHelmBaseConnector(K8sConnector): 

43 

44 """ 

45 #################################################################################### 

46 ################################### P U B L I C #################################### 

47 #################################################################################### 

48 """ 

49 

50 service_account = "osm" 

51 

52 def __init__( 

53 self, 

54 fs: object, 

55 db: object, 

56 kubectl_command: str = "/usr/bin/kubectl", 

57 helm_command: str = "/usr/bin/helm", 

58 log: object = None, 

59 on_update_db=None, 

60 ): 

61 """ 

62 

63 :param fs: file system for kubernetes and helm configuration 

64 :param db: database object to write current operation status 

65 :param kubectl_command: path to kubectl executable 

66 :param helm_command: path to helm executable 

67 :param log: logger 

68 :param on_update_db: callback called when k8s connector updates database 

69 """ 

70 

71 # parent class 

72 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db) 

73 

74 self.log.info("Initializing K8S Helm connector") 

75 

76 self.config = EnvironConfig() 

77 # random numbers for release name generation 

78 random.seed(time.time()) 

79 

80 # the file system 

81 self.fs = fs 

82 

83 # exception if kubectl is not installed 

84 self.kubectl_command = kubectl_command 

85 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True) 

86 

87 # exception if helm is not installed 

88 self._helm_command = helm_command 

89 self._check_file_exists(filename=helm_command, exception_if_not_exists=True) 

90 

91 # obtain stable repo url from config or apply default 

92 self._stable_repo_url = self.config.get("stablerepourl") 

93 if self._stable_repo_url == "None": 

94 self._stable_repo_url = None 

95 

96 # Lock to avoid concurrent execution of helm commands 

97 self.cmd_lock = asyncio.Lock() 

98 

99 def _get_namespace(self, cluster_uuid: str) -> str: 

100 """ 

101 Obtains the namespace used by the cluster with the uuid passed by argument 

102 

103 param: cluster_uuid: cluster's uuid 

104 """ 

105 

106 # first, obtain the cluster corresponding to the uuid passed by argument 

107 k8scluster = self.db.get_one( 

108 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False 

109 ) 

110 return k8scluster.get("namespace") 

111 

112 async def init_env( 

113 self, 

114 k8s_creds: str, 

115 namespace: str = "kube-system", 

116 reuse_cluster_uuid=None, 

117 **kwargs, 

118 ) -> tuple[str, bool]: 

119 """ 

120 It prepares a given K8s cluster environment to run Charts 

121 

122 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid 

123 '.kube/config' 

124 :param namespace: optional namespace to be used for helm. By default, 

125 'kube-system' will be used 

126 :param reuse_cluster_uuid: existing cluster uuid for reuse 

127 :param kwargs: Additional parameters (None yet) 

128 :return: uuid of the K8s cluster and True if connector has installed some 

129 software in the cluster 

130 (on error, an exception will be raised) 

131 """ 

132 

133 if reuse_cluster_uuid: 

134 cluster_id = reuse_cluster_uuid 

135 else: 

136 cluster_id = str(uuid4()) 

137 

138 self.log.debug( 

139 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace) 

140 ) 

141 

142 paths, env = self._init_paths_env( 

143 cluster_name=cluster_id, create_if_not_exist=True 

144 ) 

145 mode = stat.S_IRUSR | stat.S_IWUSR 

146 with open(paths["kube_config"], "w", mode) as f: 

147 f.write(k8s_creds) 

148 os.chmod(paths["kube_config"], 0o600) 

149 

150 # Code with initialization specific of helm version 

151 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env) 

152 

153 # sync fs with local data 

154 self.fs.reverse_sync(from_path=cluster_id) 

155 

156 self.log.info("Cluster {} initialized".format(cluster_id)) 

157 

158 return cluster_id, n2vc_installed_sw 

159 

160 async def repo_add( 

161 self, 

162 cluster_uuid: str, 

163 name: str, 

164 url: str, 

165 repo_type: str = "chart", 

166 cert: str = None, 

167 user: str = None, 

168 password: str = None, 

169 oci: bool = False, 

170 ): 

171 self.log.debug( 

172 "Cluster {}, adding {} repository {}. URL: {}".format( 

173 cluster_uuid, repo_type, name, url 

174 ) 

175 ) 

176 

177 # init_env 

178 paths, env = self._init_paths_env( 

179 cluster_name=cluster_uuid, create_if_not_exist=True 

180 ) 

181 

182 # sync local dir 

183 self.fs.sync(from_path=cluster_uuid) 

184 

185 if oci: 

186 if user and password: 

187 host_port = urlparse(url).netloc if url.startswith("oci://") else url 

188 # helm registry login url 

189 command = "env KUBECONFIG={} {} registry login {}".format( 

190 paths["kube_config"], self._helm_command, quote(host_port) 

191 ) 

192 else: 

193 self.log.debug( 

194 "OCI registry login is not needed for repo: {}".format(name) 

195 ) 

196 return 

197 else: 

198 # helm repo add name url 

199 command = "env KUBECONFIG={} {} repo add {} {}".format( 

200 paths["kube_config"], self._helm_command, quote(name), quote(url) 

201 ) 

202 

203 if cert: 

204 temp_cert_file = os.path.join( 

205 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt" 

206 ) 

207 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True) 

208 with open(temp_cert_file, "w") as the_cert: 

209 the_cert.write(cert) 

210 command += " --ca-file {}".format(quote(temp_cert_file)) 

211 

212 if user: 

213 command += " --username={}".format(quote(user)) 

214 

215 if password: 

216 command += " --password={}".format(quote(password)) 

217 

218 self.log.debug("adding repo: {}".format(command)) 

219 await self._local_async_exec( 

220 command=command, raise_exception_on_error=True, env=env 

221 ) 

222 

223 if not oci: 

224 # helm repo update 

225 command = "env KUBECONFIG={} {} repo update {}".format( 

226 paths["kube_config"], self._helm_command, quote(name) 

227 ) 

228 self.log.debug("updating repo: {}".format(command)) 

229 await self._local_async_exec( 

230 command=command, raise_exception_on_error=False, env=env 

231 ) 

232 

233 # sync fs 

234 self.fs.reverse_sync(from_path=cluster_uuid) 

235 

236 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"): 

237 self.log.debug( 

238 "Cluster {}, updating {} repository {}".format( 

239 cluster_uuid, repo_type, name 

240 ) 

241 ) 

242 

243 # init_env 

244 paths, env = self._init_paths_env( 

245 cluster_name=cluster_uuid, create_if_not_exist=True 

246 ) 

247 

248 # sync local dir 

249 self.fs.sync(from_path=cluster_uuid) 

250 

251 # helm repo update 

252 command = "{} repo update {}".format(self._helm_command, quote(name)) 

253 self.log.debug("updating repo: {}".format(command)) 

254 await self._local_async_exec( 

255 command=command, raise_exception_on_error=False, env=env 

256 ) 

257 

258 # sync fs 

259 self.fs.reverse_sync(from_path=cluster_uuid) 

260 

261 async def repo_list(self, cluster_uuid: str) -> list: 

262 """ 

263 Get the list of registered repositories 

264 

265 :return: list of registered repositories: [ (name, url) .... ] 

266 """ 

267 

268 self.log.debug("list repositories for cluster {}".format(cluster_uuid)) 

269 

270 # config filename 

271 paths, env = self._init_paths_env( 

272 cluster_name=cluster_uuid, create_if_not_exist=True 

273 ) 

274 

275 # sync local dir 

276 self.fs.sync(from_path=cluster_uuid) 

277 

278 command = "env KUBECONFIG={} {} repo list --output yaml".format( 

279 paths["kube_config"], self._helm_command 

280 ) 

281 

282 # Set exception to false because if there are no repos just want an empty list 

283 output, _rc = await self._local_async_exec( 

284 command=command, raise_exception_on_error=False, env=env 

285 ) 

286 

287 # sync fs 

288 self.fs.reverse_sync(from_path=cluster_uuid) 

289 

290 if _rc == 0: 

291 if output and len(output) > 0: 

292 repos = yaml.load(output, Loader=yaml.SafeLoader) 

293 # unify format between helm2 and helm3 setting all keys lowercase 

294 return self._lower_keys_list(repos) 

295 else: 

296 return [] 

297 else: 

298 return [] 

299 

300 async def repo_remove(self, cluster_uuid: str, name: str): 

301 self.log.debug( 

302 "remove {} repositories for cluster {}".format(name, cluster_uuid) 

303 ) 

304 

305 # init env, paths 

306 paths, env = self._init_paths_env( 

307 cluster_name=cluster_uuid, create_if_not_exist=True 

308 ) 

309 

310 # sync local dir 

311 self.fs.sync(from_path=cluster_uuid) 

312 

313 command = "env KUBECONFIG={} {} repo remove {}".format( 

314 paths["kube_config"], self._helm_command, quote(name) 

315 ) 

316 await self._local_async_exec( 

317 command=command, raise_exception_on_error=True, env=env 

318 ) 

319 

320 # sync fs 

321 self.fs.reverse_sync(from_path=cluster_uuid) 

322 

323 async def reset( 

324 self, 

325 cluster_uuid: str, 

326 force: bool = False, 

327 uninstall_sw: bool = False, 

328 **kwargs, 

329 ) -> bool: 

330 """Reset a cluster 

331 

332 Resets the Kubernetes cluster by removing the helm deployment that represents it. 

333 

334 :param cluster_uuid: The UUID of the cluster to reset 

335 :param force: Boolean to force the reset 

336 :param uninstall_sw: Boolean to force the reset 

337 :param kwargs: Additional parameters (None yet) 

338 :return: Returns True if successful or raises an exception. 

339 """ 

340 namespace = self._get_namespace(cluster_uuid=cluster_uuid) 

341 self.log.debug( 

342 "Resetting K8s environment. cluster uuid: {} uninstall={}".format( 

343 cluster_uuid, uninstall_sw 

344 ) 

345 ) 

346 

347 # sync local dir 

348 self.fs.sync(from_path=cluster_uuid) 

349 

350 # uninstall releases if needed. 

351 if uninstall_sw: 

352 releases = await self.instances_list(cluster_uuid=cluster_uuid) 

353 if len(releases) > 0: 

354 if force: 

355 for r in releases: 

356 try: 

357 kdu_instance = r.get("name") 

358 chart = r.get("chart") 

359 self.log.debug( 

360 "Uninstalling {} -> {}".format(chart, kdu_instance) 

361 ) 

362 await self.uninstall( 

363 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance 

364 ) 

365 except Exception as e: 

366 # will not raise exception as it was found 

367 # that in some cases of previously installed helm releases it 

368 # raised an error 

369 self.log.warn( 

370 "Error uninstalling release {}: {}".format( 

371 kdu_instance, e 

372 ) 

373 ) 

374 else: 

375 msg = ( 

376 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment" 

377 ).format(cluster_uuid) 

378 self.log.warn(msg) 

379 uninstall_sw = ( 

380 False # Allow to remove k8s cluster without removing Tiller 

381 ) 

382 

383 if uninstall_sw: 

384 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace) 

385 

386 # delete cluster directory 

387 self.log.debug("Removing directory {}".format(cluster_uuid)) 

388 self.fs.file_delete(cluster_uuid, ignore_non_exist=True) 

389 # Remove also local directorio if still exist 

390 direct = self.fs.path + "/" + cluster_uuid 

391 shutil.rmtree(direct, ignore_errors=True) 

392 

393 return True 

394 

395 def _is_helm_chart_a_file(self, chart_name: str): 

396 return chart_name.count("/") > 1 

397 

398 @staticmethod 

399 def _is_helm_chart_a_url(chart_name: str): 

400 result = urlparse(chart_name) 

401 return all([result.scheme, result.netloc]) 

402 

403 async def _install_impl( 

404 self, 

405 cluster_id: str, 

406 kdu_model: str, 

407 paths: dict, 

408 env: dict, 

409 kdu_instance: str, 

410 atomic: bool = True, 

411 timeout: float = 300, 

412 params: dict = None, 

413 db_dict: dict = None, 

414 kdu_name: str = None, 

415 namespace: str = None, 

416 ): 

417 # init env, paths 

418 paths, env = self._init_paths_env( 

419 cluster_name=cluster_id, create_if_not_exist=True 

420 ) 

421 

422 # params to str 

423 params_str, file_to_delete = self._params_to_file_option( 

424 cluster_id=cluster_id, params=params 

425 ) 

426 

427 kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_id) 

428 

429 command = self._get_install_command( 

430 kdu_model, 

431 kdu_instance, 

432 namespace, 

433 params_str, 

434 version, 

435 atomic, 

436 timeout, 

437 paths["kube_config"], 

438 ) 

439 

440 self.log.debug("installing: {}".format(command)) 

441 

442 if atomic: 

443 # exec helm in a task 

444 exec_task = asyncio.ensure_future( 

445 coro_or_future=self._local_async_exec( 

446 command=command, raise_exception_on_error=False, env=env 

447 ) 

448 ) 

449 

450 # write status in another task 

451 status_task = asyncio.ensure_future( 

452 coro_or_future=self._store_status( 

453 cluster_id=cluster_id, 

454 kdu_instance=kdu_instance, 

455 namespace=namespace, 

456 db_dict=db_dict, 

457 operation="install", 

458 ) 

459 ) 

460 

461 # wait for execution task 

462 await asyncio.wait([exec_task]) 

463 

464 # cancel status task 

465 status_task.cancel() 

466 

467 output, rc = exec_task.result() 

468 

469 else: 

470 output, rc = await self._local_async_exec( 

471 command=command, raise_exception_on_error=False, env=env 

472 ) 

473 

474 # remove temporal values yaml file 

475 if file_to_delete: 

476 os.remove(file_to_delete) 

477 

478 # write final status 

479 await self._store_status( 

480 cluster_id=cluster_id, 

481 kdu_instance=kdu_instance, 

482 namespace=namespace, 

483 db_dict=db_dict, 

484 operation="install", 

485 ) 

486 

487 if rc != 0: 

488 msg = "Error executing command: {}\nOutput: {}".format(command, output) 

489 self.log.error(msg) 

490 raise K8sException(msg) 

491 

492 async def upgrade( 

493 self, 

494 cluster_uuid: str, 

495 kdu_instance: str, 

496 kdu_model: str = None, 

497 atomic: bool = True, 

498 timeout: float = 300, 

499 params: dict = None, 

500 db_dict: dict = None, 

501 namespace: str = None, 

502 reset_values: bool = False, 

503 reuse_values: bool = True, 

504 reset_then_reuse_values: bool = False, 

505 force: bool = False, 

506 ): 

507 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid)) 

508 

509 # sync local dir 

510 self.fs.sync(from_path=cluster_uuid) 

511 

512 # look for instance to obtain namespace 

513 

514 # set namespace 

515 if not namespace: 

516 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) 

517 if not instance_info: 

518 raise K8sException("kdu_instance {} not found".format(kdu_instance)) 

519 namespace = instance_info["namespace"] 

520 

521 # init env, paths 

522 paths, env = self._init_paths_env( 

523 cluster_name=cluster_uuid, create_if_not_exist=True 

524 ) 

525 

526 # sync local dir 

527 self.fs.sync(from_path=cluster_uuid) 

528 

529 # params to str 

530 params_str, file_to_delete = self._params_to_file_option( 

531 cluster_id=cluster_uuid, params=params 

532 ) 

533 

534 kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid) 

535 

536 command = self._get_upgrade_command( 

537 kdu_model, 

538 kdu_instance, 

539 namespace, 

540 params_str, 

541 version, 

542 atomic, 

543 timeout, 

544 paths["kube_config"], 

545 reset_values, 

546 reuse_values, 

547 reset_then_reuse_values, 

548 force, 

549 ) 

550 

551 self.log.debug("upgrading: {}".format(command)) 

552 

553 if atomic: 

554 # exec helm in a task 

555 exec_task = asyncio.ensure_future( 

556 coro_or_future=self._local_async_exec( 

557 command=command, raise_exception_on_error=False, env=env 

558 ) 

559 ) 

560 # write status in another task 

561 status_task = asyncio.ensure_future( 

562 coro_or_future=self._store_status( 

563 cluster_id=cluster_uuid, 

564 kdu_instance=kdu_instance, 

565 namespace=namespace, 

566 db_dict=db_dict, 

567 operation="upgrade", 

568 ) 

569 ) 

570 

571 # wait for execution task 

572 await asyncio.wait([exec_task]) 

573 

574 # cancel status task 

575 status_task.cancel() 

576 output, rc = exec_task.result() 

577 

578 else: 

579 output, rc = await self._local_async_exec( 

580 command=command, raise_exception_on_error=False, env=env 

581 ) 

582 

583 # remove temporal values yaml file 

584 if file_to_delete: 

585 os.remove(file_to_delete) 

586 

587 # write final status 

588 await self._store_status( 

589 cluster_id=cluster_uuid, 

590 kdu_instance=kdu_instance, 

591 namespace=namespace, 

592 db_dict=db_dict, 

593 operation="upgrade", 

594 ) 

595 

596 if rc != 0: 

597 msg = "Error executing command: {}\nOutput: {}".format(command, output) 

598 self.log.error(msg) 

599 raise K8sException(msg) 

600 

601 # sync fs 

602 self.fs.reverse_sync(from_path=cluster_uuid) 

603 

604 # return new revision number 

605 instance = await self.get_instance_info( 

606 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance 

607 ) 

608 if instance: 

609 revision = int(instance.get("revision")) 

610 self.log.debug("New revision: {}".format(revision)) 

611 return revision 

612 else: 

613 return 0 

614 

615 async def scale( 

616 self, 

617 kdu_instance: str, 

618 scale: int, 

619 resource_name: str, 

620 total_timeout: float = 1800, 

621 cluster_uuid: str = None, 

622 kdu_model: str = None, 

623 atomic: bool = True, 

624 db_dict: dict = None, 

625 **kwargs, 

626 ): 

627 """Scale a resource in a Helm Chart. 

628 

629 Args: 

630 kdu_instance: KDU instance name 

631 scale: Scale to which to set the resource 

632 resource_name: Resource name 

633 total_timeout: The time, in seconds, to wait 

634 cluster_uuid: The UUID of the cluster 

635 kdu_model: The chart reference 

636 atomic: if set, upgrade process rolls back changes made in case of failed upgrade. 

637 The --wait flag will be set automatically if --atomic is used 

638 db_dict: Dictionary for any additional data 

639 kwargs: Additional parameters 

640 

641 Returns: 

642 True if successful, False otherwise 

643 """ 

644 

645 debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid) 

646 if resource_name: 

647 debug_mgs = "scaling resource {} in model {} (cluster {})".format( 

648 resource_name, kdu_model, cluster_uuid 

649 ) 

650 

651 self.log.debug(debug_mgs) 

652 

653 # look for instance to obtain namespace 

654 # get_instance_info function calls the sync command 

655 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) 

656 if not instance_info: 

657 raise K8sException("kdu_instance {} not found".format(kdu_instance)) 

658 

659 # init env, paths 

660 paths, env = self._init_paths_env( 

661 cluster_name=cluster_uuid, create_if_not_exist=True 

662 ) 

663 

664 # version 

665 kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid) 

666 

667 repo_url = await self._find_repo(kdu_model, cluster_uuid) 

668 

669 _, replica_str = await self._get_replica_count_url( 

670 kdu_model, repo_url, resource_name 

671 ) 

672 

673 command = self._get_upgrade_scale_command( 

674 kdu_model, 

675 kdu_instance, 

676 instance_info["namespace"], 

677 scale, 

678 version, 

679 atomic, 

680 replica_str, 

681 total_timeout, 

682 resource_name, 

683 paths["kube_config"], 

684 ) 

685 

686 self.log.debug("scaling: {}".format(command)) 

687 

688 if atomic: 

689 # exec helm in a task 

690 exec_task = asyncio.ensure_future( 

691 coro_or_future=self._local_async_exec( 

692 command=command, raise_exception_on_error=False, env=env 

693 ) 

694 ) 

695 # write status in another task 

696 status_task = asyncio.ensure_future( 

697 coro_or_future=self._store_status( 

698 cluster_id=cluster_uuid, 

699 kdu_instance=kdu_instance, 

700 namespace=instance_info["namespace"], 

701 db_dict=db_dict, 

702 operation="scale", 

703 ) 

704 ) 

705 

706 # wait for execution task 

707 await asyncio.wait([exec_task]) 

708 

709 # cancel status task 

710 status_task.cancel() 

711 output, rc = exec_task.result() 

712 

713 else: 

714 output, rc = await self._local_async_exec( 

715 command=command, raise_exception_on_error=False, env=env 

716 ) 

717 

718 # write final status 

719 await self._store_status( 

720 cluster_id=cluster_uuid, 

721 kdu_instance=kdu_instance, 

722 namespace=instance_info["namespace"], 

723 db_dict=db_dict, 

724 operation="scale", 

725 ) 

726 

727 if rc != 0: 

728 msg = "Error executing command: {}\nOutput: {}".format(command, output) 

729 self.log.error(msg) 

730 raise K8sException(msg) 

731 

732 # sync fs 

733 self.fs.reverse_sync(from_path=cluster_uuid) 

734 

735 return True 

736 

737 async def get_scale_count( 

738 self, 

739 resource_name: str, 

740 kdu_instance: str, 

741 cluster_uuid: str, 

742 kdu_model: str, 

743 **kwargs, 

744 ) -> int: 

745 """Get a resource scale count. 

746 

747 Args: 

748 cluster_uuid: The UUID of the cluster 

749 resource_name: Resource name 

750 kdu_instance: KDU instance name 

751 kdu_model: The name or path of an Helm Chart 

752 kwargs: Additional parameters 

753 

754 Returns: 

755 Resource instance count 

756 """ 

757 

758 self.log.debug( 

759 "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid) 

760 ) 

761 

762 # look for instance to obtain namespace 

763 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) 

764 if not instance_info: 

765 raise K8sException("kdu_instance {} not found".format(kdu_instance)) 

766 

767 # init env, paths 

768 paths, _ = self._init_paths_env( 

769 cluster_name=cluster_uuid, create_if_not_exist=True 

770 ) 

771 

772 replicas = await self._get_replica_count_instance( 

773 kdu_instance=kdu_instance, 

774 namespace=instance_info["namespace"], 

775 kubeconfig=paths["kube_config"], 

776 resource_name=resource_name, 

777 ) 

778 

779 self.log.debug( 

780 f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}" 

781 ) 

782 

783 # Get default value if scale count is not found from provided values 

784 # Important note: this piece of code shall only be executed in the first scaling operation, 

785 # since it is expected that the _get_replica_count_instance is able to obtain the number of 

786 # replicas when a scale operation was already conducted previously for this KDU/resource! 

787 if replicas is None: 

788 repo_url = await self._find_repo( 

789 kdu_model=kdu_model, cluster_uuid=cluster_uuid 

790 ) 

791 replicas, _ = await self._get_replica_count_url( 

792 kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name 

793 ) 

794 

795 self.log.debug( 

796 f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource " 

797 f"{resource_name} obtained: {replicas}" 

798 ) 

799 

800 if replicas is None: 

801 msg = "Replica count not found. Cannot be scaled" 

802 self.log.error(msg) 

803 raise K8sException(msg) 

804 

805 return int(replicas) 

806 

807 async def rollback( 

808 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None 

809 ): 

810 self.log.debug( 

811 "rollback kdu_instance {} to revision {} from cluster {}".format( 

812 kdu_instance, revision, cluster_uuid 

813 ) 

814 ) 

815 

816 # sync local dir 

817 self.fs.sync(from_path=cluster_uuid) 

818 

819 # look for instance to obtain namespace 

820 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) 

821 if not instance_info: 

822 raise K8sException("kdu_instance {} not found".format(kdu_instance)) 

823 

824 # init env, paths 

825 paths, env = self._init_paths_env( 

826 cluster_name=cluster_uuid, create_if_not_exist=True 

827 ) 

828 

829 # sync local dir 

830 self.fs.sync(from_path=cluster_uuid) 

831 

832 command = self._get_rollback_command( 

833 kdu_instance, instance_info["namespace"], revision, paths["kube_config"] 

834 ) 

835 

836 self.log.debug("rolling_back: {}".format(command)) 

837 

838 # exec helm in a task 

839 exec_task = asyncio.ensure_future( 

840 coro_or_future=self._local_async_exec( 

841 command=command, raise_exception_on_error=False, env=env 

842 ) 

843 ) 

844 # write status in another task 

845 status_task = asyncio.ensure_future( 

846 coro_or_future=self._store_status( 

847 cluster_id=cluster_uuid, 

848 kdu_instance=kdu_instance, 

849 namespace=instance_info["namespace"], 

850 db_dict=db_dict, 

851 operation="rollback", 

852 ) 

853 ) 

854 

855 # wait for execution task 

856 await asyncio.wait([exec_task]) 

857 

858 # cancel status task 

859 status_task.cancel() 

860 

861 output, rc = exec_task.result() 

862 

863 # write final status 

864 await self._store_status( 

865 cluster_id=cluster_uuid, 

866 kdu_instance=kdu_instance, 

867 namespace=instance_info["namespace"], 

868 db_dict=db_dict, 

869 operation="rollback", 

870 ) 

871 

872 if rc != 0: 

873 msg = "Error executing command: {}\nOutput: {}".format(command, output) 

874 self.log.error(msg) 

875 raise K8sException(msg) 

876 

877 # sync fs 

878 self.fs.reverse_sync(from_path=cluster_uuid) 

879 

880 # return new revision number 

881 instance = await self.get_instance_info( 

882 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance 

883 ) 

884 if instance: 

885 revision = int(instance.get("revision")) 

886 self.log.debug("New revision: {}".format(revision)) 

887 return revision 

888 else: 

889 return 0 

890 

891 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs): 

892 """ 

893 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call 

894 (this call should happen after all _terminate-config-primitive_ of the VNF 

895 are invoked). 

896 

897 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id 

898 :param kdu_instance: unique name for the KDU instance to be deleted 

899 :param kwargs: Additional parameters (None yet) 

900 :return: True if successful 

901 """ 

902 

903 self.log.debug( 

904 "uninstall kdu_instance {} from cluster {}".format( 

905 kdu_instance, cluster_uuid 

906 ) 

907 ) 

908 

909 # sync local dir 

910 self.fs.sync(from_path=cluster_uuid) 

911 

912 # look for instance to obtain namespace 

913 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) 

914 if not instance_info: 

915 self.log.warning(("kdu_instance {} not found".format(kdu_instance))) 

916 return True 

917 # init env, paths 

918 paths, env = self._init_paths_env( 

919 cluster_name=cluster_uuid, create_if_not_exist=True 

920 ) 

921 

922 # sync local dir 

923 self.fs.sync(from_path=cluster_uuid) 

924 

925 command = self._get_uninstall_command( 

926 kdu_instance, instance_info["namespace"], paths["kube_config"] 

927 ) 

928 output, _rc = await self._local_async_exec( 

929 command=command, raise_exception_on_error=True, env=env 

930 ) 

931 

932 # sync fs 

933 self.fs.reverse_sync(from_path=cluster_uuid) 

934 

935 return self._output_to_table(output) 

936 

937 async def instances_list(self, cluster_uuid: str) -> list: 

938 """ 

939 returns a list of deployed releases in a cluster 

940 

941 :param cluster_uuid: the 'cluster' or 'namespace:cluster' 

942 :return: 

943 """ 

944 

945 self.log.debug("list releases for cluster {}".format(cluster_uuid)) 

946 

947 # sync local dir 

948 self.fs.sync(from_path=cluster_uuid) 

949 

950 # execute internal command 

951 result = await self._instances_list(cluster_uuid) 

952 

953 # sync fs 

954 self.fs.reverse_sync(from_path=cluster_uuid) 

955 

956 return result 

957 

958 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str): 

959 instances = await self.instances_list(cluster_uuid=cluster_uuid) 

960 for instance in instances: 

961 if instance.get("name") == kdu_instance: 

962 return instance 

963 self.log.debug("Instance {} not found".format(kdu_instance)) 

964 return None 

965 

966 async def upgrade_charm( 

967 self, 

968 ee_id: str = None, 

969 path: str = None, 

970 charm_id: str = None, 

971 charm_type: str = None, 

972 timeout: float = None, 

973 ) -> str: 

974 """This method upgrade charms in VNFs 

975 

976 Args: 

977 ee_id: Execution environment id 

978 path: Local path to the charm 

979 charm_id: charm-id 

980 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm 

981 timeout: (Float) Timeout for the ns update operation 

982 

983 Returns: 

984 The output of the update operation if status equals to "completed" 

985 """ 

986 raise K8sException("KDUs deployed with Helm do not support charm upgrade") 

987 

988 async def exec_primitive( 

989 self, 

990 cluster_uuid: str = None, 

991 kdu_instance: str = None, 

992 primitive_name: str = None, 

993 timeout: float = 300, 

994 params: dict = None, 

995 db_dict: dict = None, 

996 **kwargs, 

997 ) -> str: 

998 """Exec primitive (Juju action) 

999 

1000 :param cluster_uuid: The UUID of the cluster or namespace:cluster 

1001 :param kdu_instance: The unique name of the KDU instance 

1002 :param primitive_name: Name of action that will be executed 

1003 :param timeout: Timeout for action execution 

1004 :param params: Dictionary of all the parameters needed for the action 

1005 :db_dict: Dictionary for any additional data 

1006 :param kwargs: Additional parameters (None yet) 

1007 

1008 :return: Returns the output of the action 

1009 """ 

1010 raise K8sException( 

1011 "KDUs deployed with Helm don't support actions " 

1012 "different from rollback, upgrade and status" 

1013 ) 

1014 

1015 async def get_services( 

1016 self, cluster_uuid: str, kdu_instance: str, namespace: str 

1017 ) -> list: 

1018 """ 

1019 Returns a list of services defined for the specified kdu instance. 

1020 

1021 :param cluster_uuid: UUID of a K8s cluster known by OSM 

1022 :param kdu_instance: unique name for the KDU instance 

1023 :param namespace: K8s namespace used by the KDU instance 

1024 :return: If successful, it will return a list of services, Each service 

1025 can have the following data: 

1026 - `name` of the service 

1027 - `type` type of service in the k8 cluster 

1028 - `ports` List of ports offered by the service, for each port includes at least 

1029 name, port, protocol 

1030 - `cluster_ip` Internal ip to be used inside k8s cluster 

1031 - `external_ip` List of external ips (in case they are available) 

1032 """ 

1033 

1034 self.log.debug( 

1035 "get_services: cluster_uuid: {}, kdu_instance: {}".format( 

1036 cluster_uuid, kdu_instance 

1037 ) 

1038 ) 

1039 

1040 # init env, paths 

1041 paths, env = self._init_paths_env( 

1042 cluster_name=cluster_uuid, create_if_not_exist=True 

1043 ) 

1044 

1045 # sync local dir 

1046 self.fs.sync(from_path=cluster_uuid) 

1047 

1048 # get list of services names for kdu 

1049 service_names = await self._get_services( 

1050 cluster_uuid, kdu_instance, namespace, paths["kube_config"] 

1051 ) 

1052 

1053 service_list = [] 

1054 for service in service_names: 

1055 service = await self._get_service(cluster_uuid, service, namespace) 

1056 service_list.append(service) 

1057 

1058 # sync fs 

1059 self.fs.reverse_sync(from_path=cluster_uuid) 

1060 

1061 return service_list 

1062 

1063 async def get_service( 

1064 self, cluster_uuid: str, service_name: str, namespace: str 

1065 ) -> object: 

1066 self.log.debug( 

1067 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format( 

1068 service_name, namespace, cluster_uuid 

1069 ) 

1070 ) 

1071 

1072 # sync local dir 

1073 self.fs.sync(from_path=cluster_uuid) 

1074 

1075 service = await self._get_service(cluster_uuid, service_name, namespace) 

1076 

1077 # sync fs 

1078 self.fs.reverse_sync(from_path=cluster_uuid) 

1079 

1080 return service 

1081 

1082 async def status_kdu( 

1083 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs 

1084 ) -> Union[str, dict]: 

1085 """ 

1086 This call would retrieve tha current state of a given KDU instance. It would be 

1087 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific 

1088 values_ of the configuration parameters applied to a given instance. This call 

1089 would be based on the `status` call. 

1090 

1091 :param cluster_uuid: UUID of a K8s cluster known by OSM 

1092 :param kdu_instance: unique name for the KDU instance 

1093 :param kwargs: Additional parameters (None yet) 

1094 :param yaml_format: if the return shall be returned as an YAML string or as a 

1095 dictionary 

1096 :return: If successful, it will return the following vector of arguments: 

1097 - K8s `namespace` in the cluster where the KDU lives 

1098 - `state` of the KDU instance. It can be: 

1099 - UNKNOWN 

1100 - DEPLOYED 

1101 - DELETED 

1102 - SUPERSEDED 

1103 - FAILED or 

1104 - DELETING 

1105 - List of `resources` (objects) that this release consists of, sorted by kind, 

1106 and the status of those resources 

1107 - Last `deployment_time`. 

1108 

1109 """ 

1110 self.log.debug( 

1111 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format( 

1112 cluster_uuid, kdu_instance 

1113 ) 

1114 ) 

1115 

1116 # sync local dir 

1117 self.fs.sync(from_path=cluster_uuid) 

1118 

1119 # get instance: needed to obtain namespace 

1120 instances = await self._instances_list(cluster_id=cluster_uuid) 

1121 for instance in instances: 

1122 if instance.get("name") == kdu_instance: 

1123 break 

1124 else: 

1125 # instance does not exist 

1126 raise K8sException( 

1127 "Instance name: {} not found in cluster: {}".format( 

1128 kdu_instance, cluster_uuid 

1129 ) 

1130 ) 

1131 

1132 status = await self._status_kdu( 

1133 cluster_id=cluster_uuid, 

1134 kdu_instance=kdu_instance, 

1135 namespace=instance["namespace"], 

1136 yaml_format=yaml_format, 

1137 show_error_log=True, 

1138 ) 

1139 

1140 # sync fs 

1141 self.fs.reverse_sync(from_path=cluster_uuid) 

1142 

1143 return status 

1144 

1145 async def get_values_kdu( 

1146 self, kdu_instance: str, namespace: str, kubeconfig: str 

1147 ) -> str: 

1148 self.log.debug("get kdu_instance values {}".format(kdu_instance)) 

1149 

1150 return await self._exec_get_command( 

1151 get_command="values", 

1152 kdu_instance=kdu_instance, 

1153 namespace=namespace, 

1154 kubeconfig=kubeconfig, 

1155 ) 

1156 

1157 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str: 

1158 """Method to obtain the Helm Chart package's values 

1159 

1160 Args: 

1161 kdu_model: The name or path of an Helm Chart 

1162 repo_url: Helm Chart repository url 

1163 

1164 Returns: 

1165 str: the values of the Helm Chart package 

1166 """ 

1167 

1168 self.log.debug( 

1169 "inspect kdu_model values {} from (optional) repo: {}".format( 

1170 kdu_model, repo_url 

1171 ) 

1172 ) 

1173 

1174 return await self._exec_inspect_command( 

1175 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url 

1176 ) 

1177 

1178 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str: 

1179 self.log.debug( 

1180 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url) 

1181 ) 

1182 

1183 return await self._exec_inspect_command( 

1184 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url 

1185 ) 

1186 

1187 async def synchronize_repos(self, cluster_uuid: str): 

1188 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid)) 

1189 try: 

1190 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid) 

1191 db_repo_dict = self._get_db_repos_dict(db_repo_ids) 

1192 

1193 local_repo_list = await self.repo_list(cluster_uuid) 

1194 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list} 

1195 

1196 deleted_repo_list = [] 

1197 added_repo_dict = {} 

1198 

1199 # iterate over the list of repos in the database that should be 

1200 # added if not present 

1201 for repo_name, db_repo in db_repo_dict.items(): 

1202 try: 

1203 # check if it is already present 

1204 curr_repo_url = local_repo_dict.get(db_repo["name"]) 

1205 repo_id = db_repo.get("_id") 

1206 if curr_repo_url != db_repo["url"]: 

1207 if curr_repo_url: 

1208 self.log.debug( 

1209 "repo {} url changed, delete and and again".format( 

1210 db_repo["url"] 

1211 ) 

1212 ) 

1213 await self.repo_remove(cluster_uuid, db_repo["name"]) 

1214 deleted_repo_list.append(repo_id) 

1215 

1216 # add repo 

1217 self.log.debug("add repo {}".format(db_repo["name"])) 

1218 await self.repo_add( 

1219 cluster_uuid, 

1220 db_repo["name"], 

1221 db_repo["url"], 

1222 cert=db_repo.get("ca_cert"), 

1223 user=db_repo.get("user"), 

1224 password=db_repo.get("password"), 

1225 oci=db_repo.get("oci", False), 

1226 ) 

1227 added_repo_dict[repo_id] = db_repo["name"] 

1228 except Exception as e: 

1229 raise K8sException( 

1230 "Error adding repo id: {}, err_msg: {} ".format( 

1231 repo_id, repr(e) 

1232 ) 

1233 ) 

1234 

1235 # Delete repos that are present but not in nbi_list 

1236 for repo_name in local_repo_dict: 

1237 if not db_repo_dict.get(repo_name) and repo_name != "stable": 

1238 self.log.debug("delete repo {}".format(repo_name)) 

1239 try: 

1240 await self.repo_remove(cluster_uuid, repo_name) 

1241 deleted_repo_list.append(repo_name) 

1242 except Exception as e: 

1243 self.warning( 

1244 "Error deleting repo, name: {}, err_msg: {}".format( 

1245 repo_name, str(e) 

1246 ) 

1247 ) 

1248 

1249 return deleted_repo_list, added_repo_dict 

1250 

1251 except K8sException: 

1252 raise 

1253 except Exception as e: 

1254 # Do not raise errors synchronizing repos 

1255 self.log.error("Error synchronizing repos: {}".format(e)) 

1256 raise Exception("Error synchronizing repos: {}".format(e)) 

1257 

1258 def _get_db_repos_dict(self, repo_ids: list): 

1259 db_repos_dict = {} 

1260 for repo_id in repo_ids: 

1261 db_repo = self.db.get_one("k8srepos", {"_id": repo_id}) 

1262 db_repos_dict[db_repo["name"]] = db_repo 

1263 return db_repos_dict 

1264 

1265 """ 

1266 #################################################################################### 

1267 ################################### TO BE IMPLEMENTED SUBCLASSES ################### 

1268 #################################################################################### 

1269 """ 

1270 

1271 @abc.abstractmethod 

1272 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True): 

1273 """ 

1274 Creates and returns base cluster and kube dirs and returns them. 

1275 Also created helm3 dirs according to new directory specification, paths are 

1276 not returned but assigned to helm environment variables 

1277 

1278 :param cluster_name: cluster_name 

1279 :return: Dictionary with config_paths and dictionary with helm environment variables 

1280 """ 

1281 

1282 @abc.abstractmethod 

1283 async def _cluster_init(self, cluster_id, namespace, paths, env): 

1284 """ 

1285 Implements the helm version dependent cluster initialization 

1286 """ 

1287 

1288 @abc.abstractmethod 

1289 async def _instances_list(self, cluster_id): 

1290 """ 

1291 Implements the helm version dependent helm instances list 

1292 """ 

1293 

1294 @abc.abstractmethod 

1295 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig): 

1296 """ 

1297 Implements the helm version dependent method to obtain services from a helm instance 

1298 """ 

1299 

1300 @abc.abstractmethod 

1301 async def _status_kdu( 

1302 self, 

1303 cluster_id: str, 

1304 kdu_instance: str, 

1305 namespace: str = None, 

1306 yaml_format: bool = False, 

1307 show_error_log: bool = False, 

1308 ) -> Union[str, dict]: 

1309 """ 

1310 Implements the helm version dependent method to obtain status of a helm instance 

1311 """ 

1312 

1313 @abc.abstractmethod 

1314 def _get_install_command( 

1315 self, 

1316 kdu_model, 

1317 kdu_instance, 

1318 namespace, 

1319 params_str, 

1320 version, 

1321 atomic, 

1322 timeout, 

1323 kubeconfig, 

1324 ) -> str: 

1325 """ 

1326 Obtain command to be executed to delete the indicated instance 

1327 """ 

1328 

1329 @abc.abstractmethod 

1330 def _get_upgrade_scale_command( 

1331 self, 

1332 kdu_model, 

1333 kdu_instance, 

1334 namespace, 

1335 count, 

1336 version, 

1337 atomic, 

1338 replicas, 

1339 timeout, 

1340 resource_name, 

1341 kubeconfig, 

1342 ) -> str: 

1343 """Generates the command to scale a Helm Chart release 

1344 

1345 Args: 

1346 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository 

1347 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question 

1348 namespace (str): Namespace where this KDU instance is deployed 

1349 scale (int): Scale count 

1350 version (str): Constraint with specific version of the Chart to use 

1351 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade. 

1352 The --wait flag will be set automatically if --atomic is used 

1353 replica_str (str): The key under resource_name key where the scale count is stored 

1354 timeout (float): The time, in seconds, to wait 

1355 resource_name (str): The KDU's resource to scale 

1356 kubeconfig (str): Kubeconfig file path 

1357 

1358 Returns: 

1359 str: command to scale a Helm Chart release 

1360 """ 

1361 

1362 @abc.abstractmethod 

1363 def _get_upgrade_command( 

1364 self, 

1365 kdu_model, 

1366 kdu_instance, 

1367 namespace, 

1368 params_str, 

1369 version, 

1370 atomic, 

1371 timeout, 

1372 kubeconfig, 

1373 reset_values, 

1374 reuse_values, 

1375 reset_then_reuse_values, 

1376 force, 

1377 ) -> str: 

1378 """Generates the command to upgrade a Helm Chart release 

1379 

1380 Args: 

1381 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository 

1382 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question 

1383 namespace (str): Namespace where this KDU instance is deployed 

1384 params_str (str): Params used to upgrade the Helm Chart release 

1385 version (str): Constraint with specific version of the Chart to use 

1386 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade. 

1387 The --wait flag will be set automatically if --atomic is used 

1388 timeout (float): The time, in seconds, to wait 

1389 kubeconfig (str): Kubeconfig file path 

1390 reset_values(bool): If set, helm resets values instead of reusing previous values. 

1391 reuse_values(bool): If set, helm reuses previous values. 

1392 reset_then_reuse_values(bool): If set, helm resets values, then apply the last release's values 

1393 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods. 

1394 Returns: 

1395 str: command to upgrade a Helm Chart release 

1396 """ 

1397 

1398 @abc.abstractmethod 

1399 def _get_rollback_command( 

1400 self, kdu_instance, namespace, revision, kubeconfig 

1401 ) -> str: 

1402 """ 

1403 Obtain command to be executed to rollback the indicated instance 

1404 """ 

1405 

1406 @abc.abstractmethod 

1407 def _get_uninstall_command( 

1408 self, kdu_instance: str, namespace: str, kubeconfig: str 

1409 ) -> str: 

1410 """ 

1411 Obtain command to be executed to delete the indicated instance 

1412 """ 

1413 

1414 @abc.abstractmethod 

1415 def _get_inspect_command( 

1416 self, show_command: str, kdu_model: str, repo_str: str, version: str 

1417 ): 

1418 """Generates the command to obtain the information about an Helm Chart package 

1419 (´helm show ...´ command) 

1420 

1421 Args: 

1422 show_command: the second part of the command (`helm show <show_command>`) 

1423 kdu_model: The name or path of an Helm Chart 

1424 repo_url: Helm Chart repository url 

1425 version: constraint with specific version of the Chart to use 

1426 

1427 Returns: 

1428 str: the generated Helm Chart command 

1429 """ 

1430 

1431 @abc.abstractmethod 

1432 def _get_get_command( 

1433 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str 

1434 ): 

1435 """Obtain command to be executed to get information about the kdu instance.""" 

1436 

1437 @abc.abstractmethod 

1438 async def _uninstall_sw(self, cluster_id: str, namespace: str): 

1439 """ 

1440 Method call to uninstall cluster software for helm. This method is dependent 

1441 of helm version 

1442 For Helm v2 it will be called when Tiller must be uninstalled 

1443 For Helm v3 it does nothing and does not need to be callled 

1444 """ 

1445 

1446 @abc.abstractmethod 

1447 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list: 

1448 """ 

1449 Obtains the cluster repos identifiers 

1450 """ 

1451 

1452 """ 

1453 #################################################################################### 

1454 ################################### P R I V A T E ################################## 

1455 #################################################################################### 

1456 """ 

1457 

1458 @staticmethod 

1459 def _check_file_exists(filename: str, exception_if_not_exists: bool = False): 

1460 if os.path.exists(filename): 

1461 return True 

1462 else: 

1463 msg = "File {} does not exist".format(filename) 

1464 if exception_if_not_exists: 

1465 raise K8sException(msg) 

1466 

1467 @staticmethod 

1468 def _remove_multiple_spaces(strobj): 

1469 strobj = strobj.strip() 

1470 while " " in strobj: 

1471 strobj = strobj.replace(" ", " ") 

1472 return strobj 

1473 

1474 @staticmethod 

1475 def _output_to_lines(output: str) -> list: 

1476 output_lines = list() 

1477 lines = output.splitlines(keepends=False) 

1478 for line in lines: 

1479 line = line.strip() 

1480 if len(line) > 0: 

1481 output_lines.append(line) 

1482 return output_lines 

1483 

1484 @staticmethod 

1485 def _output_to_table(output: str) -> list: 

1486 output_table = list() 

1487 lines = output.splitlines(keepends=False) 

1488 for line in lines: 

1489 line = line.replace("\t", " ") 

1490 line_list = list() 

1491 output_table.append(line_list) 

1492 cells = line.split(sep=" ") 

1493 for cell in cells: 

1494 cell = cell.strip() 

1495 if len(cell) > 0: 

1496 line_list.append(cell) 

1497 return output_table 

1498 

1499 @staticmethod 

1500 def _parse_services(output: str) -> list: 

1501 lines = output.splitlines(keepends=False) 

1502 services = [] 

1503 for line in lines: 

1504 line = line.replace("\t", " ") 

1505 cells = line.split(sep=" ") 

1506 if len(cells) > 0 and cells[0].startswith("service/"): 

1507 elems = cells[0].split(sep="/") 

1508 if len(elems) > 1: 

1509 services.append(elems[1]) 

1510 return services 

1511 

1512 @staticmethod 

1513 def _get_deep(dictionary: dict, members: tuple): 

1514 target = dictionary 

1515 value = None 

1516 try: 

1517 for m in members: 

1518 value = target.get(m) 

1519 if not value: 

1520 return None 

1521 else: 

1522 target = value 

1523 except Exception: 

1524 pass 

1525 return value 

1526 

1527 # find key:value in several lines 

1528 @staticmethod 

1529 def _find_in_lines(p_lines: list, p_key: str) -> str: 

1530 for line in p_lines: 

1531 try: 

1532 if line.startswith(p_key + ":"): 

1533 parts = line.split(":") 

1534 the_value = parts[1].strip() 

1535 return the_value 

1536 except Exception: 

1537 # ignore it 

1538 pass 

1539 return None 

1540 

1541 @staticmethod 

1542 def _lower_keys_list(input_list: list): 

1543 """ 

1544 Transform the keys in a list of dictionaries to lower case and returns a new list 

1545 of dictionaries 

1546 """ 

1547 new_list = [] 

1548 if input_list: 

1549 for dictionary in input_list: 

1550 new_dict = dict((k.lower(), v) for k, v in dictionary.items()) 

1551 new_list.append(new_dict) 

1552 return new_list 

1553 

1554 async def _local_async_exec( 

1555 self, 

1556 command: str, 

1557 raise_exception_on_error: bool = False, 

1558 show_error_log: bool = True, 

1559 encode_utf8: bool = False, 

1560 env: dict = None, 

1561 ) -> tuple[str, int]: 

1562 command = K8sHelmBaseConnector._remove_multiple_spaces(command) 

1563 self.log.debug( 

1564 "Executing async local command: {}, env: {}".format(command, env) 

1565 ) 

1566 

1567 # split command 

1568 command = shlex.split(command) 

1569 

1570 environ = os.environ.copy() 

1571 if env: 

1572 environ.update(env) 

1573 

1574 try: 

1575 async with self.cmd_lock: 

1576 process = await asyncio.create_subprocess_exec( 

1577 *command, 

1578 stdout=asyncio.subprocess.PIPE, 

1579 stderr=asyncio.subprocess.PIPE, 

1580 env=environ, 

1581 ) 

1582 

1583 # wait for command terminate 

1584 stdout, stderr = await process.communicate() 

1585 

1586 return_code = process.returncode 

1587 

1588 output = "" 

1589 if stdout: 

1590 output = stdout.decode("utf-8").strip() 

1591 # output = stdout.decode() 

1592 if stderr: 

1593 output = stderr.decode("utf-8").strip() 

1594 # output = stderr.decode() 

1595 

1596 if return_code != 0 and show_error_log: 

1597 self.log.debug( 

1598 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output) 

1599 ) 

1600 else: 

1601 self.log.debug("Return code: {}".format(return_code)) 

1602 

1603 if raise_exception_on_error and return_code != 0: 

1604 raise K8sException(output) 

1605 

1606 if encode_utf8: 

1607 output = output.encode("utf-8").strip() 

1608 output = str(output).replace("\\n", "\n") 

1609 

1610 return output, return_code 

1611 

1612 except asyncio.CancelledError: 

1613 # first, kill the process if it is still running 

1614 if process.returncode is None: 

1615 process.kill() 

1616 raise 

1617 except K8sException: 

1618 raise 

1619 except Exception as e: 

1620 msg = "Exception executing command: {} -> {}".format(command, e) 

1621 self.log.error(msg) 

1622 if raise_exception_on_error: 

1623 raise K8sException(e) from e 

1624 else: 

1625 return "", -1 

1626 

1627 async def _local_async_exec_pipe( 

1628 self, 

1629 command1: str, 

1630 command2: str, 

1631 raise_exception_on_error: bool = True, 

1632 show_error_log: bool = True, 

1633 encode_utf8: bool = False, 

1634 env: dict = None, 

1635 ): 

1636 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1) 

1637 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2) 

1638 command = "{} | {}".format(command1, command2) 

1639 self.log.debug( 

1640 "Executing async local command: {}, env: {}".format(command, env) 

1641 ) 

1642 

1643 # split command 

1644 command1 = shlex.split(command1) 

1645 command2 = shlex.split(command2) 

1646 

1647 environ = os.environ.copy() 

1648 if env: 

1649 environ.update(env) 

1650 

1651 try: 

1652 async with self.cmd_lock: 

1653 read, write = os.pipe() 

1654 process_1 = await asyncio.create_subprocess_exec( 

1655 *command1, stdout=write, env=environ 

1656 ) 

1657 os.close(write) 

1658 process_2 = await asyncio.create_subprocess_exec( 

1659 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ 

1660 ) 

1661 os.close(read) 

1662 stdout, stderr = await process_2.communicate() 

1663 

1664 return_code = process_2.returncode 

1665 

1666 output = "" 

1667 if stdout: 

1668 output = stdout.decode("utf-8").strip() 

1669 # output = stdout.decode() 

1670 if stderr: 

1671 output = stderr.decode("utf-8").strip() 

1672 # output = stderr.decode() 

1673 

1674 if return_code != 0 and show_error_log: 

1675 self.log.debug( 

1676 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output) 

1677 ) 

1678 else: 

1679 self.log.debug("Return code: {}".format(return_code)) 

1680 

1681 if raise_exception_on_error and return_code != 0: 

1682 raise K8sException(output) 

1683 

1684 if encode_utf8: 

1685 output = output.encode("utf-8").strip() 

1686 output = str(output).replace("\\n", "\n") 

1687 

1688 return output, return_code 

1689 except asyncio.CancelledError: 

1690 # first, kill the processes if they are still running 

1691 for process in (process_1, process_2): 

1692 if process.returncode is None: 

1693 process.kill() 

1694 raise 

1695 except K8sException: 

1696 raise 

1697 except Exception as e: 

1698 msg = "Exception executing command: {} -> {}".format(command, e) 

1699 self.log.error(msg) 

1700 if raise_exception_on_error: 

1701 raise K8sException(e) from e 

1702 else: 

1703 return "", -1 

1704 

1705 async def _get_service(self, cluster_id, service_name, namespace): 

1706 """ 

1707 Obtains the data of the specified service in the k8cluster. 

1708 

1709 :param cluster_id: id of a K8s cluster known by OSM 

1710 :param service_name: name of the K8s service in the specified namespace 

1711 :param namespace: K8s namespace used by the KDU instance 

1712 :return: If successful, it will return a service with the following data: 

1713 - `name` of the service 

1714 - `type` type of service in the k8 cluster 

1715 - `ports` List of ports offered by the service, for each port includes at least 

1716 name, port, protocol 

1717 - `cluster_ip` Internal ip to be used inside k8s cluster 

1718 - `external_ip` List of external ips (in case they are available) 

1719 """ 

1720 

1721 # init config, env 

1722 paths, env = self._init_paths_env( 

1723 cluster_name=cluster_id, create_if_not_exist=True 

1724 ) 

1725 

1726 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format( 

1727 self.kubectl_command, 

1728 paths["kube_config"], 

1729 quote(namespace), 

1730 quote(service_name), 

1731 ) 

1732 

1733 output, _rc = await self._local_async_exec( 

1734 command=command, raise_exception_on_error=True, env=env 

1735 ) 

1736 

1737 data = yaml.load(output, Loader=yaml.SafeLoader) 

1738 

1739 service = { 

1740 "name": service_name, 

1741 "type": self._get_deep(data, ("spec", "type")), 

1742 "ports": self._get_deep(data, ("spec", "ports")), 

1743 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")), 

1744 } 

1745 if service["type"] == "LoadBalancer": 

1746 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress")) 

1747 ip_list = [elem["ip"] for elem in ip_map_list] 

1748 service["external_ip"] = ip_list 

1749 

1750 return service 

1751 

1752 async def _exec_get_command( 

1753 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str 

1754 ): 

1755 """Obtains information about the kdu instance.""" 

1756 

1757 full_command = self._get_get_command( 

1758 get_command, kdu_instance, namespace, kubeconfig 

1759 ) 

1760 

1761 output, _rc = await self._local_async_exec(command=full_command) 

1762 

1763 return output 

1764 

1765 async def _exec_inspect_command( 

1766 self, inspect_command: str, kdu_model: str, repo_url: str = None 

1767 ): 

1768 """Obtains information about an Helm Chart package (´helm show´ command) 

1769 

1770 Args: 

1771 inspect_command: the Helm sub command (`helm show <inspect_command> ...`) 

1772 kdu_model: The name or path of an Helm Chart 

1773 repo_url: Helm Chart repository url 

1774 

1775 Returns: 

1776 str: the requested info about the Helm Chart package 

1777 """ 

1778 

1779 repo_str = "" 

1780 if repo_url: 

1781 repo_str = " --repo {}".format(quote(repo_url)) 

1782 

1783 # Obtain the Chart's name and store it in the var kdu_model 

1784 kdu_model, _ = self._split_repo(kdu_model=kdu_model) 

1785 

1786 kdu_model, version = self._split_version(kdu_model) 

1787 if version: 

1788 version_str = "--version {}".format(quote(version)) 

1789 else: 

1790 version_str = "" 

1791 

1792 full_command = self._get_inspect_command( 

1793 show_command=inspect_command, 

1794 kdu_model=quote(kdu_model), 

1795 repo_str=repo_str, 

1796 version=version_str, 

1797 ) 

1798 

1799 output, _ = await self._local_async_exec(command=full_command) 

1800 

1801 return output 

1802 

1803 async def _get_replica_count_url( 

1804 self, 

1805 kdu_model: str, 

1806 repo_url: str = None, 

1807 resource_name: str = None, 

1808 ) -> tuple[int, str]: 

1809 """Get the replica count value in the Helm Chart Values. 

1810 

1811 Args: 

1812 kdu_model: The name or path of an Helm Chart 

1813 repo_url: Helm Chart repository url 

1814 resource_name: Resource name 

1815 

1816 Returns: 

1817 A tuple with: 

1818 - The number of replicas of the specific instance; if not found, returns None; and 

1819 - The string corresponding to the replica count key in the Helm values 

1820 """ 

1821 

1822 kdu_values = yaml.load( 

1823 await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url), 

1824 Loader=yaml.SafeLoader, 

1825 ) 

1826 

1827 self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}") 

1828 

1829 if not kdu_values: 

1830 raise K8sException( 

1831 "kdu_values not found for kdu_model {}".format(kdu_model) 

1832 ) 

1833 

1834 if resource_name: 

1835 kdu_values = kdu_values.get(resource_name, None) 

1836 

1837 if not kdu_values: 

1838 msg = "resource {} not found in the values in model {}".format( 

1839 resource_name, kdu_model 

1840 ) 

1841 self.log.error(msg) 

1842 raise K8sException(msg) 

1843 

1844 duplicate_check = False 

1845 

1846 replica_str = "" 

1847 replicas = None 

1848 

1849 if kdu_values.get("replicaCount") is not None: 

1850 replicas = kdu_values["replicaCount"] 

1851 replica_str = "replicaCount" 

1852 elif kdu_values.get("replicas") is not None: 

1853 duplicate_check = True 

1854 replicas = kdu_values["replicas"] 

1855 replica_str = "replicas" 

1856 else: 

1857 if resource_name: 

1858 msg = ( 

1859 "replicaCount or replicas not found in the resource" 

1860 "{} values in model {}. Cannot be scaled".format( 

1861 resource_name, kdu_model 

1862 ) 

1863 ) 

1864 else: 

1865 msg = ( 

1866 "replicaCount or replicas not found in the values" 

1867 "in model {}. Cannot be scaled".format(kdu_model) 

1868 ) 

1869 self.log.error(msg) 

1870 raise K8sException(msg) 

1871 

1872 # Control if replicas and replicaCount exists at the same time 

1873 msg = "replicaCount and replicas are exists at the same time" 

1874 if duplicate_check: 

1875 if "replicaCount" in kdu_values: 

1876 self.log.error(msg) 

1877 raise K8sException(msg) 

1878 else: 

1879 if "replicas" in kdu_values: 

1880 self.log.error(msg) 

1881 raise K8sException(msg) 

1882 

1883 return replicas, replica_str 

1884 

1885 async def _get_replica_count_instance( 

1886 self, 

1887 kdu_instance: str, 

1888 namespace: str, 

1889 kubeconfig: str, 

1890 resource_name: str = None, 

1891 ) -> int: 

1892 """Get the replica count value in the instance. 

1893 

1894 Args: 

1895 kdu_instance: The name of the KDU instance 

1896 namespace: KDU instance namespace 

1897 kubeconfig: 

1898 resource_name: Resource name 

1899 

1900 Returns: 

1901 The number of replicas of the specific instance; if not found, returns None 

1902 """ 

1903 

1904 kdu_values = yaml.load( 

1905 await self.get_values_kdu(kdu_instance, namespace, kubeconfig), 

1906 Loader=yaml.SafeLoader, 

1907 ) 

1908 

1909 self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}") 

1910 

1911 replicas = None 

1912 

1913 if kdu_values: 

1914 resource_values = ( 

1915 kdu_values.get(resource_name, None) if resource_name else None 

1916 ) 

1917 

1918 for replica_str in ("replicaCount", "replicas"): 

1919 if resource_values: 

1920 replicas = resource_values.get(replica_str) 

1921 else: 

1922 replicas = kdu_values.get(replica_str) 

1923 

1924 if replicas is not None: 

1925 break 

1926 

1927 return replicas 

1928 

1929 async def _store_status( 

1930 self, 

1931 cluster_id: str, 

1932 operation: str, 

1933 kdu_instance: str, 

1934 namespace: str = None, 

1935 db_dict: dict = None, 

1936 ) -> None: 

1937 """ 

1938 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database. 

1939 

1940 :param cluster_id (str): the cluster where the KDU instance is deployed 

1941 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade") 

1942 :param kdu_instance (str): The KDU instance in relation to which the status is obtained 

1943 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None 

1944 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the 

1945 values for the keys: 

1946 - "collection": The Mongo DB collection to write to 

1947 - "filter": The query filter to use in the update process 

1948 - "path": The dot separated keys which targets the object to be updated 

1949 Defaults to None. 

1950 """ 

1951 

1952 try: 

1953 detailed_status = await self._status_kdu( 

1954 cluster_id=cluster_id, 

1955 kdu_instance=kdu_instance, 

1956 yaml_format=False, 

1957 namespace=namespace, 

1958 ) 

1959 

1960 status = detailed_status.get("info").get("description") 

1961 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.") 

1962 

1963 # write status to db 

1964 result = await self.write_app_status_to_db( 

1965 db_dict=db_dict, 

1966 status=str(status), 

1967 detailed_status=str(detailed_status), 

1968 operation=operation, 

1969 ) 

1970 

1971 if not result: 

1972 self.log.info("Error writing in database. Task exiting...") 

1973 

1974 except asyncio.CancelledError as e: 

1975 self.log.warning( 

1976 f"Exception in method {self._store_status.__name__} (task cancelled): {e}" 

1977 ) 

1978 except Exception as e: 

1979 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}") 

1980 

1981 # params for use in -f file 

1982 # returns values file option and filename (in order to delete it at the end) 

1983 def _params_to_file_option(self, cluster_id: str, params: dict) -> tuple[str, str]: 

1984 if params and len(params) > 0: 

1985 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True) 

1986 

1987 def get_random_number(): 

1988 r = random.SystemRandom().randint(1, 99999999) 

1989 s = str(r) 

1990 while len(s) < 10: 

1991 s = "0" + s 

1992 return s 

1993 

1994 params2 = dict() 

1995 for key in params: 

1996 value = params.get(key) 

1997 if "!!yaml" in str(value): 

1998 value = yaml.safe_load(value[7:]) 

1999 params2[key] = value 

2000 

2001 values_file = get_random_number() + ".yaml" 

2002 with open(values_file, "w") as stream: 

2003 yaml.dump(params2, stream, indent=4, default_flow_style=False) 

2004 

2005 return "-f {}".format(values_file), values_file 

2006 

2007 return "", None 

2008 

2009 # params for use in --set option 

2010 @staticmethod 

2011 def _params_to_set_option(params: dict) -> str: 

2012 pairs = [ 

2013 f"{quote(str(key))}={quote(str(value))}" 

2014 for key, value in params.items() 

2015 if value is not None 

2016 ] 

2017 if not pairs: 

2018 return "" 

2019 return "--set " + ",".join(pairs) 

2020 

2021 @staticmethod 

2022 def generate_kdu_instance_name(**kwargs): 

2023 chart_name = kwargs["kdu_model"] 

2024 # check embeded chart (file or dir) 

2025 if chart_name.startswith("/"): 

2026 # extract file or directory name 

2027 chart_name = chart_name[chart_name.rfind("/") + 1 :] 

2028 # check URL 

2029 elif "://" in chart_name: 

2030 # extract last portion of URL 

2031 chart_name = chart_name[chart_name.rfind("/") + 1 :] 

2032 

2033 name = "" 

2034 for c in chart_name: 

2035 if c.isalpha() or c.isnumeric(): 

2036 name += c 

2037 else: 

2038 name += "-" 

2039 if len(name) > 35: 

2040 name = name[0:35] 

2041 

2042 # if does not start with alpha character, prefix 'a' 

2043 if not name[0].isalpha(): 

2044 name = "a" + name 

2045 

2046 name += "-" 

2047 

2048 def get_random_number(): 

2049 r = random.SystemRandom().randint(1, 99999999) 

2050 s = str(r) 

2051 s = s.rjust(10, "0") 

2052 return s 

2053 

2054 name = name + get_random_number() 

2055 return name.lower() 

2056 

2057 def _split_version(self, kdu_model: str) -> tuple[str, str]: 

2058 version = None 

2059 if ( 

2060 not ( 

2061 self._is_helm_chart_a_file(kdu_model) 

2062 or self._is_helm_chart_a_url(kdu_model) 

2063 ) 

2064 and ":" in kdu_model 

2065 ): 

2066 parts = kdu_model.split(sep=":") 

2067 if len(parts) == 2: 

2068 version = str(parts[1]) 

2069 kdu_model = parts[0] 

2070 return kdu_model, version 

2071 

2072 def _split_repo(self, kdu_model: str) -> tuple[str, str]: 

2073 """Obtain the Helm Chart's repository and Chart's names from the KDU model 

2074 

2075 Args: 

2076 kdu_model (str): Associated KDU model 

2077 

2078 Returns: 

2079 (str, str): Tuple with the Chart name in index 0, and the repo name 

2080 in index 2; if there was a problem finding them, return None 

2081 for both 

2082 """ 

2083 

2084 chart_name = None 

2085 repo_name = None 

2086 

2087 idx = kdu_model.find("/") 

2088 if not self._is_helm_chart_a_url(kdu_model) and idx >= 0: 

2089 chart_name = kdu_model[idx + 1 :] 

2090 repo_name = kdu_model[:idx] 

2091 

2092 return chart_name, repo_name 

2093 

2094 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str: 

2095 """Obtain the Helm repository for an Helm Chart 

2096 

2097 Args: 

2098 kdu_model (str): the KDU model associated with the Helm Chart instantiation 

2099 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation 

2100 

2101 Returns: 

2102 str: the repository URL; if Helm Chart is a local one, the function returns None 

2103 """ 

2104 

2105 _, repo_name = self._split_repo(kdu_model=kdu_model) 

2106 

2107 repo_url = None 

2108 if repo_name: 

2109 # Find repository link 

2110 local_repo_list = await self.repo_list(cluster_uuid) 

2111 for repo in local_repo_list: 

2112 if repo["name"] == repo_name: 

2113 repo_url = repo["url"] 

2114 break # it is not necessary to continue the loop if the repo link was found... 

2115 

2116 return repo_url 

2117 

2118 def _repo_to_oci_url(self, repo): 

2119 db_repo = self.db.get_one("k8srepos", {"name": repo}, fail_on_empty=False) 

2120 if db_repo and "oci" in db_repo: 

2121 return db_repo.get("url") 

2122 

2123 async def _prepare_helm_chart(self, kdu_model, cluster_id): 

2124 # e.g.: "stable/openldap", "1.0" 

2125 kdu_model, version = self._split_version(kdu_model) 

2126 # e.g.: "openldap, stable" 

2127 chart_name, repo = self._split_repo(kdu_model) 

2128 if repo and chart_name: # repo/chart case 

2129 oci_url = self._repo_to_oci_url(repo) 

2130 if oci_url: # oci does not require helm repo update 

2131 kdu_model = f"{oci_url.rstrip('/')}/{chart_name.lstrip('/')}" # urljoin doesn't work for oci schema 

2132 else: 

2133 await self.repo_update(cluster_id, repo) 

2134 return kdu_model, version 

2135 

2136 async def create_certificate( 

2137 self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage 

2138 ): 

2139 paths, env = self._init_paths_env( 

2140 cluster_name=cluster_uuid, create_if_not_exist=True 

2141 ) 

2142 kubectl = Kubectl(config_file=paths["kube_config"]) 

2143 await kubectl.create_certificate( 

2144 namespace=namespace, 

2145 name=name, 

2146 dns_prefix=dns_prefix, 

2147 secret_name=secret_name, 

2148 usages=[usage], 

2149 issuer_name="ca-issuer", 

2150 ) 

2151 

2152 async def delete_certificate(self, cluster_uuid, namespace, certificate_name): 

2153 paths, env = self._init_paths_env( 

2154 cluster_name=cluster_uuid, create_if_not_exist=True 

2155 ) 

2156 kubectl = Kubectl(config_file=paths["kube_config"]) 

2157 await kubectl.delete_certificate(namespace, certificate_name) 

2158 

2159 async def create_namespace( 

2160 self, 

2161 namespace, 

2162 cluster_uuid, 

2163 labels, 

2164 ): 

2165 """ 

2166 Create a namespace in a specific cluster 

2167 

2168 :param namespace: Namespace to be created 

2169 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig 

2170 :param labels: Dictionary with labels for the new namespace 

2171 :returns: None 

2172 """ 

2173 paths, env = self._init_paths_env( 

2174 cluster_name=cluster_uuid, create_if_not_exist=True 

2175 ) 

2176 kubectl = Kubectl(config_file=paths["kube_config"]) 

2177 await kubectl.create_namespace( 

2178 name=namespace, 

2179 labels=labels, 

2180 ) 

2181 

2182 async def delete_namespace( 

2183 self, 

2184 namespace, 

2185 cluster_uuid, 

2186 ): 

2187 """ 

2188 Delete a namespace in a specific cluster 

2189 

2190 :param namespace: namespace to be deleted 

2191 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig 

2192 :returns: None 

2193 """ 

2194 paths, env = self._init_paths_env( 

2195 cluster_name=cluster_uuid, create_if_not_exist=True 

2196 ) 

2197 kubectl = Kubectl(config_file=paths["kube_config"]) 

2198 await kubectl.delete_namespace( 

2199 name=namespace, 

2200 ) 

2201 

2202 async def copy_secret_data( 

2203 self, 

2204 src_secret: str, 

2205 dst_secret: str, 

2206 cluster_uuid: str, 

2207 data_key: str, 

2208 src_namespace: str = "osm", 

2209 dst_namespace: str = "osm", 

2210 ): 

2211 """ 

2212 Copy a single key and value from an existing secret to a new one 

2213 

2214 :param src_secret: name of the existing secret 

2215 :param dst_secret: name of the new secret 

2216 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig 

2217 :param data_key: key of the existing secret to be copied 

2218 :param src_namespace: Namespace of the existing secret 

2219 :param dst_namespace: Namespace of the new secret 

2220 :returns: None 

2221 """ 

2222 paths, env = self._init_paths_env( 

2223 cluster_name=cluster_uuid, create_if_not_exist=True 

2224 ) 

2225 kubectl = Kubectl(config_file=paths["kube_config"]) 

2226 secret_data = await kubectl.get_secret_content( 

2227 name=src_secret, 

2228 namespace=src_namespace, 

2229 ) 

2230 # Only the corresponding data_key value needs to be copy 

2231 data = {data_key: secret_data.get(data_key)} 

2232 await kubectl.create_secret( 

2233 name=dst_secret, 

2234 data=data, 

2235 namespace=dst_namespace, 

2236 secret_type="Opaque", 

2237 ) 

2238 

2239 async def setup_default_rbac( 

2240 self, 

2241 name, 

2242 namespace, 

2243 cluster_uuid, 

2244 api_groups, 

2245 resources, 

2246 verbs, 

2247 service_account, 

2248 ): 

2249 """ 

2250 Create a basic RBAC for a new namespace. 

2251 

2252 :param name: name of both Role and Role Binding 

2253 :param namespace: K8s namespace 

2254 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig 

2255 :param api_groups: Api groups to be allowed in Policy Rule 

2256 :param resources: Resources to be allowed in Policy Rule 

2257 :param verbs: Verbs to be allowed in Policy Rule 

2258 :param service_account: Service Account name used to bind the Role 

2259 :returns: None 

2260 """ 

2261 paths, env = self._init_paths_env( 

2262 cluster_name=cluster_uuid, create_if_not_exist=True 

2263 ) 

2264 kubectl = Kubectl(config_file=paths["kube_config"]) 

2265 await kubectl.create_role( 

2266 name=name, 

2267 labels={}, 

2268 namespace=namespace, 

2269 api_groups=api_groups, 

2270 resources=resources, 

2271 verbs=verbs, 

2272 ) 

2273 await kubectl.create_role_binding( 

2274 name=name, 

2275 labels={}, 

2276 namespace=namespace, 

2277 role_name=name, 

2278 sa_name=service_account, 

2279 )