Fix bug 2060: Add logic to delete models gracefully
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 import typing
18
19 import time
20
21 import juju.errors
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from juju.controller import Controller
33 from juju.client import client
34 from juju import tag
35
36 from n2vc.definitions import Offer, RelationEndpoint
37 from n2vc.juju_watcher import JujuModelWatcher
38 from n2vc.provisioner import AsyncSSHProvisioner
39 from n2vc.n2vc_conn import N2VCConnector
40 from n2vc.exceptions import (
41 JujuMachineNotFound,
42 JujuApplicationNotFound,
43 JujuLeaderUnitNotFound,
44 JujuActionNotFound,
45 JujuControllerFailedConnecting,
46 JujuApplicationExists,
47 JujuInvalidK8sConfiguration,
48 JujuError,
49 )
50 from n2vc.vca.cloud import Cloud as VcaCloud
51 from n2vc.vca.connection import Connection
52 from kubernetes.client.configuration import Configuration
53 from retrying_async import retry
54
55
# Label key name. It is not referenced in the visible part of this file;
# presumably it is used to tag/look up RBAC resources (TODO confirm with users).
RBAC_LABEL_KEY_NAME = "rbac-id"
57
58
59 class Libjuju:
def __init__(
    self,
    vca_connection: Connection,
    loop: asyncio.AbstractEventLoop = None,
    log: logging.Logger = None,
    n2vc: N2VCConnector = None,
):
    """
    Constructor

    :param: vca_connection: n2vc.vca.connection object
    :param: loop: Asyncio loop (defaults to ``asyncio.get_event_loop()``)
    :param: log: Logger (defaults to the "Libjuju" logger)
    :param: n2vc: N2VC object
    """

    self.log = log or logging.getLogger("Libjuju")
    self.n2vc = n2vc
    self.vca_connection = vca_connection

    self.loop = loop or asyncio.get_event_loop()
    self.loop.set_exception_handler(self.handle_exception)
    try:
        self.creating_model = asyncio.Lock(loop=self.loop)
    except TypeError:
        # Python >= 3.10 removed the ``loop`` argument of asyncio.Lock
        self.creating_model = asyncio.Lock()

    # Always define the attribute so that it can be inspected safely even
    # when no health check is started (non-default connections).
    self.health_check_task = None
    if self.vca_connection.is_default:
        self.health_check_task = self._create_health_check_task()
86
def _create_health_check_task(self):
    """Schedule ``self.health_check()`` on ``self.loop`` and return the task."""
    schedule = self.loop.create_task
    return schedule(self.health_check())
89
async def get_controller(self, timeout: float = 60.0) -> Controller:
    """
    Get controller

    Connects a new Controller using the endpoints and credentials stored in
    ``self.vca_connection.data``. For the default connection, the stored
    endpoints are updated when the controller reports endpoints that are not
    known yet.

    :param: timeout: Time in seconds to wait for controller to connect

    :return: Controller: the connected controller
    :raises: JujuControllerFailedConnecting if the connection fails
    """
    controller = None
    try:
        controller = Controller()
        await asyncio.wait_for(
            controller.connect(
                endpoint=self.vca_connection.data.endpoints,
                username=self.vca_connection.data.user,
                password=self.vca_connection.data.secret,
                cacert=self.vca_connection.data.cacert,
            ),
            timeout=timeout,
        )
        if self.vca_connection.is_default:
            endpoints = await controller.api_endpoints
            if not all(
                endpoint in self.vca_connection.endpoints for endpoint in endpoints
            ):
                await self.vca_connection.update_endpoints(endpoints)
        return controller
    except asyncio.CancelledError:
        # Never turn a cancellation into a connection error
        raise
    except Exception as e:
        self.log.error(
            "Failed connecting to controller: {}... {}".format(
                self.vca_connection.data.endpoints, e
            )
        )
        if controller:
            await self.disconnect_controller(controller)

        # Chain the original error so that its traceback is not lost
        raise JujuControllerFailedConnecting(
            f"Error connecting to Juju controller: {e}"
        ) from e
129
async def disconnect(self):
    """
    Disconnect

    Cancels the health check task, if one was started. The task only exists
    for default connections, so its absence is not an error.
    """
    health_check_task = getattr(self, "health_check_task", None)
    if health_check_task is not None:
        health_check_task.cancel()
    self.log.debug("Libjuju disconnected!")
135
async def disconnect_model(self, model: Model):
    """
    Disconnect model

    Does nothing when ``model`` is None/falsy, mirroring the behaviour of
    ``disconnect_controller``.

    :param: model: Model that will be disconnected
    """
    if model:
        await model.disconnect()
143
async def disconnect_controller(self, controller: Controller):
    """
    Disconnect controller

    A falsy ``controller`` is ignored.

    :param: controller: Controller that will be disconnected
    """
    if not controller:
        return
    await controller.disconnect()
152
@retry(attempts=3, delay=5, timeout=None)
async def add_model(self, model_name: str, cloud: VcaCloud):
    """
    Create model

    Nothing is done if a model with that name already exists.

    :param: model_name: Model name
    :param: cloud: Cloud object
    """

    # Get controller
    controller = await self.get_controller()
    model = None
    try:
        # The lock serialises model creation: awaiting it already blocks
        # until other workers have finished, so no extra polling is needed.
        async with self.creating_model:
            if await self.model_exists(model_name, controller=controller):
                return
            self.log.debug("Creating model {}".format(model_name))
            model = await controller.add_model(
                model_name,
                config=self.vca_connection.data.model_config,
                cloud_name=cloud.name,
                credential_name=cloud.credential_name,
            )
    except juju.errors.JujuAPIError as e:
        # A concurrent creation of the same model is not an error
        if "already exists" not in e.message:
            raise
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
190
async def get_executed_actions(self, model_name: str) -> list:
    """
    Get executed/history of actions for a model.

    Each returned entry is a dict with the keys "id", "action" and "status",
    extended with every key/value of the action output.

    :param: model_name: Model name, str.
    :return: List of executed actions for a model.
    """
    history = []
    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)

        # Collect the action names of every application (duplicates collapse)
        known_actions = {}
        for app_name in model.applications:
            known_actions.update(await self.get_actions(app_name, model_name))

        # Query status and output of every execution of each action
        for action_name in known_actions:
            statuses = await model.get_action_status(name=action_name)
            for action_id, status in statuses.items():
                entry = {"id": action_id, "action": action_name, "status": status}
                output = await model.get_action_output(entry["id"])
                entry.update(output.items())
                history.append(entry)
    except Exception as e:
        raise JujuError(
            "Error in getting executed actions for model: {}. Error: {}".format(
                model_name, str(e)
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return history
235
async def get_application_configs(
    self, model_name: str, application_name: str
) -> dict:
    """
    Get available configs for an application.

    :param: model_name: Model name, str.
    :param: application_name: Application name, str.

    :return: The configuration of the application, as returned by
             ``application.get_config()``
    :raises: JujuError if the configuration cannot be retrieved (including
             the case in which the application does not exist)
    """
    model = None
    application_configs = {}
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(
            model, application_name=application_name
        )
        if application is None:
            # Give a meaningful reason instead of an AttributeError on None
            raise JujuApplicationNotFound(
                "Application {} not found".format(application_name)
            )
        application_configs = await application.get_config()
    except Exception as e:
        raise JujuError(
            "Error in getting configs for application: {} in model: {}. Error: {}".format(
                application_name, model_name, str(e)
            )
        ) from e
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return application_configs
267
@retry(attempts=3, delay=5)
async def get_model(self, controller: Controller, model_name: str) -> Model:
    """
    Get model from controller

    :param: controller: Controller used to look up the model
    :param: model_name: Model name

    :return: Model: The Juju model object returned by the controller
    """
    model = await controller.get_model(model_name)
    return model
279
async def model_exists(
    self, model_name: str, controller: Controller = None
) -> bool:
    """
    Check if model exists

    When no controller is given, a temporary one is obtained and
    disconnected again before returning.

    :param: model_name: Model name
    :param: controller: Controller

    :return bool
    """
    # Only a controller obtained here has to be released here
    owns_controller = not controller
    if owns_controller:
        controller = await self.get_controller()

    try:
        known_models = await controller.list_models()
        return model_name in known_models
    finally:
        if owns_controller:
            await self.disconnect_controller(controller)
304
async def models_exist(self, model_names: [str]) -> (bool, list):
    """
    Check if models exists

    :param: model_names: List of strings with model names

    :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
                               The missing names keep the order in which they
                               were given and are reported only once.
    :raises: Exception if model_names is empty
    """
    if not model_names:
        raise Exception(
            "model_names must be a non-empty array. Given value: {}".format(
                model_names
            )
        )
    existing_models = set(await self.list_models())
    # dict.fromkeys removes duplicates while preserving the input order
    non_existing_models = [
        name for name in dict.fromkeys(model_names) if name not in existing_models
    ]

    return (
        not non_existing_models,
        non_existing_models,
    )
328
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    model = None
    try:
        # Inside the try block: the controller must be released even if the
        # model cannot be retrieved.
        model = await self.get_model(controller, model_name)
        return await model.get_status()
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
344
async def create_machine(
    self,
    model_name: str,
    machine_id: str = None,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    series: str = "bionic",
    wait: bool = True,
) -> (Machine, bool):
    """
    Create machine

    When ``machine_id`` is given, the existing machine with that id is
    returned; otherwise a new machine is added to the model.

    :param: model_name: Model name
    :param: machine_id: Machine id
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: series: Series of the machine (xenial, bionic, focal, ...)
    :param: wait: Wait until machine is ready

    :return: (juju.machine.Machine, bool): Machine object and a boolean saying
                                           if the machine is new or it already existed
    :raises: JujuMachineNotFound if ``machine_id`` is given but not in the model
    """
    new = False
    machine = None

    self.log.debug(
        "Creating machine (id={}) in model: {}".format(machine_id, model_name)
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try block so that the controller is released
        # even if the model cannot be retrieved)
        model = await self.get_model(controller, model_name)

        if machine_id is not None:
            self.log.debug(
                "Searching machine (id={}) in model {}".format(
                    machine_id, model_name
                )
            )

            # Get machines from model and get the machine with machine_id if exists
            machines = await model.get_machines()
            if machine_id not in machines:
                raise JujuMachineNotFound("Machine {} not found".format(machine_id))

            self.log.debug(
                "Machine (id={}) found in model {}".format(machine_id, model_name)
            )
            # Read the Machine object from model.machines, as the other
            # methods of this class do, instead of indexing the result of
            # get_machines() by id.
            machine = model.machines[machine_id]

        if machine is None:
            self.log.debug("Creating a new machine in model {}".format(model_name))

            # Create machine
            machine = await model.add_machine(
                spec=None, constraints=None, disks=None, series=series
            )
            new = True

            # Wait until the machine is ready
            self.log.debug(
                "Wait until machine {} is ready in model {}".format(
                    machine.entity_id, model_name
                )
            )
            if wait:
                await JujuModelWatcher.wait_for(
                    model=model,
                    entity=machine,
                    progress_timeout=progress_timeout,
                    total_timeout=total_timeout,
                    db_dict=db_dict,
                    n2vc=self.n2vc,
                    vca_id=self.vca_connection._vca_id,
                )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine {} ready at {} in model {}".format(
            machine.entity_id, machine.dns_name, model_name
        )
    )
    return machine, new
436
async def provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """
    Manually provisioning of a machine

    :param: model_name: Model name
    :param: hostname: IP to access the machine
    :param: username: Username to login to the machine
    :param: private_key_path: Local path for the private key
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: (Entity): Machine id
    :raises: ValueError if the machine cannot be added to the model
    :raises: JujuMachineNotFound if the machine does not show up in the model
    """
    self.log.debug(
        "Provisioning machine. model: {}, hostname: {}, username: {}".format(
            model_name, hostname, username
        )
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try block so that the controller is released
        # even if the model cannot be retrieved)
        model = await self.get_model(controller, model_name)

        # Get provisioner
        provisioner = AsyncSSHProvisioner(
            host=hostname,
            user=username,
            private_key_path=private_key_path,
            log=self.log,
        )

        # Provision machine
        params = await provisioner.provision_machine()

        params.jobs = ["JobHostUnits"]

        self.log.debug("Adding machine to model")
        connection = model.connection()
        client_facade = client.ClientFacade.from_connection(connection)

        results = await client_facade.AddMachines(params=[params])
        error = results.machines[0].error

        if error:
            msg = "Error adding machine: {}".format(error.message)
            self.log.error(msg=msg)
            raise ValueError(msg)

        machine_id = results.machines[0].machine

        self.log.debug("Installing Juju agent into machine {}".format(machine_id))
        # The agent installation is scheduled in the background and is not
        # awaited here.
        asyncio.ensure_future(
            provisioner.install_agent(
                connection=connection,
                nonce=params.nonce,
                machine_id=machine_id,
                proxy=self.vca_connection.data.api_proxy,
                series=params.series,
            )
        )

        # Poll the model (up to 10 times, 2 seconds apart) for the machine
        machine = None
        for _ in range(10):
            machine_list = await model.get_machines()
            if machine_id in machine_list:
                self.log.debug("Machine {} found in model!".format(machine_id))
                machine = model.machines.get(machine_id)
                break
            await asyncio.sleep(2)

        if machine is None:
            msg = "Machine {} not found in model".format(machine_id)
            self.log.error(msg=msg)
            raise JujuMachineNotFound(msg)

        self.log.debug(
            "Wait until machine {} is ready in model {}".format(
                machine.entity_id, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=machine,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine provisioned {} in model {}".format(machine_id, model_name)
    )

    return machine_id
550
async def deploy(
    self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
):
    """
    Deploy bundle or charm: Similar to the juju CLI command `juju deploy`

    :param: uri: Path or Charm Store uri in which the charm or bundle can be found
    :param: model_name: Model name
    :param: wait: Indicates whether to wait or not until all applications are active
    :param: timeout: Time in seconds to wait until all applications are active
    """
    controller = await self.get_controller()
    model = None
    try:
        # Inside the try block: the controller must be released even if the
        # model cannot be retrieved.
        model = await self.get_model(controller, model_name)
        await model.deploy(uri, trust=True)
        if wait:
            await JujuModelWatcher.wait_for_model(model, timeout=timeout)
            self.log.debug("All units active in model {}".format(model_name))
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
572
async def add_unit(
    self,
    application_name: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
):
    """Add unit

    Adds a unit of the application to the given machine and waits for the
    application through the model watcher.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: machine_id Machine id
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: None
    :raises: JujuApplicationNotFound if the application is not in the model
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)

        if application is None:
            raise JujuApplicationNotFound(
                "Application {} not exists".format(application_name)
            )

        # Called only for its check: it raises if the machine id is not
        # in the model.
        self._get_machine_info(model, machine_id)

        self.log.debug(
            "Adding unit (machine {}) to application {} in model ~{}".format(
                machine_id, application_name, model_name
            )
        )

        await application.add_unit(to=machine_id)

        watcher_args = dict(
            model=model,
            entity=application,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
        await JujuModelWatcher.wait_for(**watcher_args)
        self.log.debug(
            "Unit is added to application {} in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
636
async def destroy_unit(
    self,
    application_name: str,
    model_name: str,
    machine_id: str,
    total_timeout: float = None,
):
    """Destroy unit

    Requests the destruction of the unit placed in ``machine_id`` and polls
    every 5 seconds until the unit disappears from the application or the
    timeout expires. A timeout is logged as a warning; no error is raised.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: machine_id Machine id
    :param: total_timeout: Maximum time in seconds to wait for the unit to
                           disappear (3600 if not given)

    :return: None
    :raises: JujuApplicationNotFound if the application is not in the model
    :raises: JujuError if no unit of the application is in ``machine_id``
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)

        if application is None:
            raise JujuApplicationNotFound(
                "Application not found: {} (model={})".format(
                    application_name, model_name
                )
            )

        unit = self._get_unit(application, machine_id)
        if not unit:
            raise JujuError(
                "A unit with machine id {} not in available units".format(
                    machine_id
                )
            )

        unit_name = unit.name

        self.log.debug(
            "Destroying unit {} from application {} in model {}".format(
                unit_name, application_name, model_name
            )
        )
        await application.destroy_unit(unit_name)

        self.log.debug(
            "Waiting for unit {} to be destroyed in application {} (model={})...".format(
                unit_name, application_name, model_name
            )
        )

        # TODO: Add functionality in the Juju watcher to replace this kind of blocks
        if total_timeout is None:
            total_timeout = 3600
        end = time.time() + total_timeout
        while time.time() < end:
            if not self._get_unit(application, machine_id):
                self.log.debug(
                    "The unit {} was destroyed in application {} (model={}) ".format(
                        unit_name, application_name, model_name
                    )
                )
                return
            await asyncio.sleep(5)
        # Reaching this point means the unit was still there when the
        # timeout expired: report that instead of claiming success.
        self.log.warning(
            "Timeout waiting for unit {} to be destroyed from application {} in model {}".format(
                unit_name, application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
712
async def deploy_charm(
    self,
    application_name: str,
    path: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    series: str = None,
    num_units: int = 1,
):
    """Deploy charm

    :param: application_name: Application name
    :param: path: Local path to the charm
    :param: model_name: Model name
    :param: machine_id ID of the machine
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: config: Config for the charm
    :param: series: Series of the charm (replaced by the series of the
                    machine when ``machine_id`` is given)
    :param: num_units: Number of units

    :return: (juju.application.Application): Juju application
    :raises: JujuApplicationExists if the application is already deployed
    """
    self.log.debug(
        "Deploying charm {} to machine {} in model ~{}".format(
            application_name, machine_id, model_name
        )
    )
    self.log.debug("charm: {}".format(path))

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try block so that the controller is released
        # even if the model cannot be retrieved)
        model = await self.get_model(controller, model_name)

        try:
            if application_name in model.applications:
                raise JujuApplicationExists(
                    "Application {} exists".format(application_name)
                )

            if machine_id is not None:
                machine, series = self._get_machine_info(model, machine_id)

            # The first unit is placed by the deployment itself
            application = await model.deploy(
                entity_url=path,
                application_name=application_name,
                channel="stable",
                num_units=1,
                series=series,
                to=machine_id,
                config=config,
            )

            self.log.debug(
                "Wait until application {} is ready in model {}".format(
                    application_name, model_name
                )
            )
            # Every additional unit gets its own, newly created machine
            if num_units > 1:
                for _ in range(num_units - 1):
                    m, _ = await self.create_machine(model_name, wait=False)
                    await application.add_unit(to=m.entity_id)

            await JujuModelWatcher.wait_for(
                model=model,
                entity=application,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
                vca_id=self.vca_connection._vca_id,
            )
            self.log.debug(
                "Application {} is ready in model {}".format(
                    application_name, model_name
                )
            )
        except juju.errors.JujuError as e:
            if "already exists" in e.message:
                raise JujuApplicationExists(
                    "Application {} exists".format(application_name)
                ) from e
            raise
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return application
810
async def upgrade_charm(
    self,
    application_name: str,
    path: str,
    model_name: str,
    total_timeout: float = None,
    **kwargs,
):
    """Upgrade Charm

    Units in error state are resolved first; then the application is
    refreshed from ``path`` and the units are awaited until they are idle.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: path: Local path to the charm
    :param: total_timeout: Timeout for the entity to be active
                           (not used by the current implementation)

    :return: (juju.application.Application): the upgraded application
    :raises: JujuApplicationNotFound if the application is not in the model
    :raises: JujuError if the application is in error state after the upgrade
    """

    self.log.debug(
        "Upgrading charm {} in model {} from path {}".format(
            application_name, model_name, path
        )
    )

    await self.resolve_application(
        model_name=model_name, application_name=application_name
    )

    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try block so that the controller is released
        # even if the model cannot be retrieved)
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound(
                "Cannot find application {} to upgrade".format(application_name)
            )

        await application.refresh(path=path)

        self.log.debug(
            "Wait until charm upgrade is completed for application {} (model={})".format(
                application_name, model_name
            )
        )

        await JujuModelWatcher.ensure_units_idle(
            model=model, application=application
        )

        if application.status == "error":
            # Report the message of a failing unit, when there is one
            error_message = "Unknown"
            for unit in application.units:
                if (
                    unit.workload_status == "error"
                    and unit.workload_status_message != ""
                ):
                    error_message = unit.workload_status_message

            message = "Application {} failed update in {}: {}".format(
                application_name, model_name, error_message
            )
            self.log.error(message)
            raise JujuError(message=message)

        self.log.debug(
            "Application {} is ready in model {}".format(
                application_name, model_name
            )
        )

    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return application
894
async def resolve_application(self, model_name: str, application_name: str):
    """Resolve the units in error state of an application

    While the application status is "error", every unit whose workload
    status is "error" is marked as resolved (without retrying the failed
    hook), checking again after one second.

    :param: model_name: Model name
    :param: application_name: Application name

    :raises: JujuApplicationNotFound if the application is not in the model
    """

    controller = await self.get_controller()
    model = None

    try:
        # Inside the try block: the controller must be released even if the
        # model cannot be retrieved.
        model = await self.get_model(controller, model_name)
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound(
                "Cannot find application {} to resolve".format(application_name)
            )

        while application.status == "error":
            for unit in application.units:
                if unit.workload_status == "error":
                    self.log.debug(
                        "Model {}, Application {}, Unit {} in error state, resolving".format(
                            model_name, application_name, unit.entity_id
                        )
                    )
                    try:
                        await unit.resolved(retry=False)
                    except Exception as e:
                        # Best effort: keep going, but leave a trace
                        self.log.debug(
                            "Error resolving unit {}: {}".format(unit.entity_id, e)
                        )

            await asyncio.sleep(1)

    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
928
async def resolve(self, model_name: str):
    """Resolve the units in error state of every application of a model

    Every unit in "error" workload status that belongs to an application in
    "error" status is marked as resolved (without retrying the failed hook).
    The check is repeated every 5 seconds until a pass finds nothing that
    could be resolved.

    :param: model_name: Model name
    """

    controller = await self.get_controller()
    model = None
    all_units_active = False
    try:
        # Inside the try block: the controller must be released even if the
        # model cannot be retrieved.
        model = await self.get_model(controller, model_name)
        while not all_units_active:
            all_units_active = True
            for application_name, application in model.applications.items():
                if application.status == "error":
                    for unit in application.units:
                        if unit.workload_status == "error":
                            self.log.debug(
                                "Model {}, Application {}, Unit {} in error state, resolving".format(
                                    model_name, application_name, unit.entity_id
                                )
                            )
                            try:
                                await unit.resolved(retry=False)
                                # Something was resolved: check again later
                                all_units_active = False
                            except Exception as e:
                                # Best effort: keep going, but leave a trace
                                self.log.debug(
                                    "Error resolving unit {}: {}".format(
                                        unit.entity_id, e
                                    )
                                )

            if not all_units_active:
                await asyncio.sleep(5)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
957
async def scale_application(
    self,
    model_name: str,
    application_name: str,
    scale: int = 1,
    total_timeout: float = None,
):
    """
    Scale application (K8s)

    :param: model_name: Model name
    :param: application_name: Application name
    :param: scale: Scale to which to set this application
    :param: total_timeout: Timeout for the entity to be active
                           (1800 seconds if not given)

    :raises: JujuApplicationNotFound if the application is not in the model
    :raises: Exception if the unit count does not reach ``scale`` in time
    """

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)

        self.log.debug(
            "Scaling application {} in model {}".format(
                application_name, model_name
            )
        )
        application = self._get_application(model, application_name)
        if application is None:
            raise JujuApplicationNotFound("Cannot scale application")
        await application.scale(scale=scale)

        # Wait until application is scaled in model
        self.log.debug(
            "Waiting for application {} to be scaled in model {}...".format(
                application_name, model_name
            )
        )
        timeout = 1800 if total_timeout is None else total_timeout
        deadline = time.time() + timeout
        while time.time() < deadline:
            # Juju takes a while to trigger the scaling, so the model is only
            # awaited once the unit count already matches the requested scale.
            unit_count = self._get_application_count(model, application_name)
            if unit_count != scale:
                await asyncio.sleep(5)
                continue
            await JujuModelWatcher.wait_for_model(model=model, timeout=timeout)
            self.log.debug(
                "Application {} is scaled in model {}".format(
                    application_name, model_name
                )
            )
            return
        raise Exception(
            "Timeout waiting for application {} in model {} to be scaled".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1022
def _get_application_count(self, model: Model, application_name: str) -> int:
    """Count the units of an application

    :param: model: Model object
    :param: application_name: Application name

    :return: int (or None if application doesn't exist)
    """
    application = self._get_application(model, application_name)
    if application is None:
        return None
    return len(application.units)
1034
def _get_application(self, model: Model, application_name: str) -> Application:
    """Look up an application of the model by name

    :param: model: Model object
    :param: application_name: Application name

    :return: juju.application.Application (or None if it doesn't exist)
    """
    if not model.applications or application_name not in model.applications:
        return None
    return model.applications[application_name]
1045
def _get_unit(self, application: Application, machine_id: str) -> Unit:
    """Find the unit of an application that is placed in a machine

    :param: application: Application object
    :param: machine_id: Machine id

    :return: Unit: first unit whose machine_id matches (None if there is none)
    """
    matching_units = (u for u in application.units if u.machine_id == machine_id)
    return next(matching_units, None)
1060
def _get_machine_info(
    self,
    model,
    machine_id: str,
) -> (str, str):
    """Get a machine of the model together with its series

    :param: model: Model object
    :param: machine_id: Machine id

    :return: (machine, series): the entry of ``model.machines`` for
             ``machine_id`` and its ``series`` attribute
    :raises: JujuMachineNotFound if the machine is not in the model
    """
    if machine_id in model.machines:
        machine = model.machines[machine_id]
        return machine, machine.series

    msg = "Machine {} not found in model".format(machine_id)
    self.log.error(msg=msg)
    raise JujuMachineNotFound(msg)
1079
async def execute_action(
    self,
    application_name: str,
    model_name: str,
    action_name: str,
    db_dict: dict = None,
    machine_id: str = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs,
):
    """Execute action

    The action runs on the leader unit of the application, unless
    ``machine_id`` is given: in that case it runs on the unit placed in that
    machine.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: action_name: Name of the action
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: machine_id Machine id
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: kwargs: Parameters passed to the action

    :return: (str, str): (output and status)
    :raises: JujuApplicationNotFound if the application is not in the model
    :raises: JujuError if no unit of the application is in ``machine_id``
    :raises: JujuActionNotFound if the application does not have the action
    """
    self.log.debug(
        "Executing action {} using params {}".format(action_name, kwargs)
    )
    # Get controller
    controller = await self.get_controller()

    model = None
    try:
        # Get model (inside the try block so that the controller is released
        # even if the model cannot be retrieved)
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound("Cannot execute action")
        # Racing condition:
        #   Occasionally, self._get_leader_unit() will return None
        #   because the leader elected hook has not been triggered yet.
        #   Therefore, we are doing some retries. If it happens again,
        #   re-open bug 1236
        if machine_id is None:
            unit = await self._get_leader_unit(application)
            self.log.debug(
                "Action {} is being executed on the leader unit {}".format(
                    action_name, unit.name
                )
            )
        else:
            unit = self._get_unit(application, machine_id)
            if not unit:
                raise JujuError(
                    "A unit with machine id {} not in available units".format(
                        machine_id
                    )
                )
            self.log.debug(
                "Action {} is being executed on {} unit".format(
                    action_name, unit.name
                )
            )

        actions = await application.get_actions()

        if action_name not in actions:
            raise JujuActionNotFound(
                "Action {} not in available actions".format(action_name)
            )

        action = await unit.run_action(action_name, **kwargs)

        self.log.debug(
            "Wait until action {} is completed in application {} (model={})".format(
                action_name, application_name, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=action,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )

        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)
        # An action missing from the status report is considered failed
        status = (
            status[action.entity_id] if action.entity_id in status else "failed"
        )

        self.log.debug(
            "Action {} completed with status {} in application {} (model={})".format(
                action_name, action.status, application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return output, status
1186
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get list of actions

    :param: application_name: Application name
    :param: model_name: Model name

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }
    """
    self.log.debug(
        "Getting list of actions for application {}".format(application_name)
    )

    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)

    try:
        application = self._get_application(
            model,
            application_name=application_name,
        )
        available_actions = await application.get_actions()
    finally:
        # Disconnect from model and controller even if the lookup failed
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return available_actions
1223
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param: model_name: The name or unique id of the network service
    :param: application_name: The name of the application

    :raises Exception: if model_name or application_name is empty

    :return: Metrics returned by the application, or an empty dict when the
             application is not found in the model
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)
    try:
        application = self._get_application(model, application_name)
        if application is not None:
            metrics = await application.get_metrics()
    finally:
        # Both disconnect helpers are coroutines: they must be awaited,
        # otherwise the connections are never actually closed.
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return metrics
1243
async def add_relation(
    self,
    model_name: str,
    endpoint_1: str,
    endpoint_2: str,
):
    """Add relation

    API errors whose message contains "not found" or "already exists" are
    logged as warnings and swallowed; any other API error is re-raised.

    :param: model_name: Model name
    :param: endpoint_1: First endpoint name
                        ("app:endpoint" format or directly the saas name)
    :param: endpoint_2: Second endpoint name (^ same format)
    """

    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)

    # (substring looked for in the API error, warning template)
    tolerated_errors = (
        ("not found", "Relation not found: {}"),
        ("already exists", "Relation already exists: {}"),
    )

    try:
        await model.add_relation(endpoint_1, endpoint_2)
    except juju.errors.JujuAPIError as e:
        for marker, warning_template in tolerated_errors:
            if marker in e.message:
                self.log.warning(warning_template.format(e.message))
                return
        # another exception, raise it
        raise e
    finally:
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1281
async def offer(self, endpoint: RelationEndpoint) -> Offer:
    """
    Create an offer from a RelationEndpoint

    The offer is named "<application_name>-<endpoint_name>". A Juju error
    whose message contains "application offer already exists" is swallowed,
    in which case nothing is returned.

    :param: endpoint: Relation endpoint

    :raises Exception: if the offer cannot be listed after being created

    :return: Offer object
    """
    model_name = endpoint.model_name
    offer_name = f"{endpoint.application_name}-{endpoint.endpoint_name}"
    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        await model.create_offer(endpoint.endpoint, offer_name=offer_name)
        offer_list = await self._list_offers(model_name, offer_name=offer_name)
        if offer_list:
            return Offer(offer_list[0].offer_url)
        else:
            raise Exception("offer was not created")
    except juju.errors.JujuError as e:
        if "application offer already exists" not in e.message:
            raise e
    finally:
        # The disconnect helpers are coroutines: without "await" they were
        # never executed and the connections stayed open.
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1309
async def consume(
    self,
    model_name: str,
    offer: Offer,
    provider_libjuju: "Libjuju",
) -> str:
    """
    Consumes a remote offer in the model. Relations can be created later using "juju relate".

    The saas name is built as "<offer name>-<offer model name without dashes>",
    with "-<vca_id>" appended when the offer carries a vca_id.

    :param: model_name: Model name
    :param: offer: Offer object to consume
    :param: provider_libjuju: Libjuju object of the provider endpoint

    :raises ParseError if there's a problem parsing the offer_url
    :raises JujuError if remote offer includes an endpoint
    :raises JujuAPIError if the operation is not successful

    :returns: Saas name. It is the application name in the model that references the remote application.
    """
    alias = f'{offer.name}-{offer.model_name.replace("-", "")}'
    saas_name = f"{alias}-{offer.vca_id}" if offer.vca_id else alias

    controller = await self.get_controller()
    provider_controller = None
    model = None
    try:
        model = await controller.get_model(model_name)
        provider_controller = await provider_libjuju.get_controller()
        await model.consume(
            offer.url, application_alias=saas_name, controller=provider_controller
        )
        return saas_name
    finally:
        # Release only what was actually opened
        if model:
            await self.disconnect_model(model)
        if provider_controller:
            await provider_libjuju.disconnect_controller(provider_controller)
        await self.disconnect_controller(controller)
1348
async def destroy_model(self, model_name: str, total_timeout: float = 1800):
    """
    Destroy model

    Does nothing (besides logging a warning) when the model does not exist.
    Otherwise the manually provisioned machines that are still pending are
    destroyed first, and then the model itself is destroyed.

    :param: model_name: Model name
    :param: total_timeout: Maximum time in seconds allowed for the model
                           destruction itself

    :raises Exception: any error raised during the deletion, unless the
                       model no longer exists when the error is handled
    """

    controller = await self.get_controller()
    model = None
    try:
        if not await self.model_exists(model_name, controller=controller):
            self.log.warning(f"Model {model_name} doesn't exist")
            return

        self.log.debug(f"Getting model {model_name} to be destroyed")
        model = await self.get_model(controller, model_name)
        self.log.debug(f"Destroying manual machines in model {model_name}")
        # Destroy machines that are manually provisioned
        # and still are in pending state
        await self._destroy_pending_machines(model, only_manual=True)
        await self.disconnect_model(model)
        # The model connection is closed now: drop the reference so that the
        # "finally" clause does not disconnect it a second time.
        model = None

        await asyncio.wait_for(
            self._destroy_model(model_name, controller),
            timeout=total_timeout,
        )
    except Exception as e:
        # A failure is only relevant if the model is still there
        if not await self.model_exists(model_name, controller=controller):
            self.log.warning(
                f"Failed deleting model {model_name}: model doesn't exist"
            )
            return
        self.log.warning(f"Failed deleting model {model_name}: {e}")
        raise e
    finally:
        # model is only set here when an error happened before the explicit
        # disconnection above
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1388
1389 async def _destroy_model(
1390 self,
1391 model_name: str,
1392 controller: Controller,
1393 ):
1394 """
1395 Destroy model from controller
1396
1397 :param: model: Model name to be removed
1398 :param: controller: Controller object
1399 :param: timeout: Timeout in seconds
1400 """
1401 self.log.debug(f"Destroying model {model_name}")
1402
1403 async def _destroy_model_gracefully(model_name: str, controller: Controller):
1404 self.log.info(f"Gracefully deleting model {model_name}")
1405 resolved = False
1406 while model_name in await controller.list_models():
1407 if not resolved:
1408 await self.resolve(model_name)
1409 resolved = True
1410 await controller.destroy_model(model_name, destroy_storage=True)
1411
1412 await asyncio.sleep(5)
1413 self.log.info(f"Model {model_name} deleted gracefully")
1414
1415 async def _destroy_model_forcefully(model_name: str, controller: Controller):
1416 self.log.info(f"Forcefully deleting model {model_name}")
1417 while model_name in await controller.list_models():
1418 await controller.destroy_model(
1419 model_name, destroy_storage=True, force=True, max_wait=60
1420 )
1421 await asyncio.sleep(5)
1422 self.log.info(f"Model {model_name} deleted forcefully")
1423
1424 try:
1425 await asyncio.wait_for(
1426 _destroy_model_gracefully(model_name, controller), timeout=120
1427 )
1428 except asyncio.TimeoutError:
1429 await _destroy_model_forcefully(model_name, controller)
1430 except juju.errors.JujuError as e:
1431 if any("has been removed" in error for error in e.errors):
1432 return
1433 raise e
1434
async def destroy_application(
    self, model_name: str, application_name: str, total_timeout: float
):
    """
    Destroy application

    Destroys the application if it is found in the model (a warning is
    logged otherwise) and then polls every 5 seconds until the application
    is no longer found.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: total_timeout: Timeout in seconds. If None, 3600 is used

    :raises Exception: if the application is still present after the timeout
    """

    controller = await self.get_controller()
    model = None

    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Destroying application {} in model {}".format(
                application_name, model_name
            )
        )
        application = self._get_application(model, application_name)
        if not application:
            self.log.warning("Application not found: {}".format(application_name))
        else:
            await application.destroy()

        self.log.debug(
            "Waiting for application {} to be destroyed in model {}...".format(
                application_name, model_name
            )
        )
        wait_seconds = 3600 if total_timeout is None else total_timeout
        deadline = time.time() + wait_seconds
        while time.time() < deadline:
            still_there = self._get_application(model, application_name)
            if not still_there:
                self.log.debug(
                    "The application {} was destroyed in model {} ".format(
                        application_name, model_name
                    )
                )
                return
            await asyncio.sleep(5)
        raise Exception(
            "Timeout waiting for application {} to be destroyed in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1488
1489 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
1490 """
1491 Destroy pending machines in a given model
1492
1493 :param: only_manual: Bool that indicates only manually provisioned
1494 machines should be destroyed (if True), or that
1495 all pending machines should be destroyed
1496 """
1497 status = await model.get_status()
1498 for machine_id in status.machines:
1499 machine_status = status.machines[machine_id]
1500 if machine_status.agent_status.status == "pending":
1501 if only_manual and not machine_status.instance_id.startswith("manual:"):
1502 break
1503 machine = model.machines[machine_id]
1504 await machine.destroy(force=True)
1505
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Configure application

    Nothing is done (no connection is opened) when config is empty or None.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm
    """
    self.log.debug("Configuring application {}".format(application_name))

    if not config:
        return

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        target_application = self._get_application(
            model,
            application_name=application_name,
        )
        await target_application.set_config(config)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1531
def handle_exception(self, loop, context):
    """Handle the exceptions that libjuju leaves unhandled

    Both arguments are deliberately ignored: nothing is done.

    :param: loop: Unused
    :param: context: Unused
    """
    return None
1535
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Runs forever: on every pass a controller connection is requested and
    released again. Failures are logged as errors and do not stop the loop.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Start every pass without a controller, so that a failed check
        # never disconnects again the controller obtained (and already
        # disconnected) by a previous pass.
        controller = None
        try:
            controller = await self.get_controller()
            # self.log.debug("VCA is alive")
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            if controller is not None:
                await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
1552
async def list_models(self, contains: str = None) -> [str]:
    """List models with certain names

    :param: contains: String that is contained in model name.
                      If empty or None, all the models are returned.

    :return: [models] Returns list of model names
    """

    controller = await self.get_controller()
    try:
        model_names = await controller.list_models()
        if not contains:
            return model_names
        return [name for name in model_names if contains in name]
    finally:
        await self.disconnect_controller(controller)
1569
1570 async def _list_offers(
1571 self, model_name: str, offer_name: str = None
1572 ) -> QueryApplicationOffersResults:
1573 """
1574 List offers within a model
1575
1576 :param: model_name: Model name
1577 :param: offer_name: Offer name to filter.
1578
1579 :return: Returns application offers results in the model
1580 """
1581
1582 controller = await self.get_controller()
1583 try:
1584 offers = (await controller.list_offers(model_name)).results
1585 if offer_name:
1586 matching_offer = []
1587 for offer in offers:
1588 if offer.offer_name == offer_name:
1589 matching_offer.append(offer)
1590 break
1591 offers = matching_offer
1592 return offers
1593 finally:
1594 await self.disconnect_controller(controller)
1595
async def add_k8s(
    self,
    name: str,
    rbac_id: str,
    token: str,
    client_cert_data: str,
    configuration: Configuration,
    storage_class: str,
    credential_name: str = None,
):
    """
    Add a Kubernetes cloud to the controller

    Similar to the `juju add-k8s` command in the CLI

    :param: name: Name for the K8s cloud
    :param: rbac_id: Value stored in the credential attributes under the
                     RBAC_LABEL_KEY_NAME key
    :param: token: Token used to build the cloud credential
    :param: client_cert_data: Certificate data used to build the cloud
                              credential and as CA certificate of the cloud
    :param: configuration: Kubernetes configuration object
    :param: storage_class: Storage Class to use in the cloud
    :param: credential_name: Name for the credential, forwarded to add_cloud()

    :raises Exception: if storage_class, name or configuration is empty

    :return: The value returned by add_cloud()
    """

    # Checked in this order, so the first missing argument is the one reported
    required_arguments = (
        (storage_class, "storage_class must be a non-empty string"),
        (name, "name must be a non-empty string"),
        (configuration, "configuration must be provided"),
    )
    for value, error_message in required_arguments:
        if not value:
            raise Exception(error_message)

    k8s_endpoint = configuration.host
    credential = self.get_k8s_cloud_credential(
        configuration,
        client_cert_data,
        token,
    )
    credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id

    # The same storage class backs both the operator and the workload storage
    storage_config = {
        "operator-storage": storage_class,
        "workload-storage": storage_class,
    }
    cloud = client.Cloud(
        type_="kubernetes",
        auth_types=[credential.auth_type],
        endpoint=k8s_endpoint,
        ca_certificates=[client_cert_data],
        config=storage_config,
    )

    return await self.add_cloud(
        name, cloud, credential, credential_name=credential_name
    )
1645
def get_k8s_cloud_credential(
    self,
    configuration: Configuration,
    client_cert_data: str,
    token: str = None,
) -> client.CloudCredential:
    """Build the cloud credential for a Kubernetes cloud

    The authentication type is derived from the data provided:
      - username (with or without certificate): "userpass" / "userpasswithcert"
      - certificate and token, without username: "certificate"

    :param: configuration: Kubernetes configuration object; its username
                           and password are read
    :param: client_cert_data: Client certificate data
    :param: token: Token. Cannot be combined with a username or password

    :raises JujuInvalidK8sConfiguration: if a token is given together with a
        username/password, or if no supported authentication method
        matches the data provided

    :return: CloudCredential object with the auth type and its attributes
    """
    attrs = {}
    # TODO: Test with AKS
    # NOTE: the key is always None for now, so the key based branches below
    # are currently never taken.
    key = None  # open(configuration.key_file, "r").read()
    username = configuration.username
    password = configuration.password

    if client_cert_data:
        attrs["ClientCertificateData"] = client_cert_data
    if key:
        attrs["ClientKeyData"] = key
    if token:
        if username or password:
            raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
        attrs["Token"] = token

    auth_type = None
    if key:
        auth_type = "oauth2withcert" if client_cert_data else "oauth2"
        if not token:
            raise JujuInvalidK8sConfiguration(
                "missing token for auth type {}".format(auth_type)
            )
    elif username:
        if not password:
            self.log.debug(
                "credential for user {} has empty password".format(username)
            )
        attrs["username"] = username
        attrs["password"] = password
        auth_type = "userpasswithcert" if client_cert_data else "userpass"
    elif client_cert_data and token:
        auth_type = "certificate"
    else:
        raise JujuInvalidK8sConfiguration("authentication method not supported")
    return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1692
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Add cloud to the controller

    :param: name: Name of the cloud to be added
    :param: cloud: Cloud object
    :param: credential: CloudCredentials object for the cloud
    :param: credential_name: Credential name.
                             If not defined, the name of the cloud will be used.

    :return: The Cloud object received as argument
    """
    controller = await self.get_controller()
    try:
        await controller.add_cloud(name, cloud)
        if credential:
            effective_credential_name = credential_name or name
            await controller.add_credential(
                effective_credential_name, credential=credential, cloud=name
            )
        # Need to return the object returned by the controller.add_cloud() function
        # I'm returning the original value now until this bug is fixed:
        # https://github.com/juju/python-libjuju/issues/443
        return cloud
    finally:
        await self.disconnect_controller(controller)
1722
async def remove_cloud(self, name: str):
    """
    Remove cloud

    A Juju error whose only error is 'cloud "<name>" not found' is logged
    as a warning and swallowed; any other Juju error is re-raised.

    :param: name: Name of the cloud to be removed
    """
    controller = await self.get_controller()
    try:
        await controller.remove_cloud(name)
    except juju.errors.JujuError as e:
        cloud_is_missing = (
            len(e.errors) == 1 and f'cloud "{name}" not found' == e.errors[0]
        )
        if not cloud_is_missing:
            raise e
        self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
    finally:
        await self.disconnect_controller(controller)
1739
@retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
async def _get_leader_unit(self, application: Application) -> Unit:
    """Get the leader unit of an application

    The first unit that reports itself as leader is returned. A plain
    Exception is raised when there is none, which is what triggers the
    retries configured in the decorator.

    :param: application: Application object

    :return: Leader unit
    """
    leader = None
    for candidate in application.units:
        if await candidate.is_leader_from_status():
            leader = candidate
            break
    if leader:
        return leader
    raise Exception()
1750
async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
    """
    Get cloud credentials

    The credential is looked up by the tag built from the cloud name, the
    user of the VCA connection and the credential name of the cloud.

    :param: cloud: Cloud object. The returned credentials will be from this cloud.

    :return: List of credentials object associated to the specified cloud

    """
    controller = await self.get_controller()
    try:
        cloud_facade = client.CloudFacade.from_connection(controller.connection())
        credential_tag = tag.credential(
            cloud.name, self.vca_connection.data.user, cloud.credential_name
        )
        response = await cloud_facade.Credential([client.Entity(credential_tag)])
        return response.results
    finally:
        await self.disconnect_controller(controller)
1770
async def check_application_exists(self, model_name, application_name) -> bool:
    """Check application exists

    :param: model_name: Model Name
    :param: application_name: Application Name

    :return: Boolean: True if the application is found in the model
    """

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Checking if application {} exists in model {}".format(
                application_name, model_name
            )
        )
        application = self._get_application(model, application_name)
        return application is not None
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)