Fix black errors
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 import typing
18
19 import time
20
21 import juju.errors
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from juju.controller import Controller
33 from juju.client import client
34 from juju import tag
35
36 from n2vc.definitions import Offer, RelationEndpoint
37 from n2vc.juju_watcher import JujuModelWatcher
38 from n2vc.provisioner import AsyncSSHProvisioner
39 from n2vc.n2vc_conn import N2VCConnector
40 from n2vc.exceptions import (
41 JujuMachineNotFound,
42 JujuApplicationNotFound,
43 JujuLeaderUnitNotFound,
44 JujuActionNotFound,
45 JujuControllerFailedConnecting,
46 JujuApplicationExists,
47 JujuInvalidK8sConfiguration,
48 JujuError,
49 )
50 from n2vc.vca.cloud import Cloud as VcaCloud
51 from n2vc.vca.connection import Connection
52 from kubernetes.client.configuration import Configuration
53 from retrying_async import retry
54
55
56 RBAC_LABEL_KEY_NAME = "rbac-id"
57
58
59 class Libjuju:
60 def __init__(
61 self,
62 vca_connection: Connection,
63 loop: asyncio.AbstractEventLoop = None,
64 log: logging.Logger = None,
65 n2vc: N2VCConnector = None,
66 ):
67 """
68 Constructor
69
70 :param: vca_connection: n2vc.vca.connection object
71 :param: loop: Asyncio loop
72 :param: log: Logger
73 :param: n2vc: N2VC object
74 """
75
76 self.log = log or logging.getLogger("Libjuju")
77 self.n2vc = n2vc
78 self.vca_connection = vca_connection
79
80 self.loop = loop or asyncio.get_event_loop()
81 self.loop.set_exception_handler(self.handle_exception)
82 self.creating_model = asyncio.Lock(loop=self.loop)
83
84 if self.vca_connection.is_default:
85 self.health_check_task = self._create_health_check_task()
86
87 def _create_health_check_task(self):
88 return self.loop.create_task(self.health_check())
89
90 async def get_controller(self, timeout: float = 60.0) -> Controller:
91 """
92 Get controller
93
94 :param: timeout: Time in seconds to wait for controller to connect
95 """
96 controller = None
97 try:
98 controller = Controller()
99 await asyncio.wait_for(
100 controller.connect(
101 endpoint=self.vca_connection.data.endpoints,
102 username=self.vca_connection.data.user,
103 password=self.vca_connection.data.secret,
104 cacert=self.vca_connection.data.cacert,
105 ),
106 timeout=timeout,
107 )
108 if self.vca_connection.is_default:
109 endpoints = await controller.api_endpoints
110 if not all(
111 endpoint in self.vca_connection.endpoints for endpoint in endpoints
112 ):
113 await self.vca_connection.update_endpoints(endpoints)
114 return controller
115 except asyncio.CancelledError as e:
116 raise e
117 except Exception as e:
118 self.log.error(
119 "Failed connecting to controller: {}... {}".format(
120 self.vca_connection.data.endpoints, e
121 )
122 )
123 if controller:
124 await self.disconnect_controller(controller)
125
126 raise JujuControllerFailedConnecting(
127 f"Error connecting to Juju controller: {e}"
128 )
129
130 async def disconnect(self):
131 """Disconnect"""
132 # Cancel health check task
133 self.health_check_task.cancel()
134 self.log.debug("Libjuju disconnected!")
135
136 async def disconnect_model(self, model: Model):
137 """
138 Disconnect model
139
140 :param: model: Model that will be disconnected
141 """
142 await model.disconnect()
143
144 async def disconnect_controller(self, controller: Controller):
145 """
146 Disconnect controller
147
148 :param: controller: Controller that will be disconnected
149 """
150 if controller:
151 await controller.disconnect()
152
153 @retry(attempts=3, delay=5, timeout=None)
154 async def add_model(self, model_name: str, cloud: VcaCloud):
155 """
156 Create model
157
158 :param: model_name: Model name
159 :param: cloud: Cloud object
160 """
161
162 # Get controller
163 controller = await self.get_controller()
164 model = None
165 try:
166 # Block until other workers have finished model creation
167 while self.creating_model.locked():
168 await asyncio.sleep(0.1)
169
170 # Create the model
171 async with self.creating_model:
172 if await self.model_exists(model_name, controller=controller):
173 return
174 self.log.debug("Creating model {}".format(model_name))
175 model = await controller.add_model(
176 model_name,
177 config=self.vca_connection.data.model_config,
178 cloud_name=cloud.name,
179 credential_name=cloud.credential_name,
180 )
181 except juju.errors.JujuAPIError as e:
182 if "already exists" in e.message:
183 pass
184 else:
185 raise e
186 finally:
187 if model:
188 await self.disconnect_model(model)
189 await self.disconnect_controller(controller)
190
191 async def get_executed_actions(self, model_name: str) -> list:
192 """
193 Get executed/history of actions for a model.
194
195 :param: model_name: Model name, str.
196 :return: List of executed actions for a model.
197 """
198 model = None
199 executed_actions = []
200 controller = await self.get_controller()
201 try:
202 model = await self.get_model(controller, model_name)
203 # Get all unique action names
204 actions = {}
205 for application in model.applications:
206 application_actions = await self.get_actions(application, model_name)
207 actions.update(application_actions)
208 # Get status of all actions
209 for application_action in actions:
210 app_action_status_list = await model.get_action_status(
211 name=application_action
212 )
213 for action_id, action_status in app_action_status_list.items():
214 executed_action = {
215 "id": action_id,
216 "action": application_action,
217 "status": action_status,
218 }
219 # Get action output by id
220 action_status = await model.get_action_output(executed_action["id"])
221 for k, v in action_status.items():
222 executed_action[k] = v
223 executed_actions.append(executed_action)
224 except Exception as e:
225 raise JujuError(
226 "Error in getting executed actions for model: {}. Error: {}".format(
227 model_name, str(e)
228 )
229 )
230 finally:
231 if model:
232 await self.disconnect_model(model)
233 await self.disconnect_controller(controller)
234 return executed_actions
235
236 async def get_application_configs(
237 self, model_name: str, application_name: str
238 ) -> dict:
239 """
240 Get available configs for an application.
241
242 :param: model_name: Model name, str.
243 :param: application_name: Application name, str.
244
245 :return: A dict which has key - action name, value - action description
246 """
247 model = None
248 application_configs = {}
249 controller = await self.get_controller()
250 try:
251 model = await self.get_model(controller, model_name)
252 application = self._get_application(
253 model, application_name=application_name
254 )
255 application_configs = await application.get_config()
256 except Exception as e:
257 raise JujuError(
258 "Error in getting configs for application: {} in model: {}. Error: {}".format(
259 application_name, model_name, str(e)
260 )
261 )
262 finally:
263 if model:
264 await self.disconnect_model(model)
265 await self.disconnect_controller(controller)
266 return application_configs
267
268 @retry(attempts=3, delay=5)
269 async def get_model(self, controller: Controller, model_name: str) -> Model:
270 """
271 Get model from controller
272
273 :param: controller: Controller
274 :param: model_name: Model name
275
276 :return: Model: The created Juju model object
277 """
278 return await controller.get_model(model_name)
279
280 async def model_exists(
281 self, model_name: str, controller: Controller = None
282 ) -> bool:
283 """
284 Check if model exists
285
286 :param: controller: Controller
287 :param: model_name: Model name
288
289 :return bool
290 """
291 need_to_disconnect = False
292
293 # Get controller if not passed
294 if not controller:
295 controller = await self.get_controller()
296 need_to_disconnect = True
297
298 # Check if model exists
299 try:
300 return model_name in await controller.list_models()
301 finally:
302 if need_to_disconnect:
303 await self.disconnect_controller(controller)
304
305 async def models_exist(self, model_names: [str]) -> (bool, list):
306 """
307 Check if models exists
308
309 :param: model_names: List of strings with model names
310
311 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
312 """
313 if not model_names:
314 raise Exception(
315 "model_names must be a non-empty array. Given value: {}".format(
316 model_names
317 )
318 )
319 non_existing_models = []
320 models = await self.list_models()
321 existing_models = list(set(models).intersection(model_names))
322 non_existing_models = list(set(model_names) - set(existing_models))
323
324 return (
325 len(non_existing_models) == 0,
326 non_existing_models,
327 )
328
329 async def get_model_status(self, model_name: str) -> FullStatus:
330 """
331 Get model status
332
333 :param: model_name: Model name
334
335 :return: Full status object
336 """
337 controller = await self.get_controller()
338 model = await self.get_model(controller, model_name)
339 try:
340 return await model.get_status()
341 finally:
342 await self.disconnect_model(model)
343 await self.disconnect_controller(controller)
344
345 async def create_machine(
346 self,
347 model_name: str,
348 machine_id: str = None,
349 db_dict: dict = None,
350 progress_timeout: float = None,
351 total_timeout: float = None,
352 series: str = "bionic",
353 wait: bool = True,
354 ) -> (Machine, bool):
355 """
356 Create machine
357
358 :param: model_name: Model name
359 :param: machine_id: Machine id
360 :param: db_dict: Dictionary with data of the DB to write the updates
361 :param: progress_timeout: Maximum time between two updates in the model
362 :param: total_timeout: Timeout for the entity to be active
363 :param: series: Series of the machine (xenial, bionic, focal, ...)
364 :param: wait: Wait until machine is ready
365
366 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
367 if the machine is new or it already existed
368 """
369 new = False
370 machine = None
371
372 self.log.debug(
373 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
374 )
375
376 # Get controller
377 controller = await self.get_controller()
378
379 # Get model
380 model = await self.get_model(controller, model_name)
381 try:
382 if machine_id is not None:
383 self.log.debug(
384 "Searching machine (id={}) in model {}".format(
385 machine_id, model_name
386 )
387 )
388
389 # Get machines from model and get the machine with machine_id if exists
390 machines = await model.get_machines()
391 if machine_id in machines:
392 self.log.debug(
393 "Machine (id={}) found in model {}".format(
394 machine_id, model_name
395 )
396 )
397 machine = machines[machine_id]
398 else:
399 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
400
401 if machine is None:
402 self.log.debug("Creating a new machine in model {}".format(model_name))
403
404 # Create machine
405 machine = await model.add_machine(
406 spec=None, constraints=None, disks=None, series=series
407 )
408 new = True
409
410 # Wait until the machine is ready
411 self.log.debug(
412 "Wait until machine {} is ready in model {}".format(
413 machine.entity_id, model_name
414 )
415 )
416 if wait:
417 await JujuModelWatcher.wait_for(
418 model=model,
419 entity=machine,
420 progress_timeout=progress_timeout,
421 total_timeout=total_timeout,
422 db_dict=db_dict,
423 n2vc=self.n2vc,
424 vca_id=self.vca_connection._vca_id,
425 )
426 finally:
427 await self.disconnect_model(model)
428 await self.disconnect_controller(controller)
429
430 self.log.debug(
431 "Machine {} ready at {} in model {}".format(
432 machine.entity_id, machine.dns_name, model_name
433 )
434 )
435 return machine, new
436
437 async def provision_machine(
438 self,
439 model_name: str,
440 hostname: str,
441 username: str,
442 private_key_path: str,
443 db_dict: dict = None,
444 progress_timeout: float = None,
445 total_timeout: float = None,
446 ) -> str:
447 """
448 Manually provisioning of a machine
449
450 :param: model_name: Model name
451 :param: hostname: IP to access the machine
452 :param: username: Username to login to the machine
453 :param: private_key_path: Local path for the private key
454 :param: db_dict: Dictionary with data of the DB to write the updates
455 :param: progress_timeout: Maximum time between two updates in the model
456 :param: total_timeout: Timeout for the entity to be active
457
458 :return: (Entity): Machine id
459 """
460 self.log.debug(
461 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
462 model_name, hostname, username
463 )
464 )
465
466 # Get controller
467 controller = await self.get_controller()
468
469 # Get model
470 model = await self.get_model(controller, model_name)
471
472 try:
473 # Get provisioner
474 provisioner = AsyncSSHProvisioner(
475 host=hostname,
476 user=username,
477 private_key_path=private_key_path,
478 log=self.log,
479 )
480
481 # Provision machine
482 params = await provisioner.provision_machine()
483
484 params.jobs = ["JobHostUnits"]
485
486 self.log.debug("Adding machine to model")
487 connection = model.connection()
488 client_facade = client.ClientFacade.from_connection(connection)
489
490 results = await client_facade.AddMachines(params=[params])
491 error = results.machines[0].error
492
493 if error:
494 msg = "Error adding machine: {}".format(error.message)
495 self.log.error(msg=msg)
496 raise ValueError(msg)
497
498 machine_id = results.machines[0].machine
499
500 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
501 asyncio.ensure_future(
502 provisioner.install_agent(
503 connection=connection,
504 nonce=params.nonce,
505 machine_id=machine_id,
506 proxy=self.vca_connection.data.api_proxy,
507 series=params.series,
508 )
509 )
510
511 machine = None
512 for _ in range(10):
513 machine_list = await model.get_machines()
514 if machine_id in machine_list:
515 self.log.debug("Machine {} found in model!".format(machine_id))
516 machine = model.machines.get(machine_id)
517 break
518 await asyncio.sleep(2)
519
520 if machine is None:
521 msg = "Machine {} not found in model".format(machine_id)
522 self.log.error(msg=msg)
523 raise JujuMachineNotFound(msg)
524
525 self.log.debug(
526 "Wait until machine {} is ready in model {}".format(
527 machine.entity_id, model_name
528 )
529 )
530 await JujuModelWatcher.wait_for(
531 model=model,
532 entity=machine,
533 progress_timeout=progress_timeout,
534 total_timeout=total_timeout,
535 db_dict=db_dict,
536 n2vc=self.n2vc,
537 vca_id=self.vca_connection._vca_id,
538 )
539 except Exception as e:
540 raise e
541 finally:
542 await self.disconnect_model(model)
543 await self.disconnect_controller(controller)
544
545 self.log.debug(
546 "Machine provisioned {} in model {}".format(machine_id, model_name)
547 )
548
549 return machine_id
550
551 async def deploy(
552 self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
553 ):
554 """
555 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
556
557 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
558 :param: model_name: Model name
559 :param: wait: Indicates whether to wait or not until all applications are active
560 :param: timeout: Time in seconds to wait until all applications are active
561 """
562 controller = await self.get_controller()
563 model = await self.get_model(controller, model_name)
564 try:
565 await model.deploy(uri, trust=True)
566 if wait:
567 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
568 self.log.debug("All units active in model {}".format(model_name))
569 finally:
570 await self.disconnect_model(model)
571 await self.disconnect_controller(controller)
572
573 async def add_unit(
574 self,
575 application_name: str,
576 model_name: str,
577 machine_id: str,
578 db_dict: dict = None,
579 progress_timeout: float = None,
580 total_timeout: float = None,
581 ):
582 """Add unit
583
584 :param: application_name: Application name
585 :param: model_name: Model name
586 :param: machine_id Machine id
587 :param: db_dict: Dictionary with data of the DB to write the updates
588 :param: progress_timeout: Maximum time between two updates in the model
589 :param: total_timeout: Timeout for the entity to be active
590
591 :return: None
592 """
593
594 model = None
595 controller = await self.get_controller()
596 try:
597 model = await self.get_model(controller, model_name)
598 application = self._get_application(model, application_name)
599
600 if application is not None:
601 # Checks if the given machine id in the model,
602 # otherwise function raises an error
603 _machine, _series = self._get_machine_info(model, machine_id)
604
605 self.log.debug(
606 "Adding unit (machine {}) to application {} in model ~{}".format(
607 machine_id, application_name, model_name
608 )
609 )
610
611 await application.add_unit(to=machine_id)
612
613 await JujuModelWatcher.wait_for(
614 model=model,
615 entity=application,
616 progress_timeout=progress_timeout,
617 total_timeout=total_timeout,
618 db_dict=db_dict,
619 n2vc=self.n2vc,
620 vca_id=self.vca_connection._vca_id,
621 )
622 self.log.debug(
623 "Unit is added to application {} in model {}".format(
624 application_name, model_name
625 )
626 )
627 else:
628 raise JujuApplicationNotFound(
629 "Application {} not exists".format(application_name)
630 )
631 finally:
632 if model:
633 await self.disconnect_model(model)
634 await self.disconnect_controller(controller)
635
636 async def destroy_unit(
637 self,
638 application_name: str,
639 model_name: str,
640 machine_id: str,
641 total_timeout: float = None,
642 ):
643 """Destroy unit
644
645 :param: application_name: Application name
646 :param: model_name: Model name
647 :param: machine_id Machine id
648 :param: total_timeout: Timeout for the entity to be active
649
650 :return: None
651 """
652
653 model = None
654 controller = await self.get_controller()
655 try:
656 model = await self.get_model(controller, model_name)
657 application = self._get_application(model, application_name)
658
659 if application is None:
660 raise JujuApplicationNotFound(
661 "Application not found: {} (model={})".format(
662 application_name, model_name
663 )
664 )
665
666 unit = self._get_unit(application, machine_id)
667 if not unit:
668 raise JujuError(
669 "A unit with machine id {} not in available units".format(
670 machine_id
671 )
672 )
673
674 unit_name = unit.name
675
676 self.log.debug(
677 "Destroying unit {} from application {} in model {}".format(
678 unit_name, application_name, model_name
679 )
680 )
681 await application.destroy_unit(unit_name)
682
683 self.log.debug(
684 "Waiting for unit {} to be destroyed in application {} (model={})...".format(
685 unit_name, application_name, model_name
686 )
687 )
688
689 # TODO: Add functionality in the Juju watcher to replace this kind of blocks
690 if total_timeout is None:
691 total_timeout = 3600
692 end = time.time() + total_timeout
693 while time.time() < end:
694 if not self._get_unit(application, machine_id):
695 self.log.debug(
696 "The unit {} was destroyed in application {} (model={}) ".format(
697 unit_name, application_name, model_name
698 )
699 )
700 return
701 await asyncio.sleep(5)
702 self.log.debug(
703 "Unit {} is destroyed from application {} in model {}".format(
704 unit_name, application_name, model_name
705 )
706 )
707 finally:
708 if model:
709 await self.disconnect_model(model)
710 await self.disconnect_controller(controller)
711
712 async def deploy_charm(
713 self,
714 application_name: str,
715 path: str,
716 model_name: str,
717 machine_id: str,
718 db_dict: dict = None,
719 progress_timeout: float = None,
720 total_timeout: float = None,
721 config: dict = None,
722 series: str = None,
723 num_units: int = 1,
724 ):
725 """Deploy charm
726
727 :param: application_name: Application name
728 :param: path: Local path to the charm
729 :param: model_name: Model name
730 :param: machine_id ID of the machine
731 :param: db_dict: Dictionary with data of the DB to write the updates
732 :param: progress_timeout: Maximum time between two updates in the model
733 :param: total_timeout: Timeout for the entity to be active
734 :param: config: Config for the charm
735 :param: series: Series of the charm
736 :param: num_units: Number of units
737
738 :return: (juju.application.Application): Juju application
739 """
740 self.log.debug(
741 "Deploying charm {} to machine {} in model ~{}".format(
742 application_name, machine_id, model_name
743 )
744 )
745 self.log.debug("charm: {}".format(path))
746
747 # Get controller
748 controller = await self.get_controller()
749
750 # Get model
751 model = await self.get_model(controller, model_name)
752
753 try:
754 if application_name not in model.applications:
755 if machine_id is not None:
756 machine, series = self._get_machine_info(model, machine_id)
757
758 application = await model.deploy(
759 entity_url=path,
760 application_name=application_name,
761 channel="stable",
762 num_units=1,
763 series=series,
764 to=machine_id,
765 config=config,
766 )
767
768 self.log.debug(
769 "Wait until application {} is ready in model {}".format(
770 application_name, model_name
771 )
772 )
773 if num_units > 1:
774 for _ in range(num_units - 1):
775 m, _ = await self.create_machine(model_name, wait=False)
776 await application.add_unit(to=m.entity_id)
777
778 await JujuModelWatcher.wait_for(
779 model=model,
780 entity=application,
781 progress_timeout=progress_timeout,
782 total_timeout=total_timeout,
783 db_dict=db_dict,
784 n2vc=self.n2vc,
785 vca_id=self.vca_connection._vca_id,
786 )
787 self.log.debug(
788 "Application {} is ready in model {}".format(
789 application_name, model_name
790 )
791 )
792 else:
793 raise JujuApplicationExists(
794 "Application {} exists".format(application_name)
795 )
796 except juju.errors.JujuError as e:
797 if "already exists" in e.message:
798 raise JujuApplicationExists(
799 "Application {} exists".format(application_name)
800 )
801 else:
802 raise e
803 finally:
804 await self.disconnect_model(model)
805 await self.disconnect_controller(controller)
806
807 return application
808
809 async def upgrade_charm(
810 self,
811 application_name: str,
812 path: str,
813 model_name: str,
814 total_timeout: float = None,
815 **kwargs,
816 ):
817 """Upgrade Charm
818
819 :param: application_name: Application name
820 :param: model_name: Model name
821 :param: path: Local path to the charm
822 :param: total_timeout: Timeout for the entity to be active
823
824 :return: (str, str): (output and status)
825 """
826
827 self.log.debug(
828 "Upgrading charm {} in model {} from path {}".format(
829 application_name, model_name, path
830 )
831 )
832
833 await self.resolve_application(
834 model_name=model_name, application_name=application_name
835 )
836
837 # Get controller
838 controller = await self.get_controller()
839
840 # Get model
841 model = await self.get_model(controller, model_name)
842
843 try:
844 # Get application
845 application = self._get_application(
846 model,
847 application_name=application_name,
848 )
849 if application is None:
850 raise JujuApplicationNotFound(
851 "Cannot find application {} to upgrade".format(application_name)
852 )
853
854 await application.refresh(path=path)
855
856 self.log.debug(
857 "Wait until charm upgrade is completed for application {} (model={})".format(
858 application_name, model_name
859 )
860 )
861
862 await JujuModelWatcher.ensure_units_idle(
863 model=model, application=application
864 )
865
866 if application.status == "error":
867 error_message = "Unknown"
868 for unit in application.units:
869 if (
870 unit.workload_status == "error"
871 and unit.workload_status_message != ""
872 ):
873 error_message = unit.workload_status_message
874
875 message = "Application {} failed update in {}: {}".format(
876 application_name, model_name, error_message
877 )
878 self.log.error(message)
879 raise JujuError(message=message)
880
881 self.log.debug(
882 "Application {} is ready in model {}".format(
883 application_name, model_name
884 )
885 )
886
887 finally:
888 await self.disconnect_model(model)
889 await self.disconnect_controller(controller)
890
891 return application
892
893 async def resolve_application(self, model_name: str, application_name: str):
894 controller = await self.get_controller()
895 model = await self.get_model(controller, model_name)
896
897 try:
898 application = self._get_application(
899 model,
900 application_name=application_name,
901 )
902 if application is None:
903 raise JujuApplicationNotFound(
904 "Cannot find application {} to resolve".format(application_name)
905 )
906
907 while application.status == "error":
908 for unit in application.units:
909 if unit.workload_status == "error":
910 self.log.debug(
911 "Model {}, Application {}, Unit {} in error state, resolving".format(
912 model_name, application_name, unit.entity_id
913 )
914 )
915 try:
916 await unit.resolved(retry=False)
917 except Exception:
918 pass
919
920 await asyncio.sleep(1)
921
922 finally:
923 await self.disconnect_model(model)
924 await self.disconnect_controller(controller)
925
926 async def resolve(self, model_name: str):
927 controller = await self.get_controller()
928 model = await self.get_model(controller, model_name)
929 all_units_active = False
930 try:
931 while not all_units_active:
932 all_units_active = True
933 for application_name, application in model.applications.items():
934 if application.status == "error":
935 for unit in application.units:
936 if unit.workload_status == "error":
937 self.log.debug(
938 "Model {}, Application {}, Unit {} in error state, resolving".format(
939 model_name, application_name, unit.entity_id
940 )
941 )
942 try:
943 await unit.resolved(retry=False)
944 all_units_active = False
945 except Exception:
946 pass
947
948 if not all_units_active:
949 await asyncio.sleep(5)
950 finally:
951 await self.disconnect_model(model)
952 await self.disconnect_controller(controller)
953
954 async def scale_application(
955 self,
956 model_name: str,
957 application_name: str,
958 scale: int = 1,
959 total_timeout: float = None,
960 ):
961 """
962 Scale application (K8s)
963
964 :param: model_name: Model name
965 :param: application_name: Application name
966 :param: scale: Scale to which to set this application
967 :param: total_timeout: Timeout for the entity to be active
968 """
969
970 model = None
971 controller = await self.get_controller()
972 try:
973 model = await self.get_model(controller, model_name)
974
975 self.log.debug(
976 "Scaling application {} in model {}".format(
977 application_name, model_name
978 )
979 )
980 application = self._get_application(model, application_name)
981 if application is None:
982 raise JujuApplicationNotFound("Cannot scale application")
983 await application.scale(scale=scale)
984 # Wait until application is scaled in model
985 self.log.debug(
986 "Waiting for application {} to be scaled in model {}...".format(
987 application_name, model_name
988 )
989 )
990 if total_timeout is None:
991 total_timeout = 1800
992 end = time.time() + total_timeout
993 while time.time() < end:
994 application_scale = self._get_application_count(model, application_name)
995 # Before calling wait_for_model function,
996 # wait until application unit count and scale count are equal.
997 # Because there is a delay before scaling triggers in Juju model.
998 if application_scale == scale:
999 await JujuModelWatcher.wait_for_model(
1000 model=model, timeout=total_timeout
1001 )
1002 self.log.debug(
1003 "Application {} is scaled in model {}".format(
1004 application_name, model_name
1005 )
1006 )
1007 return
1008 await asyncio.sleep(5)
1009 raise Exception(
1010 "Timeout waiting for application {} in model {} to be scaled".format(
1011 application_name, model_name
1012 )
1013 )
1014 finally:
1015 if model:
1016 await self.disconnect_model(model)
1017 await self.disconnect_controller(controller)
1018
1019 def _get_application_count(self, model: Model, application_name: str) -> int:
1020 """Get number of units of the application
1021
1022 :param: model: Model object
1023 :param: application_name: Application name
1024
1025 :return: int (or None if application doesn't exist)
1026 """
1027 application = self._get_application(model, application_name)
1028 if application is not None:
1029 return len(application.units)
1030
1031 def _get_application(self, model: Model, application_name: str) -> Application:
1032 """Get application
1033
1034 :param: model: Model object
1035 :param: application_name: Application name
1036
1037 :return: juju.application.Application (or None if it doesn't exist)
1038 """
1039 if model.applications and application_name in model.applications:
1040 return model.applications[application_name]
1041
1042 def _get_unit(self, application: Application, machine_id: str) -> Unit:
1043 """Get unit
1044
1045 :param: application: Application object
1046 :param: machine_id: Machine id
1047
1048 :return: Unit
1049 """
1050 unit = None
1051 for u in application.units:
1052 if u.machine_id == machine_id:
1053 unit = u
1054 break
1055 return unit
1056
1057 def _get_machine_info(
1058 self,
1059 model,
1060 machine_id: str,
1061 ) -> (str, str):
1062 """Get machine info
1063
1064 :param: model: Model object
1065 :param: machine_id: Machine id
1066
1067 :return: (str, str): (machine, series)
1068 """
1069 if machine_id not in model.machines:
1070 msg = "Machine {} not found in model".format(machine_id)
1071 self.log.error(msg=msg)
1072 raise JujuMachineNotFound(msg)
1073 machine = model.machines[machine_id]
1074 return machine, machine.series
1075
1076 async def execute_action(
1077 self,
1078 application_name: str,
1079 model_name: str,
1080 action_name: str,
1081 db_dict: dict = None,
1082 machine_id: str = None,
1083 progress_timeout: float = None,
1084 total_timeout: float = None,
1085 **kwargs,
1086 ):
1087 """Execute action
1088
1089 :param: application_name: Application name
1090 :param: model_name: Model name
1091 :param: action_name: Name of the action
1092 :param: db_dict: Dictionary with data of the DB to write the updates
1093 :param: machine_id Machine id
1094 :param: progress_timeout: Maximum time between two updates in the model
1095 :param: total_timeout: Timeout for the entity to be active
1096
1097 :return: (str, str): (output and status)
1098 """
1099 self.log.debug(
1100 "Executing action {} using params {}".format(action_name, kwargs)
1101 )
1102 # Get controller
1103 controller = await self.get_controller()
1104
1105 # Get model
1106 model = await self.get_model(controller, model_name)
1107
1108 try:
1109 # Get application
1110 application = self._get_application(
1111 model,
1112 application_name=application_name,
1113 )
1114 if application is None:
1115 raise JujuApplicationNotFound("Cannot execute action")
1116 # Racing condition:
1117 # Ocassionally, self._get_leader_unit() will return None
1118 # because the leader elected hook has not been triggered yet.
1119 # Therefore, we are doing some retries. If it happens again,
1120 # re-open bug 1236
1121 if machine_id is None:
1122 unit = await self._get_leader_unit(application)
1123 self.log.debug(
1124 "Action {} is being executed on the leader unit {}".format(
1125 action_name, unit.name
1126 )
1127 )
1128 else:
1129 unit = self._get_unit(application, machine_id)
1130 if not unit:
1131 raise JujuError(
1132 "A unit with machine id {} not in available units".format(
1133 machine_id
1134 )
1135 )
1136 self.log.debug(
1137 "Action {} is being executed on {} unit".format(
1138 action_name, unit.name
1139 )
1140 )
1141
1142 actions = await application.get_actions()
1143
1144 if action_name not in actions:
1145 raise JujuActionNotFound(
1146 "Action {} not in available actions".format(action_name)
1147 )
1148
1149 action = await unit.run_action(action_name, **kwargs)
1150
1151 self.log.debug(
1152 "Wait until action {} is completed in application {} (model={})".format(
1153 action_name, application_name, model_name
1154 )
1155 )
1156 await JujuModelWatcher.wait_for(
1157 model=model,
1158 entity=action,
1159 progress_timeout=progress_timeout,
1160 total_timeout=total_timeout,
1161 db_dict=db_dict,
1162 n2vc=self.n2vc,
1163 vca_id=self.vca_connection._vca_id,
1164 )
1165
1166 output = await model.get_action_output(action_uuid=action.entity_id)
1167 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
1168 status = (
1169 status[action.entity_id] if action.entity_id in status else "failed"
1170 )
1171
1172 self.log.debug(
1173 "Action {} completed with status {} in application {} (model={})".format(
1174 action_name, action.status, application_name, model_name
1175 )
1176 )
1177 finally:
1178 await self.disconnect_model(model)
1179 await self.disconnect_controller(controller)
1180
1181 return output, status
1182
1183 async def get_actions(self, application_name: str, model_name: str) -> dict:
1184 """Get list of actions
1185
1186 :param: application_name: Application name
1187 :param: model_name: Model name
1188
1189 :return: Dict with this format
1190 {
1191 "action_name": "Description of the action",
1192 ...
1193 }
1194 """
1195 self.log.debug(
1196 "Getting list of actions for application {}".format(application_name)
1197 )
1198
1199 # Get controller
1200 controller = await self.get_controller()
1201
1202 # Get model
1203 model = await self.get_model(controller, model_name)
1204
1205 try:
1206 # Get application
1207 application = self._get_application(
1208 model,
1209 application_name=application_name,
1210 )
1211
1212 # Return list of actions
1213 return await application.get_actions()
1214
1215 finally:
1216 # Disconnect from model and controller
1217 await self.disconnect_model(model)
1218 await self.disconnect_controller(controller)
1219
1220 async def get_metrics(self, model_name: str, application_name: str) -> dict:
1221 """Get the metrics collected by the VCA.
1222
1223 :param model_name The name or unique id of the network service
1224 :param application_name The name of the application
1225 """
1226 if not model_name or not application_name:
1227 raise Exception("model_name and application_name must be non-empty strings")
1228 metrics = {}
1229 controller = await self.get_controller()
1230 model = await self.get_model(controller, model_name)
1231 try:
1232 application = self._get_application(model, application_name)
1233 if application is not None:
1234 metrics = await application.get_metrics()
1235 finally:
1236 self.disconnect_model(model)
1237 self.disconnect_controller(controller)
1238 return metrics
1239
1240 async def add_relation(
1241 self,
1242 model_name: str,
1243 endpoint_1: str,
1244 endpoint_2: str,
1245 ):
1246 """Add relation
1247
1248 :param: model_name: Model name
1249 :param: endpoint_1 First endpoint name
1250 ("app:endpoint" format or directly the saas name)
1251 :param: endpoint_2: Second endpoint name (^ same format)
1252 """
1253
1254 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
1255
1256 # Get controller
1257 controller = await self.get_controller()
1258
1259 # Get model
1260 model = await self.get_model(controller, model_name)
1261
1262 # Add relation
1263 try:
1264 await model.add_relation(endpoint_1, endpoint_2)
1265 except juju.errors.JujuAPIError as e:
1266 if "not found" in e.message:
1267 self.log.warning("Relation not found: {}".format(e.message))
1268 return
1269 if "already exists" in e.message:
1270 self.log.warning("Relation already exists: {}".format(e.message))
1271 return
1272 # another exception, raise it
1273 raise e
1274 finally:
1275 await self.disconnect_model(model)
1276 await self.disconnect_controller(controller)
1277
1278 async def offer(self, endpoint: RelationEndpoint) -> Offer:
1279 """
1280 Create an offer from a RelationEndpoint
1281
1282 :param: endpoint: Relation endpoint
1283
1284 :return: Offer object
1285 """
1286 model_name = endpoint.model_name
1287 offer_name = f"{endpoint.application_name}-{endpoint.endpoint_name}"
1288 controller = await self.get_controller()
1289 model = None
1290 try:
1291 model = await self.get_model(controller, model_name)
1292 await model.create_offer(endpoint.endpoint, offer_name=offer_name)
1293 offer_list = await self._list_offers(model_name, offer_name=offer_name)
1294 if offer_list:
1295 return Offer(offer_list[0].offer_url)
1296 else:
1297 raise Exception("offer was not created")
1298 except juju.errors.JujuError as e:
1299 if "application offer already exists" not in e.message:
1300 raise e
1301 finally:
1302 if model:
1303 self.disconnect_model(model)
1304 self.disconnect_controller(controller)
1305
1306 async def consume(
1307 self,
1308 model_name: str,
1309 offer: Offer,
1310 provider_libjuju: "Libjuju",
1311 ) -> str:
1312 """
1313 Consumes a remote offer in the model. Relations can be created later using "juju relate".
1314
1315 :param: model_name: Model name
1316 :param: offer: Offer object to consume
1317 :param: provider_libjuju: Libjuju object of the provider endpoint
1318
1319 :raises ParseError if there's a problem parsing the offer_url
1320 :raises JujuError if remote offer includes and endpoint
1321 :raises JujuAPIError if the operation is not successful
1322
1323 :returns: Saas name. It is the application name in the model that reference the remote application.
1324 """
1325 saas_name = f'{offer.name}-{offer.model_name.replace("-", "")}'
1326 if offer.vca_id:
1327 saas_name = f"{saas_name}-{offer.vca_id}"
1328 controller = await self.get_controller()
1329 model = None
1330 provider_controller = None
1331 try:
1332 model = await controller.get_model(model_name)
1333 provider_controller = await provider_libjuju.get_controller()
1334 await model.consume(
1335 offer.url, application_alias=saas_name, controller=provider_controller
1336 )
1337 return saas_name
1338 finally:
1339 if model:
1340 await self.disconnect_model(model)
1341 if provider_controller:
1342 await provider_libjuju.disconnect_controller(provider_controller)
1343 await self.disconnect_controller(controller)
1344
1345 async def destroy_model(self, model_name: str, total_timeout: float = 1800):
1346 """
1347 Destroy model
1348
1349 :param: model_name: Model name
1350 :param: total_timeout: Timeout
1351 """
1352
1353 controller = await self.get_controller()
1354 model = None
1355 try:
1356 if not await self.model_exists(model_name, controller=controller):
1357 self.log.warn(f"Model {model_name} doesn't exist")
1358 return
1359
1360 self.log.debug(f"Getting model {model_name} to be destroyed")
1361 model = await self.get_model(controller, model_name)
1362 self.log.debug(f"Destroying manual machines in model {model_name}")
1363 # Destroy machines that are manually provisioned
1364 # and still are in pending state
1365 await self._destroy_pending_machines(model, only_manual=True)
1366 await self.disconnect_model(model)
1367
1368 await asyncio.wait_for(
1369 self._destroy_model(model_name, controller),
1370 timeout=total_timeout,
1371 )
1372 except Exception as e:
1373 if not await self.model_exists(model_name, controller=controller):
1374 self.log.warn(
1375 f"Failed deleting model {model_name}: model doesn't exist"
1376 )
1377 return
1378 self.log.warn(f"Failed deleting model {model_name}: {e}")
1379 raise e
1380 finally:
1381 if model:
1382 await self.disconnect_model(model)
1383 await self.disconnect_controller(controller)
1384
1385 async def _destroy_model(
1386 self,
1387 model_name: str,
1388 controller: Controller,
1389 ):
1390 """
1391 Destroy model from controller
1392
1393 :param: model: Model name to be removed
1394 :param: controller: Controller object
1395 :param: timeout: Timeout in seconds
1396 """
1397 self.log.debug(f"Destroying model {model_name}")
1398
1399 async def _destroy_model_gracefully(model_name: str, controller: Controller):
1400 self.log.info(f"Gracefully deleting model {model_name}")
1401 resolved = False
1402 while model_name in await controller.list_models():
1403 if not resolved:
1404 await self.resolve(model_name)
1405 resolved = True
1406 await controller.destroy_model(model_name, destroy_storage=True)
1407
1408 await asyncio.sleep(5)
1409 self.log.info(f"Model {model_name} deleted gracefully")
1410
1411 async def _destroy_model_forcefully(model_name: str, controller: Controller):
1412 self.log.info(f"Forcefully deleting model {model_name}")
1413 while model_name in await controller.list_models():
1414 await controller.destroy_model(
1415 model_name, destroy_storage=True, force=True, max_wait=60
1416 )
1417 await asyncio.sleep(5)
1418 self.log.info(f"Model {model_name} deleted forcefully")
1419
1420 try:
1421 try:
1422 await asyncio.wait_for(
1423 _destroy_model_gracefully(model_name, controller), timeout=120
1424 )
1425 except asyncio.TimeoutError:
1426 await _destroy_model_forcefully(model_name, controller)
1427 except juju.errors.JujuError as e:
1428 if any("has been removed" in error for error in e.errors):
1429 return
1430 if any("model not found" in error for error in e.errors):
1431 return
1432 raise e
1433
1434 async def destroy_application(
1435 self, model_name: str, application_name: str, total_timeout: float
1436 ):
1437 """
1438 Destroy application
1439
1440 :param: model_name: Model name
1441 :param: application_name: Application name
1442 :param: total_timeout: Timeout
1443 """
1444
1445 controller = await self.get_controller()
1446 model = None
1447
1448 try:
1449 model = await self.get_model(controller, model_name)
1450 self.log.debug(
1451 "Destroying application {} in model {}".format(
1452 application_name, model_name
1453 )
1454 )
1455 application = self._get_application(model, application_name)
1456 if application:
1457 await application.destroy()
1458 else:
1459 self.log.warning("Application not found: {}".format(application_name))
1460
1461 self.log.debug(
1462 "Waiting for application {} to be destroyed in model {}...".format(
1463 application_name, model_name
1464 )
1465 )
1466 if total_timeout is None:
1467 total_timeout = 3600
1468 end = time.time() + total_timeout
1469 while time.time() < end:
1470 if not self._get_application(model, application_name):
1471 self.log.debug(
1472 "The application {} was destroyed in model {} ".format(
1473 application_name, model_name
1474 )
1475 )
1476 return
1477 await asyncio.sleep(5)
1478 raise Exception(
1479 "Timeout waiting for application {} to be destroyed in model {}".format(
1480 application_name, model_name
1481 )
1482 )
1483 finally:
1484 if model is not None:
1485 await self.disconnect_model(model)
1486 await self.disconnect_controller(controller)
1487
1488 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
1489 """
1490 Destroy pending machines in a given model
1491
1492 :param: only_manual: Bool that indicates only manually provisioned
1493 machines should be destroyed (if True), or that
1494 all pending machines should be destroyed
1495 """
1496 status = await model.get_status()
1497 for machine_id in status.machines:
1498 machine_status = status.machines[machine_id]
1499 if machine_status.agent_status.status == "pending":
1500 if only_manual and not machine_status.instance_id.startswith("manual:"):
1501 break
1502 machine = model.machines[machine_id]
1503 await machine.destroy(force=True)
1504
1505 async def configure_application(
1506 self, model_name: str, application_name: str, config: dict = None
1507 ):
1508 """Configure application
1509
1510 :param: model_name: Model name
1511 :param: application_name: Application name
1512 :param: config: Config to apply to the charm
1513 """
1514 self.log.debug("Configuring application {}".format(application_name))
1515
1516 if config:
1517 controller = await self.get_controller()
1518 model = None
1519 try:
1520 model = await self.get_model(controller, model_name)
1521 application = self._get_application(
1522 model,
1523 application_name=application_name,
1524 )
1525 await application.set_config(config)
1526 finally:
1527 if model:
1528 await self.disconnect_model(model)
1529 await self.disconnect_controller(controller)
1530
1531 def handle_exception(self, loop, context):
1532 # All unhandled exceptions by libjuju are handled here.
1533 pass
1534
1535 async def health_check(self, interval: float = 300.0):
1536 """
1537 Health check to make sure controller and controller_model connections are OK
1538
1539 :param: interval: Time in seconds between checks
1540 """
1541 controller = None
1542 while True:
1543 try:
1544 controller = await self.get_controller()
1545 # self.log.debug("VCA is alive")
1546 except Exception as e:
1547 self.log.error("Health check to VCA failed: {}".format(e))
1548 finally:
1549 await self.disconnect_controller(controller)
1550 await asyncio.sleep(interval)
1551
1552 async def list_models(self, contains: str = None) -> [str]:
1553 """List models with certain names
1554
1555 :param: contains: String that is contained in model name
1556
1557 :retur: [models] Returns list of model names
1558 """
1559
1560 controller = await self.get_controller()
1561 try:
1562 models = await controller.list_models()
1563 if contains:
1564 models = [model for model in models if contains in model]
1565 return models
1566 finally:
1567 await self.disconnect_controller(controller)
1568
1569 async def _list_offers(
1570 self, model_name: str, offer_name: str = None
1571 ) -> QueryApplicationOffersResults:
1572 """
1573 List offers within a model
1574
1575 :param: model_name: Model name
1576 :param: offer_name: Offer name to filter.
1577
1578 :return: Returns application offers results in the model
1579 """
1580
1581 controller = await self.get_controller()
1582 try:
1583 offers = (await controller.list_offers(model_name)).results
1584 if offer_name:
1585 matching_offer = []
1586 for offer in offers:
1587 if offer.offer_name == offer_name:
1588 matching_offer.append(offer)
1589 break
1590 offers = matching_offer
1591 return offers
1592 finally:
1593 await self.disconnect_controller(controller)
1594
1595 async def add_k8s(
1596 self,
1597 name: str,
1598 rbac_id: str,
1599 token: str,
1600 client_cert_data: str,
1601 configuration: Configuration,
1602 storage_class: str,
1603 credential_name: str = None,
1604 ):
1605 """
1606 Add a Kubernetes cloud to the controller
1607
1608 Similar to the `juju add-k8s` command in the CLI
1609
1610 :param: name: Name for the K8s cloud
1611 :param: configuration: Kubernetes configuration object
1612 :param: storage_class: Storage Class to use in the cloud
1613 :param: credential_name: Storage Class to use in the cloud
1614 """
1615
1616 if not storage_class:
1617 raise Exception("storage_class must be a non-empty string")
1618 if not name:
1619 raise Exception("name must be a non-empty string")
1620 if not configuration:
1621 raise Exception("configuration must be provided")
1622
1623 endpoint = configuration.host
1624 credential = self.get_k8s_cloud_credential(
1625 configuration,
1626 client_cert_data,
1627 token,
1628 )
1629 credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
1630 cloud = client.Cloud(
1631 type_="kubernetes",
1632 auth_types=[credential.auth_type],
1633 endpoint=endpoint,
1634 ca_certificates=[client_cert_data],
1635 config={
1636 "operator-storage": storage_class,
1637 "workload-storage": storage_class,
1638 },
1639 )
1640
1641 return await self.add_cloud(
1642 name, cloud, credential, credential_name=credential_name
1643 )
1644
1645 def get_k8s_cloud_credential(
1646 self,
1647 configuration: Configuration,
1648 client_cert_data: str,
1649 token: str = None,
1650 ) -> client.CloudCredential:
1651 attrs = {}
1652 # TODO: Test with AKS
1653 key = None # open(configuration.key_file, "r").read()
1654 username = configuration.username
1655 password = configuration.password
1656
1657 if client_cert_data:
1658 attrs["ClientCertificateData"] = client_cert_data
1659 if key:
1660 attrs["ClientKeyData"] = key
1661 if token:
1662 if username or password:
1663 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1664 attrs["Token"] = token
1665
1666 auth_type = None
1667 if key:
1668 auth_type = "oauth2"
1669 if client_cert_data:
1670 auth_type = "oauth2withcert"
1671 if not token:
1672 raise JujuInvalidK8sConfiguration(
1673 "missing token for auth type {}".format(auth_type)
1674 )
1675 elif username:
1676 if not password:
1677 self.log.debug(
1678 "credential for user {} has empty password".format(username)
1679 )
1680 attrs["username"] = username
1681 attrs["password"] = password
1682 if client_cert_data:
1683 auth_type = "userpasswithcert"
1684 else:
1685 auth_type = "userpass"
1686 elif client_cert_data and token:
1687 auth_type = "certificate"
1688 else:
1689 raise JujuInvalidK8sConfiguration("authentication method not supported")
1690 return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1691
1692 async def add_cloud(
1693 self,
1694 name: str,
1695 cloud: Cloud,
1696 credential: CloudCredential = None,
1697 credential_name: str = None,
1698 ) -> Cloud:
1699 """
1700 Add cloud to the controller
1701
1702 :param: name: Name of the cloud to be added
1703 :param: cloud: Cloud object
1704 :param: credential: CloudCredentials object for the cloud
1705 :param: credential_name: Credential name.
1706 If not defined, cloud of the name will be used.
1707 """
1708 controller = await self.get_controller()
1709 try:
1710 _ = await controller.add_cloud(name, cloud)
1711 if credential:
1712 await controller.add_credential(
1713 credential_name or name, credential=credential, cloud=name
1714 )
1715 # Need to return the object returned by the controller.add_cloud() function
1716 # I'm returning the original value now until this bug is fixed:
1717 # https://github.com/juju/python-libjuju/issues/443
1718 return cloud
1719 finally:
1720 await self.disconnect_controller(controller)
1721
1722 async def remove_cloud(self, name: str):
1723 """
1724 Remove cloud
1725
1726 :param: name: Name of the cloud to be removed
1727 """
1728 controller = await self.get_controller()
1729 try:
1730 await controller.remove_cloud(name)
1731 except juju.errors.JujuError as e:
1732 if len(e.errors) == 1 and f'cloud "{name}" not found' == e.errors[0]:
1733 self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
1734 else:
1735 raise e
1736 finally:
1737 await self.disconnect_controller(controller)
1738
1739 @retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
1740 async def _get_leader_unit(self, application: Application) -> Unit:
1741 unit = None
1742 for u in application.units:
1743 if await u.is_leader_from_status():
1744 unit = u
1745 break
1746 if not unit:
1747 raise Exception()
1748 return unit
1749
1750 async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
1751 """
1752 Get cloud credentials
1753
1754 :param: cloud: Cloud object. The returned credentials will be from this cloud.
1755
1756 :return: List of credentials object associated to the specified cloud
1757
1758 """
1759 controller = await self.get_controller()
1760 try:
1761 facade = client.CloudFacade.from_connection(controller.connection())
1762 cloud_cred_tag = tag.credential(
1763 cloud.name, self.vca_connection.data.user, cloud.credential_name
1764 )
1765 params = [client.Entity(cloud_cred_tag)]
1766 return (await facade.Credential(params)).results
1767 finally:
1768 await self.disconnect_controller(controller)
1769
1770 async def check_application_exists(self, model_name, application_name) -> bool:
1771 """Check application exists
1772
1773 :param: model_name: Model Name
1774 :param: application_name: Application Name
1775
1776 :return: Boolean
1777 """
1778
1779 model = None
1780 controller = await self.get_controller()
1781 try:
1782 model = await self.get_model(controller, model_name)
1783 self.log.debug(
1784 "Checking if application {} exists in model {}".format(
1785 application_name, model_name
1786 )
1787 )
1788 return self._get_application(model, application_name) is not None
1789 finally:
1790 if model:
1791 await self.disconnect_model(model)
1792 await self.disconnect_controller(controller)