e934fd0e8f1d53f65ee2845b7bb88d3de1b8d8a1
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 import os
18 import typing
19 import yaml
20
21 import time
22
23 import juju.errors
24 from juju.bundle import BundleHandler
25 from juju.model import Model
26 from juju.machine import Machine
27 from juju.application import Application
28 from juju.unit import Unit
29 from juju.url import URL
30 from juju.version import DEFAULT_ARCHITECTURE
31 from juju.client._definitions import (
32 FullStatus,
33 QueryApplicationOffersResults,
34 Cloud,
35 CloudCredential,
36 )
37 from juju.controller import Controller
38 from juju.client import client
39 from juju import tag
40
41 from n2vc.definitions import Offer, RelationEndpoint
42 from n2vc.juju_watcher import JujuModelWatcher
43 from n2vc.provisioner import AsyncSSHProvisioner
44 from n2vc.n2vc_conn import N2VCConnector
45 from n2vc.exceptions import (
46 JujuMachineNotFound,
47 JujuApplicationNotFound,
48 JujuLeaderUnitNotFound,
49 JujuActionNotFound,
50 JujuControllerFailedConnecting,
51 JujuApplicationExists,
52 JujuInvalidK8sConfiguration,
53 JujuError,
54 )
55 from n2vc.vca.cloud import Cloud as VcaCloud
56 from n2vc.vca.connection import Connection
57 from kubernetes.client.configuration import Configuration
58 from retrying_async import retry
59
60
61 RBAC_LABEL_KEY_NAME = "rbac-id"
62
63
64 class Libjuju:
65 def __init__(
66 self,
67 vca_connection: Connection,
68 log: logging.Logger = None,
69 n2vc: N2VCConnector = None,
70 ):
71 """
72 Constructor
73
74 :param: vca_connection: n2vc.vca.connection object
75 :param: log: Logger
76 :param: n2vc: N2VC object
77 """
78
79 self.log = log or logging.getLogger("Libjuju")
80 self.n2vc = n2vc
81 self.vca_connection = vca_connection
82
83 self.creating_model = asyncio.Lock()
84
85 if self.vca_connection.is_default:
86 self.health_check_task = self._create_health_check_task()
87
88 def _create_health_check_task(self):
89 return asyncio.get_event_loop().create_task(self.health_check())
90
91 async def get_controller(self, timeout: float = 60.0) -> Controller:
92 """
93 Get controller
94
95 :param: timeout: Time in seconds to wait for controller to connect
96 """
97 controller = None
98 try:
99 controller = Controller()
100 await asyncio.wait_for(
101 controller.connect(
102 endpoint=self.vca_connection.data.endpoints,
103 username=self.vca_connection.data.user,
104 password=self.vca_connection.data.secret,
105 cacert=self.vca_connection.data.cacert,
106 ),
107 timeout=timeout,
108 )
109 if self.vca_connection.is_default:
110 endpoints = await controller.api_endpoints
111 if not all(
112 endpoint in self.vca_connection.endpoints for endpoint in endpoints
113 ):
114 await self.vca_connection.update_endpoints(endpoints)
115 return controller
116 except asyncio.CancelledError as e:
117 raise e
118 except Exception as e:
119 self.log.error(
120 "Failed connecting to controller: {}... {}".format(
121 self.vca_connection.data.endpoints, e
122 )
123 )
124 if controller:
125 await self.disconnect_controller(controller)
126
127 raise JujuControllerFailedConnecting(
128 f"Error connecting to Juju controller: {e}"
129 )
130
131 async def disconnect(self):
132 """Disconnect"""
133 # Cancel health check task
134 self.health_check_task.cancel()
135 self.log.debug("Libjuju disconnected!")
136
137 async def disconnect_model(self, model: Model):
138 """
139 Disconnect model
140
141 :param: model: Model that will be disconnected
142 """
143 await model.disconnect()
144
145 async def disconnect_controller(self, controller: Controller):
146 """
147 Disconnect controller
148
149 :param: controller: Controller that will be disconnected
150 """
151 if controller:
152 await controller.disconnect()
153
154 @retry(attempts=3, delay=5, timeout=None)
155 async def add_model(self, model_name: str, cloud: VcaCloud):
156 """
157 Create model
158
159 :param: model_name: Model name
160 :param: cloud: Cloud object
161 """
162
163 # Get controller
164 controller = await self.get_controller()
165 model = None
166 try:
167 # Block until other workers have finished model creation
168 while self.creating_model.locked():
169 await asyncio.sleep(0.1)
170
171 # Create the model
172 async with self.creating_model:
173 if await self.model_exists(model_name, controller=controller):
174 return
175 self.log.debug("Creating model {}".format(model_name))
176 model = await controller.add_model(
177 model_name,
178 config=self.vca_connection.data.model_config,
179 cloud_name=cloud.name,
180 credential_name=cloud.credential_name,
181 )
182 except juju.errors.JujuAPIError as e:
183 if "already exists" in e.message:
184 pass
185 else:
186 raise e
187 finally:
188 if model:
189 await self.disconnect_model(model)
190 await self.disconnect_controller(controller)
191
192 async def get_executed_actions(self, model_name: str) -> list:
193 """
194 Get executed/history of actions for a model.
195
196 :param: model_name: Model name, str.
197 :return: List of executed actions for a model.
198 """
199 model = None
200 executed_actions = []
201 controller = await self.get_controller()
202 try:
203 model = await self.get_model(controller, model_name)
204 # Get all unique action names
205 actions = {}
206 for application in model.applications:
207 application_actions = await self.get_actions(application, model_name)
208 actions.update(application_actions)
209 # Get status of all actions
210 for application_action in actions:
211 app_action_status_list = await model.get_action_status(
212 name=application_action
213 )
214 for action_id, action_status in app_action_status_list.items():
215 executed_action = {
216 "id": action_id,
217 "action": application_action,
218 "status": action_status,
219 }
220 # Get action output by id
221 action_status = await model.get_action_output(executed_action["id"])
222 for k, v in action_status.items():
223 executed_action[k] = v
224 executed_actions.append(executed_action)
225 except Exception as e:
226 raise JujuError(
227 "Error in getting executed actions for model: {}. Error: {}".format(
228 model_name, str(e)
229 )
230 )
231 finally:
232 if model:
233 await self.disconnect_model(model)
234 await self.disconnect_controller(controller)
235 return executed_actions
236
237 async def get_application_configs(
238 self, model_name: str, application_name: str
239 ) -> dict:
240 """
241 Get available configs for an application.
242
243 :param: model_name: Model name, str.
244 :param: application_name: Application name, str.
245
246 :return: A dict which has key - action name, value - action description
247 """
248 model = None
249 application_configs = {}
250 controller = await self.get_controller()
251 try:
252 model = await self.get_model(controller, model_name)
253 application = self._get_application(
254 model, application_name=application_name
255 )
256 application_configs = await application.get_config()
257 except Exception as e:
258 raise JujuError(
259 "Error in getting configs for application: {} in model: {}. Error: {}".format(
260 application_name, model_name, str(e)
261 )
262 )
263 finally:
264 if model:
265 await self.disconnect_model(model)
266 await self.disconnect_controller(controller)
267 return application_configs
268
269 @retry(attempts=3, delay=5)
270 async def get_model(self, controller: Controller, model_name: str) -> Model:
271 """
272 Get model from controller
273
274 :param: controller: Controller
275 :param: model_name: Model name
276
277 :return: Model: The created Juju model object
278 """
279 return await controller.get_model(model_name)
280
281 async def model_exists(
282 self, model_name: str, controller: Controller = None
283 ) -> bool:
284 """
285 Check if model exists
286
287 :param: controller: Controller
288 :param: model_name: Model name
289
290 :return bool
291 """
292 need_to_disconnect = False
293
294 # Get controller if not passed
295 if not controller:
296 controller = await self.get_controller()
297 need_to_disconnect = True
298
299 # Check if model exists
300 try:
301 return model_name in await controller.list_models()
302 finally:
303 if need_to_disconnect:
304 await self.disconnect_controller(controller)
305
306 async def models_exist(self, model_names: [str]) -> (bool, list):
307 """
308 Check if models exists
309
310 :param: model_names: List of strings with model names
311
312 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
313 """
314 if not model_names:
315 raise Exception(
316 "model_names must be a non-empty array. Given value: {}".format(
317 model_names
318 )
319 )
320 non_existing_models = []
321 models = await self.list_models()
322 existing_models = list(set(models).intersection(model_names))
323 non_existing_models = list(set(model_names) - set(existing_models))
324
325 return (
326 len(non_existing_models) == 0,
327 non_existing_models,
328 )
329
330 async def get_model_status(self, model_name: str) -> FullStatus:
331 """
332 Get model status
333
334 :param: model_name: Model name
335
336 :return: Full status object
337 """
338 controller = await self.get_controller()
339 model = await self.get_model(controller, model_name)
340 try:
341 return await model.get_status()
342 finally:
343 await self.disconnect_model(model)
344 await self.disconnect_controller(controller)
345
346 async def create_machine(
347 self,
348 model_name: str,
349 machine_id: str = None,
350 db_dict: dict = None,
351 progress_timeout: float = None,
352 total_timeout: float = None,
353 series: str = "bionic",
354 wait: bool = True,
355 ) -> (Machine, bool):
356 """
357 Create machine
358
359 :param: model_name: Model name
360 :param: machine_id: Machine id
361 :param: db_dict: Dictionary with data of the DB to write the updates
362 :param: progress_timeout: Maximum time between two updates in the model
363 :param: total_timeout: Timeout for the entity to be active
364 :param: series: Series of the machine (xenial, bionic, focal, ...)
365 :param: wait: Wait until machine is ready
366
367 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
368 if the machine is new or it already existed
369 """
370 new = False
371 machine = None
372
373 self.log.debug(
374 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
375 )
376
377 # Get controller
378 controller = await self.get_controller()
379
380 # Get model
381 model = await self.get_model(controller, model_name)
382 try:
383 if machine_id is not None:
384 self.log.debug(
385 "Searching machine (id={}) in model {}".format(
386 machine_id, model_name
387 )
388 )
389
390 # Get machines from model and get the machine with machine_id if exists
391 machines = await model.get_machines()
392 if machine_id in machines:
393 self.log.debug(
394 "Machine (id={}) found in model {}".format(
395 machine_id, model_name
396 )
397 )
398 machine = machines[machine_id]
399 else:
400 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
401
402 if machine is None:
403 self.log.debug("Creating a new machine in model {}".format(model_name))
404
405 # Create machine
406 machine = await model.add_machine(
407 spec=None, constraints=None, disks=None, series=series
408 )
409 new = True
410
411 # Wait until the machine is ready
412 self.log.debug(
413 "Wait until machine {} is ready in model {}".format(
414 machine.entity_id, model_name
415 )
416 )
417 if wait:
418 await JujuModelWatcher.wait_for(
419 model=model,
420 entity=machine,
421 progress_timeout=progress_timeout,
422 total_timeout=total_timeout,
423 db_dict=db_dict,
424 n2vc=self.n2vc,
425 vca_id=self.vca_connection._vca_id,
426 )
427 finally:
428 await self.disconnect_model(model)
429 await self.disconnect_controller(controller)
430
431 self.log.debug(
432 "Machine {} ready at {} in model {}".format(
433 machine.entity_id, machine.dns_name, model_name
434 )
435 )
436 return machine, new
437
438 async def provision_machine(
439 self,
440 model_name: str,
441 hostname: str,
442 username: str,
443 private_key_path: str,
444 db_dict: dict = None,
445 progress_timeout: float = None,
446 total_timeout: float = None,
447 ) -> str:
448 """
449 Manually provisioning of a machine
450
451 :param: model_name: Model name
452 :param: hostname: IP to access the machine
453 :param: username: Username to login to the machine
454 :param: private_key_path: Local path for the private key
455 :param: db_dict: Dictionary with data of the DB to write the updates
456 :param: progress_timeout: Maximum time between two updates in the model
457 :param: total_timeout: Timeout for the entity to be active
458
459 :return: (Entity): Machine id
460 """
461 self.log.debug(
462 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
463 model_name, hostname, username
464 )
465 )
466
467 # Get controller
468 controller = await self.get_controller()
469
470 # Get model
471 model = await self.get_model(controller, model_name)
472
473 try:
474 # Get provisioner
475 provisioner = AsyncSSHProvisioner(
476 host=hostname,
477 user=username,
478 private_key_path=private_key_path,
479 log=self.log,
480 )
481
482 # Provision machine
483 params = await provisioner.provision_machine()
484
485 params.jobs = ["JobHostUnits"]
486
487 self.log.debug("Adding machine to model")
488 connection = model.connection()
489 client_facade = client.ClientFacade.from_connection(connection)
490
491 results = await client_facade.AddMachines(params=[params])
492 error = results.machines[0].error
493
494 if error:
495 msg = "Error adding machine: {}".format(error.message)
496 self.log.error(msg=msg)
497 raise ValueError(msg)
498
499 machine_id = results.machines[0].machine
500
501 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
502 asyncio.ensure_future(
503 provisioner.install_agent(
504 connection=connection,
505 nonce=params.nonce,
506 machine_id=machine_id,
507 proxy=self.vca_connection.data.api_proxy,
508 series=params.series,
509 )
510 )
511
512 machine = None
513 for _ in range(10):
514 machine_list = await model.get_machines()
515 if machine_id in machine_list:
516 self.log.debug("Machine {} found in model!".format(machine_id))
517 machine = model.machines.get(machine_id)
518 break
519 await asyncio.sleep(2)
520
521 if machine is None:
522 msg = "Machine {} not found in model".format(machine_id)
523 self.log.error(msg=msg)
524 raise JujuMachineNotFound(msg)
525
526 self.log.debug(
527 "Wait until machine {} is ready in model {}".format(
528 machine.entity_id, model_name
529 )
530 )
531 await JujuModelWatcher.wait_for(
532 model=model,
533 entity=machine,
534 progress_timeout=progress_timeout,
535 total_timeout=total_timeout,
536 db_dict=db_dict,
537 n2vc=self.n2vc,
538 vca_id=self.vca_connection._vca_id,
539 )
540 except Exception as e:
541 raise e
542 finally:
543 await self.disconnect_model(model)
544 await self.disconnect_controller(controller)
545
546 self.log.debug(
547 "Machine provisioned {} in model {}".format(machine_id, model_name)
548 )
549
550 return machine_id
551
552 async def deploy(
553 self,
554 uri: str,
555 model_name: str,
556 wait: bool = True,
557 timeout: float = 3600,
558 instantiation_params: dict = None,
559 ):
560 """
561 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
562
563 :param uri: Path or Charm Store uri in which the charm or bundle can be found
564 :param model_name: Model name
565 :param wait: Indicates whether to wait or not until all applications are active
566 :param timeout: Time in seconds to wait until all applications are active
567 :param instantiation_params: To be applied as overlay bundle over primary bundle.
568 """
569 controller = await self.get_controller()
570 model = await self.get_model(controller, model_name)
571 overlays = []
572 try:
573 await self._validate_instantiation_params(uri, model, instantiation_params)
574 overlays = self._get_overlays(model_name, instantiation_params)
575 await model.deploy(uri, trust=True, overlays=overlays)
576 if wait:
577 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
578 self.log.debug("All units active in model {}".format(model_name))
579 finally:
580 self._remove_overlay_file(overlays)
581 await self.disconnect_model(model)
582 await self.disconnect_controller(controller)
583
584 async def _validate_instantiation_params(
585 self, uri: str, model, instantiation_params: dict
586 ) -> None:
587 """Checks if all the applications in instantiation_params
588 exist ins the original bundle.
589
590 Raises:
591 JujuApplicationNotFound if there is an invalid app in
592 the instantiation params.
593 """
594 overlay_apps = self._get_apps_in_instantiation_params(instantiation_params)
595 if not overlay_apps:
596 return
597 original_apps = await self._get_apps_in_original_bundle(uri, model)
598 if not all(app in original_apps for app in overlay_apps):
599 raise JujuApplicationNotFound(
600 "Cannot find application {} in original bundle {}".format(
601 overlay_apps, original_apps
602 )
603 )
604
605 async def _get_apps_in_original_bundle(self, uri: str, model) -> set:
606 """Bundle is downloaded in BundleHandler.fetch_plan.
607 That method takes care of opening and exception handling.
608
609 Resolve method gets all the information regarding the channel,
610 track, revision, type, source.
611
612 Returns:
613 Set with the names of the applications in original bundle.
614 """
615 url = URL.parse(uri)
616 architecture = DEFAULT_ARCHITECTURE # only AMD64 is allowed
617 res = await model.deploy_types[str(url.schema)].resolve(
618 url, architecture, entity_url=uri
619 )
620 handler = BundleHandler(model, trusted=True, forced=False)
621 await handler.fetch_plan(url, res.origin)
622 return handler.applications
623
624 def _get_apps_in_instantiation_params(self, instantiation_params: dict) -> list:
625 """Extract applications key in instantiation params.
626
627 Returns:
628 List with the names of the applications in instantiation params.
629
630 Raises:
631 JujuError if applications key is not found.
632 """
633 if not instantiation_params:
634 return []
635 try:
636 return [key for key in instantiation_params.get("applications")]
637 except Exception as e:
638 raise JujuError("Invalid overlay format. {}".format(str(e)))
639
640 def _get_overlays(self, model_name: str, instantiation_params: dict) -> list:
641 """Creates a temporary overlay file which includes the instantiation params.
642 Only one overlay file is created.
643
644 Returns:
645 List with one overlay filename. Empty list if there are no instantiation params.
646 """
647 if not instantiation_params:
648 return []
649 file_name = model_name + "-overlay.yaml"
650 self._write_overlay_file(file_name, instantiation_params)
651 return [file_name]
652
653 def _write_overlay_file(self, file_name: str, instantiation_params: dict) -> None:
654 with open(file_name, "w") as file:
655 yaml.dump(instantiation_params, file)
656
657 def _remove_overlay_file(self, overlay: list) -> None:
658 """Overlay contains either one or zero file names."""
659 if not overlay:
660 return
661 try:
662 filename = overlay[0]
663 os.remove(filename)
664 except OSError as e:
665 self.log.warning(
666 "Overlay file {} could not be removed: {}".format(filename, e)
667 )
668
669 async def add_unit(
670 self,
671 application_name: str,
672 model_name: str,
673 machine_id: str,
674 db_dict: dict = None,
675 progress_timeout: float = None,
676 total_timeout: float = None,
677 ):
678 """Add unit
679
680 :param: application_name: Application name
681 :param: model_name: Model name
682 :param: machine_id Machine id
683 :param: db_dict: Dictionary with data of the DB to write the updates
684 :param: progress_timeout: Maximum time between two updates in the model
685 :param: total_timeout: Timeout for the entity to be active
686
687 :return: None
688 """
689
690 model = None
691 controller = await self.get_controller()
692 try:
693 model = await self.get_model(controller, model_name)
694 application = self._get_application(model, application_name)
695
696 if application is not None:
697 # Checks if the given machine id in the model,
698 # otherwise function raises an error
699 _machine, _series = self._get_machine_info(model, machine_id)
700
701 self.log.debug(
702 "Adding unit (machine {}) to application {} in model ~{}".format(
703 machine_id, application_name, model_name
704 )
705 )
706
707 await application.add_unit(to=machine_id)
708
709 await JujuModelWatcher.wait_for(
710 model=model,
711 entity=application,
712 progress_timeout=progress_timeout,
713 total_timeout=total_timeout,
714 db_dict=db_dict,
715 n2vc=self.n2vc,
716 vca_id=self.vca_connection._vca_id,
717 )
718 self.log.debug(
719 "Unit is added to application {} in model {}".format(
720 application_name, model_name
721 )
722 )
723 else:
724 raise JujuApplicationNotFound(
725 "Application {} not exists".format(application_name)
726 )
727 finally:
728 if model:
729 await self.disconnect_model(model)
730 await self.disconnect_controller(controller)
731
732 async def destroy_unit(
733 self,
734 application_name: str,
735 model_name: str,
736 machine_id: str,
737 total_timeout: float = None,
738 ):
739 """Destroy unit
740
741 :param: application_name: Application name
742 :param: model_name: Model name
743 :param: machine_id Machine id
744 :param: total_timeout: Timeout for the entity to be active
745
746 :return: None
747 """
748
749 model = None
750 controller = await self.get_controller()
751 try:
752 model = await self.get_model(controller, model_name)
753 application = self._get_application(model, application_name)
754
755 if application is None:
756 raise JujuApplicationNotFound(
757 "Application not found: {} (model={})".format(
758 application_name, model_name
759 )
760 )
761
762 unit = self._get_unit(application, machine_id)
763 if not unit:
764 raise JujuError(
765 "A unit with machine id {} not in available units".format(
766 machine_id
767 )
768 )
769
770 unit_name = unit.name
771
772 self.log.debug(
773 "Destroying unit {} from application {} in model {}".format(
774 unit_name, application_name, model_name
775 )
776 )
777 await application.destroy_unit(unit_name)
778
779 self.log.debug(
780 "Waiting for unit {} to be destroyed in application {} (model={})...".format(
781 unit_name, application_name, model_name
782 )
783 )
784
785 # TODO: Add functionality in the Juju watcher to replace this kind of blocks
786 if total_timeout is None:
787 total_timeout = 3600
788 end = time.time() + total_timeout
789 while time.time() < end:
790 if not self._get_unit(application, machine_id):
791 self.log.debug(
792 "The unit {} was destroyed in application {} (model={}) ".format(
793 unit_name, application_name, model_name
794 )
795 )
796 return
797 await asyncio.sleep(5)
798 self.log.debug(
799 "Unit {} is destroyed from application {} in model {}".format(
800 unit_name, application_name, model_name
801 )
802 )
803 finally:
804 if model:
805 await self.disconnect_model(model)
806 await self.disconnect_controller(controller)
807
808 async def deploy_charm(
809 self,
810 application_name: str,
811 path: str,
812 model_name: str,
813 machine_id: str,
814 db_dict: dict = None,
815 progress_timeout: float = None,
816 total_timeout: float = None,
817 config: dict = None,
818 series: str = None,
819 num_units: int = 1,
820 ):
821 """Deploy charm
822
823 :param: application_name: Application name
824 :param: path: Local path to the charm
825 :param: model_name: Model name
826 :param: machine_id ID of the machine
827 :param: db_dict: Dictionary with data of the DB to write the updates
828 :param: progress_timeout: Maximum time between two updates in the model
829 :param: total_timeout: Timeout for the entity to be active
830 :param: config: Config for the charm
831 :param: series: Series of the charm
832 :param: num_units: Number of units
833
834 :return: (juju.application.Application): Juju application
835 """
836 self.log.debug(
837 "Deploying charm {} to machine {} in model ~{}".format(
838 application_name, machine_id, model_name
839 )
840 )
841 self.log.debug("charm: {}".format(path))
842
843 # Get controller
844 controller = await self.get_controller()
845
846 # Get model
847 model = await self.get_model(controller, model_name)
848
849 try:
850 if application_name not in model.applications:
851 if machine_id is not None:
852 machine, series = self._get_machine_info(model, machine_id)
853
854 application = await model.deploy(
855 entity_url=path,
856 application_name=application_name,
857 channel="stable",
858 num_units=1,
859 series=series,
860 to=machine_id,
861 config=config,
862 )
863
864 self.log.debug(
865 "Wait until application {} is ready in model {}".format(
866 application_name, model_name
867 )
868 )
869 if num_units > 1:
870 for _ in range(num_units - 1):
871 m, _ = await self.create_machine(model_name, wait=False)
872 await application.add_unit(to=m.entity_id)
873
874 await JujuModelWatcher.wait_for(
875 model=model,
876 entity=application,
877 progress_timeout=progress_timeout,
878 total_timeout=total_timeout,
879 db_dict=db_dict,
880 n2vc=self.n2vc,
881 vca_id=self.vca_connection._vca_id,
882 )
883 self.log.debug(
884 "Application {} is ready in model {}".format(
885 application_name, model_name
886 )
887 )
888 else:
889 raise JujuApplicationExists(
890 "Application {} exists".format(application_name)
891 )
892 except juju.errors.JujuError as e:
893 if "already exists" in e.message:
894 raise JujuApplicationExists(
895 "Application {} exists".format(application_name)
896 )
897 else:
898 raise e
899 finally:
900 await self.disconnect_model(model)
901 await self.disconnect_controller(controller)
902
903 return application
904
905 async def upgrade_charm(
906 self,
907 application_name: str,
908 path: str,
909 model_name: str,
910 total_timeout: float = None,
911 **kwargs,
912 ):
913 """Upgrade Charm
914
915 :param: application_name: Application name
916 :param: model_name: Model name
917 :param: path: Local path to the charm
918 :param: total_timeout: Timeout for the entity to be active
919
920 :return: (str, str): (output and status)
921 """
922
923 self.log.debug(
924 "Upgrading charm {} in model {} from path {}".format(
925 application_name, model_name, path
926 )
927 )
928
929 await self.resolve_application(
930 model_name=model_name, application_name=application_name
931 )
932
933 # Get controller
934 controller = await self.get_controller()
935
936 # Get model
937 model = await self.get_model(controller, model_name)
938
939 try:
940 # Get application
941 application = self._get_application(
942 model,
943 application_name=application_name,
944 )
945 if application is None:
946 raise JujuApplicationNotFound(
947 "Cannot find application {} to upgrade".format(application_name)
948 )
949
950 await application.refresh(path=path)
951
952 self.log.debug(
953 "Wait until charm upgrade is completed for application {} (model={})".format(
954 application_name, model_name
955 )
956 )
957
958 await JujuModelWatcher.ensure_units_idle(
959 model=model, application=application
960 )
961
962 if application.status == "error":
963 error_message = "Unknown"
964 for unit in application.units:
965 if (
966 unit.workload_status == "error"
967 and unit.workload_status_message != ""
968 ):
969 error_message = unit.workload_status_message
970
971 message = "Application {} failed update in {}: {}".format(
972 application_name, model_name, error_message
973 )
974 self.log.error(message)
975 raise JujuError(message=message)
976
977 self.log.debug(
978 "Application {} is ready in model {}".format(
979 application_name, model_name
980 )
981 )
982
983 finally:
984 await self.disconnect_model(model)
985 await self.disconnect_controller(controller)
986
987 return application
988
989 async def resolve_application(self, model_name: str, application_name: str):
990 controller = await self.get_controller()
991 model = await self.get_model(controller, model_name)
992
993 try:
994 application = self._get_application(
995 model,
996 application_name=application_name,
997 )
998 if application is None:
999 raise JujuApplicationNotFound(
1000 "Cannot find application {} to resolve".format(application_name)
1001 )
1002
1003 while application.status == "error":
1004 for unit in application.units:
1005 if unit.workload_status == "error":
1006 self.log.debug(
1007 "Model {}, Application {}, Unit {} in error state, resolving".format(
1008 model_name, application_name, unit.entity_id
1009 )
1010 )
1011 try:
1012 await unit.resolved(retry=False)
1013 except Exception:
1014 pass
1015
1016 await asyncio.sleep(1)
1017
1018 finally:
1019 await self.disconnect_model(model)
1020 await self.disconnect_controller(controller)
1021
1022 async def resolve(self, model_name: str):
1023 controller = await self.get_controller()
1024 model = await self.get_model(controller, model_name)
1025 all_units_active = False
1026 try:
1027 while not all_units_active:
1028 all_units_active = True
1029 for application_name, application in model.applications.items():
1030 if application.status == "error":
1031 for unit in application.units:
1032 if unit.workload_status == "error":
1033 self.log.debug(
1034 "Model {}, Application {}, Unit {} in error state, resolving".format(
1035 model_name, application_name, unit.entity_id
1036 )
1037 )
1038 try:
1039 await unit.resolved(retry=False)
1040 all_units_active = False
1041 except Exception:
1042 pass
1043
1044 if not all_units_active:
1045 await asyncio.sleep(5)
1046 finally:
1047 await self.disconnect_model(model)
1048 await self.disconnect_controller(controller)
1049
1050 async def scale_application(
1051 self,
1052 model_name: str,
1053 application_name: str,
1054 scale: int = 1,
1055 total_timeout: float = None,
1056 ):
1057 """
1058 Scale application (K8s)
1059
1060 :param: model_name: Model name
1061 :param: application_name: Application name
1062 :param: scale: Scale to which to set this application
1063 :param: total_timeout: Timeout for the entity to be active
1064 """
1065
1066 model = None
1067 controller = await self.get_controller()
1068 try:
1069 model = await self.get_model(controller, model_name)
1070
1071 self.log.debug(
1072 "Scaling application {} in model {}".format(
1073 application_name, model_name
1074 )
1075 )
1076 application = self._get_application(model, application_name)
1077 if application is None:
1078 raise JujuApplicationNotFound("Cannot scale application")
1079 await application.scale(scale=scale)
1080 # Wait until application is scaled in model
1081 self.log.debug(
1082 "Waiting for application {} to be scaled in model {}...".format(
1083 application_name, model_name
1084 )
1085 )
1086 if total_timeout is None:
1087 total_timeout = 1800
1088 end = time.time() + total_timeout
1089 while time.time() < end:
1090 application_scale = self._get_application_count(model, application_name)
1091 # Before calling wait_for_model function,
1092 # wait until application unit count and scale count are equal.
1093 # Because there is a delay before scaling triggers in Juju model.
1094 if application_scale == scale:
1095 await JujuModelWatcher.wait_for_model(
1096 model=model, timeout=total_timeout
1097 )
1098 self.log.debug(
1099 "Application {} is scaled in model {}".format(
1100 application_name, model_name
1101 )
1102 )
1103 return
1104 await asyncio.sleep(5)
1105 raise Exception(
1106 "Timeout waiting for application {} in model {} to be scaled".format(
1107 application_name, model_name
1108 )
1109 )
1110 finally:
1111 if model:
1112 await self.disconnect_model(model)
1113 await self.disconnect_controller(controller)
1114
1115 def _get_application_count(self, model: Model, application_name: str) -> int:
1116 """Get number of units of the application
1117
1118 :param: model: Model object
1119 :param: application_name: Application name
1120
1121 :return: int (or None if application doesn't exist)
1122 """
1123 application = self._get_application(model, application_name)
1124 if application is not None:
1125 return len(application.units)
1126
1127 def _get_application(self, model: Model, application_name: str) -> Application:
1128 """Get application
1129
1130 :param: model: Model object
1131 :param: application_name: Application name
1132
1133 :return: juju.application.Application (or None if it doesn't exist)
1134 """
1135 if model.applications and application_name in model.applications:
1136 return model.applications[application_name]
1137
1138 def _get_unit(self, application: Application, machine_id: str) -> Unit:
1139 """Get unit
1140
1141 :param: application: Application object
1142 :param: machine_id: Machine id
1143
1144 :return: Unit
1145 """
1146 unit = None
1147 for u in application.units:
1148 if u.machine_id == machine_id:
1149 unit = u
1150 break
1151 return unit
1152
1153 def _get_machine_info(
1154 self,
1155 model,
1156 machine_id: str,
1157 ) -> (str, str):
1158 """Get machine info
1159
1160 :param: model: Model object
1161 :param: machine_id: Machine id
1162
1163 :return: (str, str): (machine, series)
1164 """
1165 if machine_id not in model.machines:
1166 msg = "Machine {} not found in model".format(machine_id)
1167 self.log.error(msg=msg)
1168 raise JujuMachineNotFound(msg)
1169 machine = model.machines[machine_id]
1170 return machine, machine.series
1171
1172 async def execute_action(
1173 self,
1174 application_name: str,
1175 model_name: str,
1176 action_name: str,
1177 db_dict: dict = None,
1178 machine_id: str = None,
1179 progress_timeout: float = None,
1180 total_timeout: float = None,
1181 **kwargs,
1182 ):
1183 """Execute action
1184
1185 :param: application_name: Application name
1186 :param: model_name: Model name
1187 :param: action_name: Name of the action
1188 :param: db_dict: Dictionary with data of the DB to write the updates
1189 :param: machine_id Machine id
1190 :param: progress_timeout: Maximum time between two updates in the model
1191 :param: total_timeout: Timeout for the entity to be active
1192
1193 :return: (str, str): (output and status)
1194 """
1195 self.log.debug(
1196 "Executing action {} using params {}".format(action_name, kwargs)
1197 )
1198 # Get controller
1199 controller = await self.get_controller()
1200
1201 # Get model
1202 model = await self.get_model(controller, model_name)
1203
1204 try:
1205 # Get application
1206 application = self._get_application(
1207 model,
1208 application_name=application_name,
1209 )
1210 if application is None:
1211 raise JujuApplicationNotFound("Cannot execute action")
1212 # Racing condition:
1213 # Ocassionally, self._get_leader_unit() will return None
1214 # because the leader elected hook has not been triggered yet.
1215 # Therefore, we are doing some retries. If it happens again,
1216 # re-open bug 1236
1217 if machine_id is None:
1218 unit = await self._get_leader_unit(application)
1219 self.log.debug(
1220 "Action {} is being executed on the leader unit {}".format(
1221 action_name, unit.name
1222 )
1223 )
1224 else:
1225 unit = self._get_unit(application, machine_id)
1226 if not unit:
1227 raise JujuError(
1228 "A unit with machine id {} not in available units".format(
1229 machine_id
1230 )
1231 )
1232 self.log.debug(
1233 "Action {} is being executed on {} unit".format(
1234 action_name, unit.name
1235 )
1236 )
1237
1238 actions = await application.get_actions()
1239
1240 if action_name not in actions:
1241 raise JujuActionNotFound(
1242 "Action {} not in available actions".format(action_name)
1243 )
1244
1245 action = await unit.run_action(action_name, **kwargs)
1246
1247 self.log.debug(
1248 "Wait until action {} is completed in application {} (model={})".format(
1249 action_name, application_name, model_name
1250 )
1251 )
1252 await JujuModelWatcher.wait_for(
1253 model=model,
1254 entity=action,
1255 progress_timeout=progress_timeout,
1256 total_timeout=total_timeout,
1257 db_dict=db_dict,
1258 n2vc=self.n2vc,
1259 vca_id=self.vca_connection._vca_id,
1260 )
1261
1262 output = await model.get_action_output(action_uuid=action.entity_id)
1263 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
1264 status = (
1265 status[action.entity_id] if action.entity_id in status else "failed"
1266 )
1267
1268 self.log.debug(
1269 "Action {} completed with status {} in application {} (model={})".format(
1270 action_name, action.status, application_name, model_name
1271 )
1272 )
1273 finally:
1274 await self.disconnect_model(model)
1275 await self.disconnect_controller(controller)
1276
1277 return output, status
1278
1279 async def get_actions(self, application_name: str, model_name: str) -> dict:
1280 """Get list of actions
1281
1282 :param: application_name: Application name
1283 :param: model_name: Model name
1284
1285 :return: Dict with this format
1286 {
1287 "action_name": "Description of the action",
1288 ...
1289 }
1290 """
1291 self.log.debug(
1292 "Getting list of actions for application {}".format(application_name)
1293 )
1294
1295 # Get controller
1296 controller = await self.get_controller()
1297
1298 # Get model
1299 model = await self.get_model(controller, model_name)
1300
1301 try:
1302 # Get application
1303 application = self._get_application(
1304 model,
1305 application_name=application_name,
1306 )
1307
1308 # Return list of actions
1309 return await application.get_actions()
1310
1311 finally:
1312 # Disconnect from model and controller
1313 await self.disconnect_model(model)
1314 await self.disconnect_controller(controller)
1315
1316 async def get_metrics(self, model_name: str, application_name: str) -> dict:
1317 """Get the metrics collected by the VCA.
1318
1319 :param model_name The name or unique id of the network service
1320 :param application_name The name of the application
1321 """
1322 if not model_name or not application_name:
1323 raise Exception("model_name and application_name must be non-empty strings")
1324 metrics = {}
1325 controller = await self.get_controller()
1326 model = await self.get_model(controller, model_name)
1327 try:
1328 application = self._get_application(model, application_name)
1329 if application is not None:
1330 metrics = await application.get_metrics()
1331 finally:
1332 self.disconnect_model(model)
1333 self.disconnect_controller(controller)
1334 return metrics
1335
1336 async def add_relation(
1337 self,
1338 model_name: str,
1339 endpoint_1: str,
1340 endpoint_2: str,
1341 ):
1342 """Add relation
1343
1344 :param: model_name: Model name
1345 :param: endpoint_1 First endpoint name
1346 ("app:endpoint" format or directly the saas name)
1347 :param: endpoint_2: Second endpoint name (^ same format)
1348 """
1349
1350 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
1351
1352 # Get controller
1353 controller = await self.get_controller()
1354
1355 # Get model
1356 model = await self.get_model(controller, model_name)
1357
1358 # Add relation
1359 try:
1360 await model.add_relation(endpoint_1, endpoint_2)
1361 except juju.errors.JujuAPIError as e:
1362 if self._relation_is_not_found(e):
1363 self.log.warning("Relation not found: {}".format(e.message))
1364 return
1365 if self._relation_already_exist(e):
1366 self.log.warning("Relation already exists: {}".format(e.message))
1367 return
1368 # another exception, raise it
1369 raise e
1370 finally:
1371 await self.disconnect_model(model)
1372 await self.disconnect_controller(controller)
1373
1374 def _relation_is_not_found(self, juju_error):
1375 text = "not found"
1376 return (text in juju_error.message) or (
1377 juju_error.error_code and text in juju_error.error_code
1378 )
1379
1380 def _relation_already_exist(self, juju_error):
1381 text = "already exists"
1382 return (text in juju_error.message) or (
1383 juju_error.error_code and text in juju_error.error_code
1384 )
1385
1386 async def offer(self, endpoint: RelationEndpoint) -> Offer:
1387 """
1388 Create an offer from a RelationEndpoint
1389
1390 :param: endpoint: Relation endpoint
1391
1392 :return: Offer object
1393 """
1394 model_name = endpoint.model_name
1395 offer_name = f"{endpoint.application_name}-{endpoint.endpoint_name}"
1396 controller = await self.get_controller()
1397 model = None
1398 try:
1399 model = await self.get_model(controller, model_name)
1400 await model.create_offer(endpoint.endpoint, offer_name=offer_name)
1401 offer_list = await self._list_offers(model_name, offer_name=offer_name)
1402 if offer_list:
1403 return Offer(offer_list[0].offer_url)
1404 else:
1405 raise Exception("offer was not created")
1406 except juju.errors.JujuError as e:
1407 if "application offer already exists" not in e.message:
1408 raise e
1409 finally:
1410 if model:
1411 self.disconnect_model(model)
1412 self.disconnect_controller(controller)
1413
1414 async def consume(
1415 self,
1416 model_name: str,
1417 offer: Offer,
1418 provider_libjuju: "Libjuju",
1419 ) -> str:
1420 """
1421 Consumes a remote offer in the model. Relations can be created later using "juju relate".
1422
1423 :param: model_name: Model name
1424 :param: offer: Offer object to consume
1425 :param: provider_libjuju: Libjuju object of the provider endpoint
1426
1427 :raises ParseError if there's a problem parsing the offer_url
1428 :raises JujuError if remote offer includes and endpoint
1429 :raises JujuAPIError if the operation is not successful
1430
1431 :returns: Saas name. It is the application name in the model that reference the remote application.
1432 """
1433 saas_name = f'{offer.name}-{offer.model_name.replace("-", "")}'
1434 if offer.vca_id:
1435 saas_name = f"{saas_name}-{offer.vca_id}"
1436 controller = await self.get_controller()
1437 model = None
1438 provider_controller = None
1439 try:
1440 model = await controller.get_model(model_name)
1441 provider_controller = await provider_libjuju.get_controller()
1442 await model.consume(
1443 offer.url, application_alias=saas_name, controller=provider_controller
1444 )
1445 return saas_name
1446 finally:
1447 if model:
1448 await self.disconnect_model(model)
1449 if provider_controller:
1450 await provider_libjuju.disconnect_controller(provider_controller)
1451 await self.disconnect_controller(controller)
1452
1453 async def destroy_model(self, model_name: str, total_timeout: float = 1800):
1454 """
1455 Destroy model
1456
1457 :param: model_name: Model name
1458 :param: total_timeout: Timeout
1459 """
1460
1461 controller = await self.get_controller()
1462 model = None
1463 try:
1464 if not await self.model_exists(model_name, controller=controller):
1465 self.log.warn(f"Model {model_name} doesn't exist")
1466 return
1467
1468 self.log.debug(f"Getting model {model_name} to be destroyed")
1469 model = await self.get_model(controller, model_name)
1470 self.log.debug(f"Destroying manual machines in model {model_name}")
1471 # Destroy machines that are manually provisioned
1472 # and still are in pending state
1473 await self._destroy_pending_machines(model, only_manual=True)
1474 await self.disconnect_model(model)
1475
1476 await asyncio.wait_for(
1477 self._destroy_model(model_name, controller),
1478 timeout=total_timeout,
1479 )
1480 except Exception as e:
1481 if not await self.model_exists(model_name, controller=controller):
1482 self.log.warn(
1483 f"Failed deleting model {model_name}: model doesn't exist"
1484 )
1485 return
1486 self.log.warn(f"Failed deleting model {model_name}: {e}")
1487 raise e
1488 finally:
1489 if model:
1490 await self.disconnect_model(model)
1491 await self.disconnect_controller(controller)
1492
1493 async def _destroy_model(
1494 self,
1495 model_name: str,
1496 controller: Controller,
1497 ):
1498 """
1499 Destroy model from controller
1500
1501 :param: model: Model name to be removed
1502 :param: controller: Controller object
1503 :param: timeout: Timeout in seconds
1504 """
1505 self.log.debug(f"Destroying model {model_name}")
1506
1507 async def _destroy_model_gracefully(model_name: str, controller: Controller):
1508 self.log.info(f"Gracefully deleting model {model_name}")
1509 resolved = False
1510 while model_name in await controller.list_models():
1511 if not resolved:
1512 await self.resolve(model_name)
1513 resolved = True
1514 await controller.destroy_model(model_name, destroy_storage=True)
1515
1516 await asyncio.sleep(5)
1517 self.log.info(f"Model {model_name} deleted gracefully")
1518
1519 async def _destroy_model_forcefully(model_name: str, controller: Controller):
1520 self.log.info(f"Forcefully deleting model {model_name}")
1521 while model_name in await controller.list_models():
1522 await controller.destroy_model(
1523 model_name, destroy_storage=True, force=True, max_wait=60
1524 )
1525 await asyncio.sleep(5)
1526 self.log.info(f"Model {model_name} deleted forcefully")
1527
1528 try:
1529 try:
1530 await asyncio.wait_for(
1531 _destroy_model_gracefully(model_name, controller), timeout=120
1532 )
1533 except asyncio.TimeoutError:
1534 await _destroy_model_forcefully(model_name, controller)
1535 except juju.errors.JujuError as e:
1536 if any("has been removed" in error for error in e.errors):
1537 return
1538 if any("model not found" in error for error in e.errors):
1539 return
1540 raise e
1541
1542 async def destroy_application(
1543 self, model_name: str, application_name: str, total_timeout: float
1544 ):
1545 """
1546 Destroy application
1547
1548 :param: model_name: Model name
1549 :param: application_name: Application name
1550 :param: total_timeout: Timeout
1551 """
1552
1553 controller = await self.get_controller()
1554 model = None
1555
1556 try:
1557 model = await self.get_model(controller, model_name)
1558 self.log.debug(
1559 "Destroying application {} in model {}".format(
1560 application_name, model_name
1561 )
1562 )
1563 application = self._get_application(model, application_name)
1564 if application:
1565 await application.destroy()
1566 else:
1567 self.log.warning("Application not found: {}".format(application_name))
1568
1569 self.log.debug(
1570 "Waiting for application {} to be destroyed in model {}...".format(
1571 application_name, model_name
1572 )
1573 )
1574 if total_timeout is None:
1575 total_timeout = 3600
1576 end = time.time() + total_timeout
1577 while time.time() < end:
1578 if not self._get_application(model, application_name):
1579 self.log.debug(
1580 "The application {} was destroyed in model {} ".format(
1581 application_name, model_name
1582 )
1583 )
1584 return
1585 await asyncio.sleep(5)
1586 raise Exception(
1587 "Timeout waiting for application {} to be destroyed in model {}".format(
1588 application_name, model_name
1589 )
1590 )
1591 finally:
1592 if model is not None:
1593 await self.disconnect_model(model)
1594 await self.disconnect_controller(controller)
1595
1596 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
1597 """
1598 Destroy pending machines in a given model
1599
1600 :param: only_manual: Bool that indicates only manually provisioned
1601 machines should be destroyed (if True), or that
1602 all pending machines should be destroyed
1603 """
1604 status = await model.get_status()
1605 for machine_id in status.machines:
1606 machine_status = status.machines[machine_id]
1607 if machine_status.agent_status.status == "pending":
1608 if only_manual and not machine_status.instance_id.startswith("manual:"):
1609 break
1610 machine = model.machines[machine_id]
1611 await machine.destroy(force=True)
1612
1613 async def configure_application(
1614 self, model_name: str, application_name: str, config: dict = None
1615 ):
1616 """Configure application
1617
1618 :param: model_name: Model name
1619 :param: application_name: Application name
1620 :param: config: Config to apply to the charm
1621 """
1622 self.log.debug("Configuring application {}".format(application_name))
1623
1624 if config:
1625 controller = await self.get_controller()
1626 model = None
1627 try:
1628 model = await self.get_model(controller, model_name)
1629 application = self._get_application(
1630 model,
1631 application_name=application_name,
1632 )
1633 await application.set_config(config)
1634 finally:
1635 if model:
1636 await self.disconnect_model(model)
1637 await self.disconnect_controller(controller)
1638
1639 async def health_check(self, interval: float = 300.0):
1640 """
1641 Health check to make sure controller and controller_model connections are OK
1642
1643 :param: interval: Time in seconds between checks
1644 """
1645 controller = None
1646 while True:
1647 try:
1648 controller = await self.get_controller()
1649 # self.log.debug("VCA is alive")
1650 except Exception as e:
1651 self.log.error("Health check to VCA failed: {}".format(e))
1652 finally:
1653 await self.disconnect_controller(controller)
1654 await asyncio.sleep(interval)
1655
1656 async def list_models(self, contains: str = None) -> [str]:
1657 """List models with certain names
1658
1659 :param: contains: String that is contained in model name
1660
1661 :retur: [models] Returns list of model names
1662 """
1663
1664 controller = await self.get_controller()
1665 try:
1666 models = await controller.list_models()
1667 if contains:
1668 models = [model for model in models if contains in model]
1669 return models
1670 finally:
1671 await self.disconnect_controller(controller)
1672
1673 async def _list_offers(
1674 self, model_name: str, offer_name: str = None
1675 ) -> QueryApplicationOffersResults:
1676 """
1677 List offers within a model
1678
1679 :param: model_name: Model name
1680 :param: offer_name: Offer name to filter.
1681
1682 :return: Returns application offers results in the model
1683 """
1684
1685 controller = await self.get_controller()
1686 try:
1687 offers = (await controller.list_offers(model_name)).results
1688 if offer_name:
1689 matching_offer = []
1690 for offer in offers:
1691 if offer.offer_name == offer_name:
1692 matching_offer.append(offer)
1693 break
1694 offers = matching_offer
1695 return offers
1696 finally:
1697 await self.disconnect_controller(controller)
1698
1699 async def add_k8s(
1700 self,
1701 name: str,
1702 rbac_id: str,
1703 token: str,
1704 client_cert_data: str,
1705 configuration: Configuration,
1706 storage_class: str,
1707 credential_name: str = None,
1708 ):
1709 """
1710 Add a Kubernetes cloud to the controller
1711
1712 Similar to the `juju add-k8s` command in the CLI
1713
1714 :param: name: Name for the K8s cloud
1715 :param: configuration: Kubernetes configuration object
1716 :param: storage_class: Storage Class to use in the cloud
1717 :param: credential_name: Storage Class to use in the cloud
1718 """
1719
1720 if not storage_class:
1721 raise Exception("storage_class must be a non-empty string")
1722 if not name:
1723 raise Exception("name must be a non-empty string")
1724 if not configuration:
1725 raise Exception("configuration must be provided")
1726
1727 endpoint = configuration.host
1728 credential = self.get_k8s_cloud_credential(
1729 configuration,
1730 client_cert_data,
1731 token,
1732 )
1733 credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
1734 cloud = client.Cloud(
1735 type_="kubernetes",
1736 auth_types=[credential.auth_type],
1737 endpoint=endpoint,
1738 ca_certificates=[client_cert_data],
1739 config={
1740 "operator-storage": storage_class,
1741 "workload-storage": storage_class,
1742 },
1743 )
1744
1745 return await self.add_cloud(
1746 name, cloud, credential, credential_name=credential_name
1747 )
1748
1749 def get_k8s_cloud_credential(
1750 self,
1751 configuration: Configuration,
1752 client_cert_data: str,
1753 token: str = None,
1754 ) -> client.CloudCredential:
1755 attrs = {}
1756 # TODO: Test with AKS
1757 key = None # open(configuration.key_file, "r").read()
1758 username = configuration.username
1759 password = configuration.password
1760
1761 if client_cert_data:
1762 attrs["ClientCertificateData"] = client_cert_data
1763 if key:
1764 attrs["ClientKeyData"] = key
1765 if token:
1766 if username or password:
1767 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1768 attrs["Token"] = token
1769
1770 auth_type = None
1771 if key:
1772 auth_type = "oauth2"
1773 if client_cert_data:
1774 auth_type = "oauth2withcert"
1775 if not token:
1776 raise JujuInvalidK8sConfiguration(
1777 "missing token for auth type {}".format(auth_type)
1778 )
1779 elif username:
1780 if not password:
1781 self.log.debug(
1782 "credential for user {} has empty password".format(username)
1783 )
1784 attrs["username"] = username
1785 attrs["password"] = password
1786 if client_cert_data:
1787 auth_type = "userpasswithcert"
1788 else:
1789 auth_type = "userpass"
1790 elif client_cert_data and token:
1791 auth_type = "certificate"
1792 else:
1793 raise JujuInvalidK8sConfiguration("authentication method not supported")
1794 return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1795
1796 async def add_cloud(
1797 self,
1798 name: str,
1799 cloud: Cloud,
1800 credential: CloudCredential = None,
1801 credential_name: str = None,
1802 ) -> Cloud:
1803 """
1804 Add cloud to the controller
1805
1806 :param: name: Name of the cloud to be added
1807 :param: cloud: Cloud object
1808 :param: credential: CloudCredentials object for the cloud
1809 :param: credential_name: Credential name.
1810 If not defined, cloud of the name will be used.
1811 """
1812 controller = await self.get_controller()
1813 try:
1814 _ = await controller.add_cloud(name, cloud)
1815 if credential:
1816 await controller.add_credential(
1817 credential_name or name, credential=credential, cloud=name
1818 )
1819 # Need to return the object returned by the controller.add_cloud() function
1820 # I'm returning the original value now until this bug is fixed:
1821 # https://github.com/juju/python-libjuju/issues/443
1822 return cloud
1823 finally:
1824 await self.disconnect_controller(controller)
1825
1826 async def remove_cloud(self, name: str):
1827 """
1828 Remove cloud
1829
1830 :param: name: Name of the cloud to be removed
1831 """
1832 controller = await self.get_controller()
1833 try:
1834 await controller.remove_cloud(name)
1835 except juju.errors.JujuError as e:
1836 if len(e.errors) == 1 and f'cloud "{name}" not found' == e.errors[0]:
1837 self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
1838 else:
1839 raise e
1840 finally:
1841 await self.disconnect_controller(controller)
1842
1843 @retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
1844 async def _get_leader_unit(self, application: Application) -> Unit:
1845 unit = None
1846 for u in application.units:
1847 if await u.is_leader_from_status():
1848 unit = u
1849 break
1850 if not unit:
1851 raise Exception()
1852 return unit
1853
1854 async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
1855 """
1856 Get cloud credentials
1857
1858 :param: cloud: Cloud object. The returned credentials will be from this cloud.
1859
1860 :return: List of credentials object associated to the specified cloud
1861
1862 """
1863 controller = await self.get_controller()
1864 try:
1865 facade = client.CloudFacade.from_connection(controller.connection())
1866 cloud_cred_tag = tag.credential(
1867 cloud.name, self.vca_connection.data.user, cloud.credential_name
1868 )
1869 params = [client.Entity(cloud_cred_tag)]
1870 return (await facade.Credential(params)).results
1871 finally:
1872 await self.disconnect_controller(controller)
1873
1874 async def check_application_exists(self, model_name, application_name) -> bool:
1875 """Check application exists
1876
1877 :param: model_name: Model Name
1878 :param: application_name: Application Name
1879
1880 :return: Boolean
1881 """
1882
1883 model = None
1884 controller = await self.get_controller()
1885 try:
1886 model = await self.get_model(controller, model_name)
1887 self.log.debug(
1888 "Checking if application {} exists in model {}".format(
1889 application_name, model_name
1890 )
1891 )
1892 return self._get_application(model, application_name) is not None
1893 finally:
1894 if model:
1895 await self.disconnect_model(model)
1896 await self.disconnect_controller(controller)