Coverage for n2vc/k8s_juju_conn.py: 88%

322 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-29 09:03 +0000

1# Copyright 2019 Canonical Ltd. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import asyncio 

16from typing import Union 

17import os 

18import uuid 

19import yaml 

20import tempfile 

21import binascii 

22 

23from n2vc.config import EnvironConfig 

24from n2vc.definitions import RelationEndpoint 

25from n2vc.exceptions import K8sException 

26from n2vc.k8s_conn import K8sConnector 

27from n2vc.kubectl import Kubectl 

28from .exceptions import MethodNotImplemented 

29from n2vc.libjuju import Libjuju 

30from n2vc.utils import obj_to_dict, obj_to_yaml 

31from n2vc.store import MotorStore 

32from n2vc.vca.cloud import Cloud 

33from n2vc.vca.connection import get_connection 

34 

35 

36RBAC_LABEL_KEY_NAME = "rbac-id" 

37RBAC_STACK_PREFIX = "juju-credential" 

38 

39 

40def generate_rbac_id(): 

41 return binascii.hexlify(os.urandom(4)).decode() 

42 

43 

44class K8sJujuConnector(K8sConnector): 

45 libjuju = None 

46 

47 def __init__( 

48 self, 

49 fs: object, 

50 db: object, 

51 kubectl_command: str = "/usr/bin/kubectl", 

52 juju_command: str = "/usr/bin/juju", 

53 log: object = None, 

54 on_update_db=None, 

55 ): 

56 """ 

57 :param fs: file system for kubernetes and helm configuration 

58 :param db: Database object 

59 :param kubectl_command: path to kubectl executable 

60 :param helm_command: path to helm executable 

61 :param log: logger 

62 """ 

63 

64 # parent class 

65 K8sConnector.__init__(self, db, log=log, on_update_db=on_update_db) 

66 

67 self.fs = fs 

68 self.log.debug("Initializing K8S Juju connector") 

69 

70 db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri") 

71 self._store = MotorStore(db_uri) 

72 self.loading_libjuju = asyncio.Lock() 

73 self.uninstall_locks = {} 

74 

75 self.log.debug("K8S Juju connector initialized") 

76 # TODO: Remove these commented lines: 

77 # self.authenticated = False 

78 # self.models = {} 

79 # self.juju_secret = "" 

80 

81 """Initialization""" 

82 

83 async def init_env( 

84 self, 

85 k8s_creds: str, 

86 namespace: str = "kube-system", 

87 reuse_cluster_uuid: str = None, 

88 **kwargs, 

89 ) -> (str, bool): 

90 """ 

91 It prepares a given K8s cluster environment to run Juju bundles. 

92 

93 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid 

94 '.kube/config' 

95 :param namespace: optional namespace to be used for juju. By default, 

96 'kube-system' will be used 

97 :param reuse_cluster_uuid: existing cluster uuid for reuse 

98 :param: kwargs: Additional parameters 

99 vca_id (str): VCA ID 

100 

101 :return: uuid of the K8s cluster and True if connector has installed some 

102 software in the cluster 

103 (on error, an exception will be raised) 

104 """ 

105 libjuju = await self._get_libjuju(kwargs.get("vca_id")) 

106 

107 cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4()) 

108 kubectl = self._get_kubectl(k8s_creds) 

109 

110 # CREATING RESOURCES IN K8S 

111 rbac_id = generate_rbac_id() 

112 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id) 

113 labels = {RBAC_STACK_PREFIX: rbac_id} 

114 

115 # Create cleanup dictionary to clean up created resources 

116 # if it fails in the middle of the process 

117 cleanup_data = [] 

118 try: 

119 self.log.debug("Initializing K8s cluster for juju") 

120 kubectl.create_cluster_role(name=metadata_name, labels=labels) 

121 self.log.debug("Cluster role created") 

122 cleanup_data.append( 

123 {"delete": kubectl.delete_cluster_role, "args": (metadata_name,)} 

124 ) 

125 

126 kubectl.create_service_account(name=metadata_name, labels=labels) 

127 self.log.debug("Service account created") 

128 cleanup_data.append( 

129 {"delete": kubectl.delete_service_account, "args": (metadata_name,)} 

130 ) 

131 

132 kubectl.create_cluster_role_binding(name=metadata_name, labels=labels) 

133 self.log.debug("Role binding created") 

134 cleanup_data.append( 

135 { 

136 "delete": kubectl.delete_cluster_role_binding, 

137 "args": (metadata_name,), 

138 } 

139 ) 

140 token, client_cert_data = await kubectl.get_secret_data(metadata_name) 

141 

142 default_storage_class = kubectl.get_default_storage_class() 

143 self.log.debug("Default storage class: {}".format(default_storage_class)) 

144 await libjuju.add_k8s( 

145 name=cluster_uuid, 

146 rbac_id=rbac_id, 

147 token=token, 

148 client_cert_data=client_cert_data, 

149 configuration=kubectl.configuration, 

150 storage_class=default_storage_class, 

151 credential_name=self._get_credential_name(cluster_uuid), 

152 ) 

153 self.log.debug("K8s cluster added to juju controller") 

154 return cluster_uuid, True 

155 except Exception as e: 

156 self.log.error("Error initializing k8scluster: {}".format(e), exc_info=True) 

157 if len(cleanup_data) > 0: 

158 self.log.debug("Cleaning up created resources in k8s cluster...") 

159 for item in cleanup_data: 

160 delete_function = item["delete"] 

161 delete_args = item["args"] 

162 delete_function(*delete_args) 

163 self.log.debug("Cleanup finished") 

164 raise e 

165 

166 """Repo Management""" 

167 

168 async def repo_add( 

169 self, 

170 name: str, 

171 url: str, 

172 _type: str = "charm", 

173 cert: str = None, 

174 user: str = None, 

175 password: str = None, 

176 ): 

177 raise MethodNotImplemented() 

178 

179 async def repo_list(self): 

180 raise MethodNotImplemented() 

181 

182 async def repo_remove(self, name: str): 

183 raise MethodNotImplemented() 

184 

185 async def synchronize_repos(self, cluster_uuid: str, name: str): 

186 """ 

187 Returns None as currently add_repo is not implemented 

188 """ 

189 return None 

190 

191 """Reset""" 

192 

193 async def reset( 

194 self, 

195 cluster_uuid: str, 

196 force: bool = False, 

197 uninstall_sw: bool = False, 

198 **kwargs, 

199 ) -> bool: 

200 """Reset a cluster 

201 

202 Resets the Kubernetes cluster by removing the model that represents it. 

203 

204 :param cluster_uuid str: The UUID of the cluster to reset 

205 :param force: Force reset 

206 :param uninstall_sw: Boolean to uninstall sw 

207 :param: kwargs: Additional parameters 

208 vca_id (str): VCA ID 

209 

210 :return: Returns True if successful or raises an exception. 

211 """ 

212 

213 try: 

214 self.log.debug("[reset] Removing k8s cloud") 

215 libjuju = await self._get_libjuju(kwargs.get("vca_id")) 

216 

217 cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid)) 

218 

219 cloud_creds = await libjuju.get_cloud_credentials(cloud) 

220 

221 await libjuju.remove_cloud(cluster_uuid) 

222 

223 credentials = self.get_credentials(cluster_uuid=cluster_uuid) 

224 

225 kubectl = self._get_kubectl(credentials) 

226 

227 delete_functions = [ 

228 kubectl.delete_cluster_role_binding, 

229 kubectl.delete_service_account, 

230 kubectl.delete_cluster_role, 

231 ] 

232 

233 credential_attrs = cloud_creds[0].result["attrs"] 

234 if RBAC_LABEL_KEY_NAME in credential_attrs: 

235 rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME] 

236 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id) 

237 for delete_func in delete_functions: 

238 try: 

239 delete_func(metadata_name) 

240 except Exception as e: 

241 self.log.warning("Cannot remove resource in K8s {}".format(e)) 

242 

243 except Exception as e: 

244 self.log.debug("Caught exception during reset: {}".format(e)) 

245 raise e 

246 return True 

247 

248 """Deployment""" 

249 

250 async def install( 

251 self, 

252 cluster_uuid: str, 

253 kdu_model: str, 

254 kdu_instance: str, 

255 atomic: bool = True, 

256 timeout: float = 1800, 

257 params: dict = None, 

258 db_dict: dict = None, 

259 kdu_name: str = None, 

260 namespace: str = None, 

261 **kwargs, 

262 ) -> bool: 

263 """Install a bundle 

264 

265 :param cluster_uuid str: The UUID of the cluster to install to 

266 :param kdu_model str: The name or path of a bundle to install 

267 :param kdu_instance: Kdu instance name 

268 :param atomic bool: If set, waits until the model is active and resets 

269 the cluster on failure. 

270 :param timeout int: The time, in seconds, to wait for the install 

271 to finish 

272 :param params dict: Key-value pairs of instantiation parameters 

273 :param kdu_name: Name of the KDU instance to be installed 

274 :param namespace: K8s namespace to use for the KDU instance 

275 :param kwargs: Additional parameters 

276 vca_id (str): VCA ID 

277 

278 :return: If successful, returns ? 

279 """ 

280 libjuju = await self._get_libjuju(kwargs.get("vca_id")) 

281 bundle = kdu_model 

282 

283 if not db_dict: 

284 raise K8sException("db_dict must be set") 

285 if not bundle: 

286 raise K8sException("bundle must be set") 

287 

288 if bundle.startswith("cs:"): 

289 # For Juju Bundles provided by the Charm Store 

290 pass 

291 elif bundle.startswith("ch:"): 

292 # For Juju Bundles provided by the Charm Hub (this only works for juju version >= 2.9) 

293 pass 

294 elif bundle.startswith("http"): 

295 # Download the file 

296 pass 

297 else: 

298 new_workdir = kdu_model.strip(kdu_model.split("/")[-1]) 

299 os.chdir(new_workdir) 

300 bundle = "local:{}".format(kdu_model) 

301 

302 # default namespace to kdu_instance 

303 if not namespace: 

304 namespace = kdu_instance 

305 

306 self.log.debug("Checking for model named {}".format(namespace)) 

307 

308 # Create the new model 

309 self.log.debug("Adding model: {}".format(namespace)) 

310 cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid)) 

311 await libjuju.add_model(namespace, cloud) 

312 

313 # if model: 

314 # TODO: Instantiation parameters 

315 

316 """ 

317 "Juju bundle that models the KDU, in any of the following ways: 

318 - <juju-repo>/<juju-bundle> 

319 - <juju-bundle folder under k8s_models folder in the package> 

320 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder 

321 in the package> 

322 - <URL_where_to_fetch_juju_bundle> 

323 """ 

324 try: 

325 previous_workdir = os.getcwd() 

326 except FileNotFoundError: 

327 previous_workdir = "/app/storage" 

328 

329 self.log.debug("[install] deploying {}".format(bundle)) 

330 instantiation_params = params.get("overlay") if params else None 

331 await libjuju.deploy( 

332 bundle, 

333 model_name=namespace, 

334 wait=atomic, 

335 timeout=timeout, 

336 instantiation_params=instantiation_params, 

337 ) 

338 os.chdir(previous_workdir) 

339 

340 # update information in the database (first, the VCA status, and then, the namespace) 

341 if self.on_update_db: 

342 await self.on_update_db( 

343 cluster_uuid, 

344 kdu_instance, 

345 filter=db_dict["filter"], 

346 vca_id=kwargs.get("vca_id"), 

347 ) 

348 

349 self.db.set_one( 

350 table="nsrs", 

351 q_filter={"_admin.deployed.K8s.kdu-instance": kdu_instance}, 

352 update_dict={"_admin.deployed.K8s.$.namespace": namespace}, 

353 ) 

354 

355 return True 

356 

357 async def scale( 

358 self, 

359 kdu_instance: str, 

360 scale: int, 

361 resource_name: str, 

362 total_timeout: float = 1800, 

363 namespace: str = None, 

364 **kwargs, 

365 ) -> bool: 

366 """Scale an application in a model 

367 

368 :param: kdu_instance str: KDU instance name 

369 :param: scale int: Scale to which to set the application 

370 :param: resource_name str: The application name in the Juju Bundle 

371 :param: timeout float: The time, in seconds, to wait for the install 

372 to finish 

373 :param namespace str: The namespace (model) where the Bundle was deployed 

374 :param kwargs: Additional parameters 

375 vca_id (str): VCA ID 

376 

377 :return: If successful, returns True 

378 """ 

379 

380 model_name = self._obtain_namespace( 

381 kdu_instance=kdu_instance, namespace=namespace 

382 ) 

383 try: 

384 libjuju = await self._get_libjuju(kwargs.get("vca_id")) 

385 await libjuju.scale_application( 

386 model_name=model_name, 

387 application_name=resource_name, 

388 scale=scale, 

389 total_timeout=total_timeout, 

390 ) 

391 except Exception as e: 

392 error_msg = "Error scaling application {} of the model {} of the kdu instance {}: {}".format( 

393 resource_name, model_name, kdu_instance, e 

394 ) 

395 self.log.error(error_msg) 

396 raise K8sException(message=error_msg) 

397 return True 

398 

399 async def get_scale_count( 

400 self, resource_name: str, kdu_instance: str, namespace: str = None, **kwargs 

401 ) -> int: 

402 """Get an application scale count 

403 

404 :param: resource_name str: The application name in the Juju Bundle 

405 :param: kdu_instance str: KDU instance name 

406 :param namespace str: The namespace (model) where the Bundle was deployed 

407 :param kwargs: Additional parameters 

408 vca_id (str): VCA ID 

409 :return: Return application instance count 

410 """ 

411 

412 model_name = self._obtain_namespace( 

413 kdu_instance=kdu_instance, namespace=namespace 

414 ) 

415 try: 

416 libjuju = await self._get_libjuju(kwargs.get("vca_id")) 

417 status = await libjuju.get_model_status(model_name=model_name) 

418 return len(status.applications[resource_name].units) 

419 except Exception as e: 

420 error_msg = ( 

421 f"Error getting scale count from application {resource_name} of the model {model_name} of " 

422 f"the kdu instance {kdu_instance}: {e}" 

423 ) 

424 self.log.error(error_msg) 

425 raise K8sException(message=error_msg) 

426 

427 async def instances_list(self, cluster_uuid: str) -> list: 

428 """ 

429 returns a list of deployed releases in a cluster 

430 

431 :param cluster_uuid: the cluster 

432 :return: 

433 """ 

434 return [] 

435 

436 async def upgrade( 

437 self, 

438 cluster_uuid: str, 

439 kdu_instance: str, 

440 kdu_model: str = None, 

441 params: dict = None, 

442 ) -> str: 

443 """Upgrade a model 

444 

445 :param cluster_uuid str: The UUID of the cluster to upgrade 

446 :param kdu_instance str: The unique name of the KDU instance 

447 :param kdu_model str: The name or path of the bundle to upgrade to 

448 :param params dict: Key-value pairs of instantiation parameters 

449 

450 :return: If successful, reference to the new revision number of the 

451 KDU instance. 

452 """ 

453 

454 # TODO: Loop through the bundle and upgrade each charm individually 

455 

456 """ 

457 The API doesn't have a concept of bundle upgrades, because there are 

458 many possible changes: charm revision, disk, number of units, etc. 

459 

460 As such, we are only supporting a limited subset of upgrades. We'll 

461 upgrade the charm revision but leave storage and scale untouched. 

462 

463 Scale changes should happen through OSM constructs, and changes to 

464 storage would require a redeployment of the service, at least in this 

465 initial release. 

466 """ 

467 raise MethodNotImplemented() 

468 

469 """Rollback""" 

470 

471 async def rollback( 

472 self, cluster_uuid: str, kdu_instance: str, revision: int = 0 

473 ) -> str: 

474 """Rollback a model 

475 

476 :param cluster_uuid str: The UUID of the cluster to rollback 

477 :param kdu_instance str: The unique name of the KDU instance 

478 :param revision int: The revision to revert to. If omitted, rolls back 

479 the previous upgrade. 

480 

481 :return: If successful, returns the revision of active KDU instance, 

482 or raises an exception 

483 """ 

484 raise MethodNotImplemented() 

485 

486 """Deletion""" 

487 

488 async def uninstall( 

489 self, cluster_uuid: str, kdu_instance: str, namespace: str = None, **kwargs 

490 ) -> bool: 

491 """Uninstall a KDU instance 

492 

493 :param cluster_uuid str: The UUID of the cluster 

494 :param kdu_instance str: The unique name of the KDU instance 

495 :param namespace str: The namespace (model) where the Bundle was deployed 

496 :param kwargs: Additional parameters 

497 vca_id (str): VCA ID 

498 

499 :return: Returns True if successful, or raises an exception 

500 """ 

501 model_name = self._obtain_namespace( 

502 kdu_instance=kdu_instance, namespace=namespace 

503 ) 

504 

505 self.log.debug(f"[uninstall] Destroying model: {model_name}") 

506 

507 will_not_delete = False 

508 if model_name not in self.uninstall_locks: 

509 self.uninstall_locks[model_name] = asyncio.Lock() 

510 delete_lock = self.uninstall_locks[model_name] 

511 

512 while delete_lock.locked(): 

513 will_not_delete = True 

514 await asyncio.sleep(0.1) 

515 

516 if will_not_delete: 

517 self.log.info("Model {} deleted by another worker.".format(model_name)) 

518 return True 

519 

520 try: 

521 async with delete_lock: 

522 libjuju = await self._get_libjuju(kwargs.get("vca_id")) 

523 

524 await libjuju.destroy_model(model_name, total_timeout=3600) 

525 finally: 

526 self.uninstall_locks.pop(model_name) 

527 

528 self.log.debug(f"[uninstall] Model {model_name} destroyed") 

529 return True 

530 

531 async def upgrade_charm( 

532 self, 

533 ee_id: str = None, 

534 path: str = None, 

535 charm_id: str = None, 

536 charm_type: str = None, 

537 timeout: float = None, 

538 ) -> str: 

539 """This method upgrade charms in VNFs 

540 

541 Args: 

542 ee_id: Execution environment id 

543 path: Local path to the charm 

544 charm_id: charm-id 

545 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm 

546 timeout: (Float) Timeout for the ns update operation 

547 

548 Returns: 

549 The output of the update operation if status equals to "completed" 

550 """ 

551 raise K8sException( 

552 "KDUs deployed with Juju Bundle do not support charm upgrade" 

553 ) 

554 

555 async def exec_primitive( 

556 self, 

557 cluster_uuid: str = None, 

558 kdu_instance: str = None, 

559 primitive_name: str = None, 

560 timeout: float = 300, 

561 params: dict = None, 

562 db_dict: dict = None, 

563 namespace: str = None, 

564 **kwargs, 

565 ) -> str: 

566 """Exec primitive (Juju action) 

567 

568 :param cluster_uuid str: The UUID of the cluster 

569 :param kdu_instance str: The unique name of the KDU instance 

570 :param primitive_name: Name of action that will be executed 

571 :param timeout: Timeout for action execution 

572 :param params: Dictionary of all the parameters needed for the action 

573 :param db_dict: Dictionary for any additional data 

574 :param namespace str: The namespace (model) where the Bundle was deployed 

575 :param kwargs: Additional parameters 

576 vca_id (str): VCA ID 

577 

578 :return: Returns the output of the action 

579 """ 

580 libjuju = await self._get_libjuju(kwargs.get("vca_id")) 

581 

582 namespace = self._obtain_namespace( 

583 kdu_instance=kdu_instance, namespace=namespace 

584 ) 

585 

586 if not params or "application-name" not in params: 

587 raise K8sException( 

588 "Missing application-name argument, \ 

589 argument needed for K8s actions" 

590 ) 

591 try: 

592 self.log.debug( 

593 "[exec_primitive] Getting model " 

594 "{} for the kdu_instance: {}".format(namespace, kdu_instance) 

595 ) 

596 application_name = params["application-name"] 

597 actions = await libjuju.get_actions( 

598 application_name=application_name, model_name=namespace 

599 ) 

600 if primitive_name not in actions: 

601 raise K8sException("Primitive {} not found".format(primitive_name)) 

602 output, status = await libjuju.execute_action( 

603 application_name=application_name, 

604 model_name=namespace, 

605 action_name=primitive_name, 

606 **params, 

607 ) 

608 

609 if status != "completed": 

610 raise K8sException( 

611 "status is not completed: {} output: {}".format(status, output) 

612 ) 

613 if self.on_update_db: 

614 await self.on_update_db( 

615 cluster_uuid=cluster_uuid, 

616 kdu_instance=kdu_instance, 

617 filter=db_dict["filter"], 

618 ) 

619 

620 return output 

621 

622 except Exception as e: 

623 error_msg = "Error executing primitive {}: {}".format(primitive_name, e) 

624 self.log.error(error_msg) 

625 raise K8sException(message=error_msg) 

626 

627 """Introspection""" 

628 

629 async def inspect_kdu(self, kdu_model: str) -> dict: 

630 """Inspect a KDU 

631 

632 Inspects a bundle and returns a dictionary of config parameters and 

633 their default values. 

634 

635 :param kdu_model str: The name or path of the bundle to inspect. 

636 

637 :return: If successful, returns a dictionary of available parameters 

638 and their default values. 

639 """ 

640 

641 kdu = {} 

642 if not os.path.exists(kdu_model): 

643 raise K8sException("file {} not found".format(kdu_model)) 

644 

645 with open(kdu_model, "r") as f: 

646 bundle = yaml.safe_load(f.read()) 

647 

648 """ 

649 { 

650 'description': 'Test bundle', 

651 'bundle': 'kubernetes', 

652 'applications': { 

653 'mariadb-k8s': { 

654 'charm': 'cs:~charmed-osm/mariadb-k8s-20', 

655 'scale': 1, 

656 'options': { 

657 'password': 'manopw', 

658 'root_password': 'osm4u', 

659 'user': 'mano' 

660 }, 

661 'series': 'kubernetes' 

662 } 

663 } 

664 } 

665 """ 

666 # TODO: This should be returned in an agreed-upon format 

667 kdu = bundle["applications"] 

668 

669 return kdu 

670 

671 async def help_kdu(self, kdu_model: str) -> str: 

672 """View the README 

673 

674 If available, returns the README of the bundle. 

675 

676 :param kdu_model str: The name or path of a bundle 

677 f 

678 :return: If found, returns the contents of the README. 

679 """ 

680 readme = None 

681 

682 files = ["README", "README.txt", "README.md"] 

683 path = os.path.dirname(kdu_model) 

684 for file in os.listdir(path): 

685 if file in files: 

686 with open(file, "r") as f: 

687 readme = f.read() 

688 break 

689 

690 return readme 

691 

692 async def status_kdu( 

693 self, 

694 cluster_uuid: str, 

695 kdu_instance: str, 

696 complete_status: bool = False, 

697 yaml_format: bool = False, 

698 namespace: str = None, 

699 **kwargs, 

700 ) -> Union[str, dict]: 

701 """Get the status of the KDU 

702 

703 Get the current status of the KDU instance. 

704 

705 :param cluster_uuid str: The UUID of the cluster 

706 :param kdu_instance str: The unique id of the KDU instance 

707 :param complete_status: To get the complete_status of the KDU 

708 :param yaml_format: To get the status in proper format for NSR record 

709 :param namespace str: The namespace (model) where the Bundle was deployed 

710 :param: kwargs: Additional parameters 

711 vca_id (str): VCA ID 

712 

713 :return: Returns a dictionary containing namespace, state, resources, 

714 and deployment_time and returns complete_status if complete_status is True 

715 """ 

716 libjuju = await self._get_libjuju(kwargs.get("vca_id")) 

717 status = {} 

718 

719 model_name = self._obtain_namespace( 

720 kdu_instance=kdu_instance, namespace=namespace 

721 ) 

722 model_status = await libjuju.get_model_status(model_name=model_name) 

723 

724 if not complete_status: 

725 for name in model_status.applications: 

726 application = model_status.applications[name] 

727 status[name] = {"status": application["status"]["status"]} 

728 else: 

729 if yaml_format: 

730 return obj_to_yaml(model_status) 

731 else: 

732 return obj_to_dict(model_status) 

733 

734 return status 

735 

736 async def add_relation( 

737 self, provider: RelationEndpoint, requirer: RelationEndpoint 

738 ): 

739 """ 

740 Add relation between two charmed endpoints 

741 

742 :param: provider: Provider relation endpoint 

743 :param: requirer: Requirer relation endpoint 

744 """ 

745 self.log.debug(f"adding new relation between {provider} and {requirer}") 

746 cross_model_relation = ( 

747 provider.model_name != requirer.model_name 

748 or provider.vca_id != requirer.vca_id 

749 ) 

750 try: 

751 if cross_model_relation: 

752 # Cross-model relation 

753 provider_libjuju = await self._get_libjuju(provider.vca_id) 

754 requirer_libjuju = await self._get_libjuju(requirer.vca_id) 

755 offer = await provider_libjuju.offer(provider) 

756 if offer: 

757 saas_name = await requirer_libjuju.consume( 

758 requirer.model_name, offer, provider_libjuju 

759 ) 

760 await requirer_libjuju.add_relation( 

761 requirer.model_name, requirer.endpoint, saas_name 

762 ) 

763 else: 

764 # Standard relation 

765 vca_id = provider.vca_id 

766 model = provider.model_name 

767 libjuju = await self._get_libjuju(vca_id) 

768 # add juju relations between two applications 

769 await libjuju.add_relation( 

770 model_name=model, 

771 endpoint_1=provider.endpoint, 

772 endpoint_2=requirer.endpoint, 

773 ) 

774 except Exception as e: 

775 message = f"Error adding relation between {provider} and {requirer}: {e}" 

776 self.log.error(message) 

777 raise Exception(message=message) 

778 

779 async def update_vca_status( 

780 self, vcastatus: dict, kdu_instance: str, namespace: str = None, **kwargs 

781 ): 

782 """ 

783 Add all configs, actions, executed actions of all applications in a model to vcastatus dict 

784 

785 :param vcastatus dict: dict containing vcastatus 

786 :param kdu_instance str: The unique id of the KDU instance 

787 :param namespace str: The namespace (model) where the Bundle was deployed 

788 :param: kwargs: Additional parameters 

789 vca_id (str): VCA ID 

790 

791 :return: None 

792 """ 

793 

794 model_name = self._obtain_namespace( 

795 kdu_instance=kdu_instance, namespace=namespace 

796 ) 

797 

798 libjuju = await self._get_libjuju(kwargs.get("vca_id")) 

799 try: 

800 for vca_model_name in vcastatus: 

801 # Adding executed actions 

802 vcastatus[vca_model_name][ 

803 "executedActions" 

804 ] = await libjuju.get_executed_actions(model_name=model_name) 

805 

806 for application in vcastatus[vca_model_name]["applications"]: 

807 # Adding application actions 

808 vcastatus[vca_model_name]["applications"][application][ 

809 "actions" 

810 ] = {} 

811 # Adding application configs 

812 vcastatus[vca_model_name]["applications"][application][ 

813 "configs" 

814 ] = await libjuju.get_application_configs( 

815 model_name=model_name, application_name=application 

816 ) 

817 

818 except Exception as e: 

819 self.log.debug("Error in updating vca status: {}".format(str(e))) 

820 

821 async def get_services( 

822 self, cluster_uuid: str, kdu_instance: str, namespace: str 

823 ) -> list: 

824 """Return a list of services of a kdu_instance""" 

825 

826 namespace = self._obtain_namespace( 

827 kdu_instance=kdu_instance, namespace=namespace 

828 ) 

829 

830 credentials = self.get_credentials(cluster_uuid=cluster_uuid) 

831 kubectl = self._get_kubectl(credentials) 

832 return kubectl.get_services( 

833 field_selector="metadata.namespace={}".format(namespace) 

834 ) 

835 

836 async def get_service( 

837 self, cluster_uuid: str, service_name: str, namespace: str 

838 ) -> object: 

839 """Return data for a specific service inside a namespace""" 

840 

841 credentials = self.get_credentials(cluster_uuid=cluster_uuid) 

842 kubectl = self._get_kubectl(credentials) 

843 return kubectl.get_services( 

844 field_selector="metadata.name={},metadata.namespace={}".format( 

845 service_name, namespace 

846 ) 

847 )[0] 

848 

849 def get_credentials(self, cluster_uuid: str) -> str: 

850 """ 

851 Get Cluster Kubeconfig 

852 """ 

853 k8scluster = self.db.get_one( 

854 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False 

855 ) 

856 

857 self.db.encrypt_decrypt_fields( 

858 k8scluster.get("credentials"), 

859 "decrypt", 

860 ["password", "secret"], 

861 schema_version=k8scluster["schema_version"], 

862 salt=k8scluster["_id"], 

863 ) 

864 

865 return yaml.safe_dump(k8scluster.get("credentials")) 

866 

867 def _get_credential_name(self, cluster_uuid: str) -> str: 

868 """ 

869 Get credential name for a k8s cloud 

870 

871 We cannot use the cluster_uuid for the credential name directly, 

872 because it cannot start with a number, it must start with a letter. 

873 Therefore, the k8s cloud credential name will be "cred-" followed 

874 by the cluster uuid. 

875 

876 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name) 

877 

878 :return: Name to use for the credential name. 

879 """ 

880 return "cred-{}".format(cluster_uuid) 

881 

882 def get_namespace(self, cluster_uuid: str) -> str: 

883 """Get the namespace UUID 

884 Gets the namespace's unique name 

885 

886 :param cluster_uuid str: The UUID of the cluster 

887 :returns: The namespace UUID, or raises an exception 

888 """ 

889 pass 

890 

891 @staticmethod 

892 def generate_kdu_instance_name(**kwargs): 

893 db_dict = kwargs.get("db_dict") 

894 kdu_name = kwargs.get("kdu_name", None) 

895 if kdu_name: 

896 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"]) 

897 else: 

898 kdu_instance = db_dict["filter"]["_id"] 

899 return kdu_instance 

900 

901 async def _get_libjuju(self, vca_id: str = None) -> Libjuju: 

902 """ 

903 Get libjuju object 

904 

905 :param: vca_id: VCA ID 

906 If None, get a libjuju object with a Connection to the default VCA 

907 Else, geta libjuju object with a Connection to the specified VCA 

908 """ 

909 if not vca_id: 

910 while self.loading_libjuju.locked(): 

911 await asyncio.sleep(0.1) 

912 if not self.libjuju: 

913 async with self.loading_libjuju: 

914 vca_connection = await get_connection(self._store) 

915 self.libjuju = Libjuju(vca_connection, log=self.log) 

916 return self.libjuju 

917 else: 

918 vca_connection = await get_connection(self._store, vca_id) 

919 return Libjuju(vca_connection, log=self.log, n2vc=self) 

920 

921 def _get_kubectl(self, credentials: str) -> Kubectl: 

922 """ 

923 Get Kubectl object 

924 

925 :param: kubeconfig_credentials: Kubeconfig credentials 

926 """ 

927 kubecfg = tempfile.NamedTemporaryFile() 

928 with open(kubecfg.name, "w") as kubecfg_file: 

929 kubecfg_file.write(credentials) 

930 return Kubectl(config_file=kubecfg.name) 

931 

932 def _obtain_namespace(self, kdu_instance: str, namespace: str = None) -> str: 

933 """ 

934 Obtain the namespace/model name to use in the instantiation of a Juju Bundle in K8s. The default namespace is 

935 the kdu_instance name. However, if the user passes the namespace where he wants to deploy the bundle, 

936 that namespace will be used. 

937 

938 :param kdu_instance: the default KDU instance name 

939 :param namespace: the namespace passed by the User 

940 """ 

941 

942 # deault the namespace/model name to the kdu_instance name TODO -> this should be the real return... But 

943 # once the namespace is not passed in most methods, I had to do this in another way. But I think this should 

944 # be the procedure in the future return namespace if namespace else kdu_instance 

945 

946 # TODO -> has referred above, this should be avoided in the future, this is temporary, in order to avoid 

947 # compatibility issues 

948 return ( 

949 namespace 

950 if namespace 

951 else self._obtain_namespace_from_db(kdu_instance=kdu_instance) 

952 ) 

953 

954 def _obtain_namespace_from_db(self, kdu_instance: str) -> str: 

955 db_nsrs = self.db.get_one( 

956 table="nsrs", q_filter={"_admin.deployed.K8s.kdu-instance": kdu_instance} 

957 ) 

958 for k8s in db_nsrs["_admin"]["deployed"]["K8s"]: 

959 if k8s.get("kdu-instance") == kdu_instance: 

960 return k8s.get("namespace") 

961 return ""