Add logs
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 import typing
18
19 import time
20
21 import juju.errors
22 from juju.model import Model
23 from juju.machine import Machine
24 from juju.application import Application
25 from juju.unit import Unit
26 from juju.client._definitions import (
27 FullStatus,
28 QueryApplicationOffersResults,
29 Cloud,
30 CloudCredential,
31 )
32 from juju.controller import Controller
33 from juju.client import client
34 from juju import tag
35
36 from n2vc.definitions import Offer, RelationEndpoint
37 from n2vc.juju_watcher import JujuModelWatcher
38 from n2vc.provisioner import AsyncSSHProvisioner
39 from n2vc.n2vc_conn import N2VCConnector
40 from n2vc.exceptions import (
41 JujuMachineNotFound,
42 JujuApplicationNotFound,
43 JujuLeaderUnitNotFound,
44 JujuActionNotFound,
45 JujuControllerFailedConnecting,
46 JujuApplicationExists,
47 JujuInvalidK8sConfiguration,
48 JujuError,
49 )
50 from n2vc.vca.cloud import Cloud as VcaCloud
51 from n2vc.vca.connection import Connection
52 from kubernetes.client.configuration import Configuration
53 from retrying_async import retry
54
55
56 RBAC_LABEL_KEY_NAME = "rbac-id"
57
58
59 class Libjuju:
60 def __init__(
61 self,
62 vca_connection: Connection,
63 loop: asyncio.AbstractEventLoop = None,
64 log: logging.Logger = None,
65 n2vc: N2VCConnector = None,
66 ):
67 """
68 Constructor
69
70 :param: vca_connection: n2vc.vca.connection object
71 :param: loop: Asyncio loop
72 :param: log: Logger
73 :param: n2vc: N2VC object
74 """
75
76 self.log = log or logging.getLogger("Libjuju")
77 self.n2vc = n2vc
78 self.vca_connection = vca_connection
79
80 self.loop = loop or asyncio.get_event_loop()
81 self.loop.set_exception_handler(self.handle_exception)
82 self.creating_model = asyncio.Lock(loop=self.loop)
83
84 if self.vca_connection.is_default:
85 self.health_check_task = self._create_health_check_task()
86
87 def _create_health_check_task(self):
88 return self.loop.create_task(self.health_check())
89
90 async def get_controller(self, timeout: float = 60.0) -> Controller:
91 """
92 Get controller
93
94 :param: timeout: Time in seconds to wait for controller to connect
95 """
96 controller = None
97 try:
98 controller = Controller()
99 await asyncio.wait_for(
100 controller.connect(
101 endpoint=self.vca_connection.data.endpoints,
102 username=self.vca_connection.data.user,
103 password=self.vca_connection.data.secret,
104 cacert=self.vca_connection.data.cacert,
105 ),
106 timeout=timeout,
107 )
108 if self.vca_connection.is_default:
109 endpoints = await controller.api_endpoints
110 if not all(
111 endpoint in self.vca_connection.endpoints for endpoint in endpoints
112 ):
113 await self.vca_connection.update_endpoints(endpoints)
114 return controller
115 except asyncio.CancelledError as e:
116 raise e
117 except Exception as e:
118 self.log.error(
119 "Failed connecting to controller: {}... {}".format(
120 self.vca_connection.data.endpoints, e
121 )
122 )
123 if controller:
124 await self.disconnect_controller(controller)
125 raise JujuControllerFailedConnecting(e)
126
127 async def disconnect(self):
128 """Disconnect"""
129 # Cancel health check task
130 self.health_check_task.cancel()
131 self.log.debug("Libjuju disconnected!")
132
133 async def disconnect_model(self, model: Model):
134 """
135 Disconnect model
136
137 :param: model: Model that will be disconnected
138 """
139 await model.disconnect()
140
141 async def disconnect_controller(self, controller: Controller):
142 """
143 Disconnect controller
144
145 :param: controller: Controller that will be disconnected
146 """
147 if controller:
148 await controller.disconnect()
149
150 @retry(attempts=3, delay=5, timeout=None)
151 async def add_model(self, model_name: str, cloud: VcaCloud):
152 """
153 Create model
154
155 :param: model_name: Model name
156 :param: cloud: Cloud object
157 """
158
159 # Get controller
160 controller = await self.get_controller()
161 model = None
162 try:
163 # Block until other workers have finished model creation
164 while self.creating_model.locked():
165 await asyncio.sleep(0.1)
166
167 # Create the model
168 async with self.creating_model:
169 if await self.model_exists(model_name, controller=controller):
170 return
171 self.log.debug("Creating model {}".format(model_name))
172 model = await controller.add_model(
173 model_name,
174 config=self.vca_connection.data.model_config,
175 cloud_name=cloud.name,
176 credential_name=cloud.credential_name,
177 )
178 except juju.errors.JujuAPIError as e:
179 if "already exists" in e.message:
180 pass
181 else:
182 raise e
183 finally:
184 if model:
185 await self.disconnect_model(model)
186 await self.disconnect_controller(controller)
187
188 async def get_executed_actions(self, model_name: str) -> list:
189 """
190 Get executed/history of actions for a model.
191
192 :param: model_name: Model name, str.
193 :return: List of executed actions for a model.
194 """
195 model = None
196 executed_actions = []
197 controller = await self.get_controller()
198 try:
199 model = await self.get_model(controller, model_name)
200 # Get all unique action names
201 actions = {}
202 for application in model.applications:
203 application_actions = await self.get_actions(application, model_name)
204 actions.update(application_actions)
205 # Get status of all actions
206 for application_action in actions:
207 app_action_status_list = await model.get_action_status(
208 name=application_action
209 )
210 for action_id, action_status in app_action_status_list.items():
211 executed_action = {
212 "id": action_id,
213 "action": application_action,
214 "status": action_status,
215 }
216 # Get action output by id
217 action_status = await model.get_action_output(executed_action["id"])
218 for k, v in action_status.items():
219 executed_action[k] = v
220 executed_actions.append(executed_action)
221 except Exception as e:
222 raise JujuError(
223 "Error in getting executed actions for model: {}. Error: {}".format(
224 model_name, str(e)
225 )
226 )
227 finally:
228 if model:
229 await self.disconnect_model(model)
230 await self.disconnect_controller(controller)
231 return executed_actions
232
233 async def get_application_configs(
234 self, model_name: str, application_name: str
235 ) -> dict:
236 """
237 Get available configs for an application.
238
239 :param: model_name: Model name, str.
240 :param: application_name: Application name, str.
241
242 :return: A dict which has key - action name, value - action description
243 """
244 model = None
245 application_configs = {}
246 controller = await self.get_controller()
247 try:
248 model = await self.get_model(controller, model_name)
249 application = self._get_application(
250 model, application_name=application_name
251 )
252 application_configs = await application.get_config()
253 except Exception as e:
254 raise JujuError(
255 "Error in getting configs for application: {} in model: {}. Error: {}".format(
256 application_name, model_name, str(e)
257 )
258 )
259 finally:
260 if model:
261 await self.disconnect_model(model)
262 await self.disconnect_controller(controller)
263 return application_configs
264
265 @retry(attempts=3, delay=5)
266 async def get_model(self, controller: Controller, model_name: str) -> Model:
267 """
268 Get model from controller
269
270 :param: controller: Controller
271 :param: model_name: Model name
272
273 :return: Model: The created Juju model object
274 """
275 return await controller.get_model(model_name)
276
277 async def model_exists(
278 self, model_name: str, controller: Controller = None
279 ) -> bool:
280 """
281 Check if model exists
282
283 :param: controller: Controller
284 :param: model_name: Model name
285
286 :return bool
287 """
288 need_to_disconnect = False
289
290 # Get controller if not passed
291 if not controller:
292 controller = await self.get_controller()
293 need_to_disconnect = True
294
295 # Check if model exists
296 try:
297 return model_name in await controller.list_models()
298 finally:
299 if need_to_disconnect:
300 await self.disconnect_controller(controller)
301
302 async def models_exist(self, model_names: [str]) -> (bool, list):
303 """
304 Check if models exists
305
306 :param: model_names: List of strings with model names
307
308 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
309 """
310 if not model_names:
311 raise Exception(
312 "model_names must be a non-empty array. Given value: {}".format(
313 model_names
314 )
315 )
316 non_existing_models = []
317 models = await self.list_models()
318 existing_models = list(set(models).intersection(model_names))
319 non_existing_models = list(set(model_names) - set(existing_models))
320
321 return (
322 len(non_existing_models) == 0,
323 non_existing_models,
324 )
325
326 async def get_model_status(self, model_name: str) -> FullStatus:
327 """
328 Get model status
329
330 :param: model_name: Model name
331
332 :return: Full status object
333 """
334 controller = await self.get_controller()
335 model = await self.get_model(controller, model_name)
336 try:
337 return await model.get_status()
338 finally:
339 await self.disconnect_model(model)
340 await self.disconnect_controller(controller)
341
342 async def create_machine(
343 self,
344 model_name: str,
345 machine_id: str = None,
346 db_dict: dict = None,
347 progress_timeout: float = None,
348 total_timeout: float = None,
349 series: str = "bionic",
350 wait: bool = True,
351 ) -> (Machine, bool):
352 """
353 Create machine
354
355 :param: model_name: Model name
356 :param: machine_id: Machine id
357 :param: db_dict: Dictionary with data of the DB to write the updates
358 :param: progress_timeout: Maximum time between two updates in the model
359 :param: total_timeout: Timeout for the entity to be active
360 :param: series: Series of the machine (xenial, bionic, focal, ...)
361 :param: wait: Wait until machine is ready
362
363 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
364 if the machine is new or it already existed
365 """
366 new = False
367 machine = None
368
369 self.log.debug(
370 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
371 )
372
373 # Get controller
374 controller = await self.get_controller()
375
376 # Get model
377 model = await self.get_model(controller, model_name)
378 try:
379 if machine_id is not None:
380 self.log.debug(
381 "Searching machine (id={}) in model {}".format(
382 machine_id, model_name
383 )
384 )
385
386 # Get machines from model and get the machine with machine_id if exists
387 machines = await model.get_machines()
388 if machine_id in machines:
389 self.log.debug(
390 "Machine (id={}) found in model {}".format(
391 machine_id, model_name
392 )
393 )
394 machine = machines[machine_id]
395 else:
396 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
397
398 if machine is None:
399 self.log.debug("Creating a new machine in model {}".format(model_name))
400
401 # Create machine
402 machine = await model.add_machine(
403 spec=None, constraints=None, disks=None, series=series
404 )
405 new = True
406
407 # Wait until the machine is ready
408 self.log.debug(
409 "Wait until machine {} is ready in model {}".format(
410 machine.entity_id, model_name
411 )
412 )
413 if wait:
414 await JujuModelWatcher.wait_for(
415 model=model,
416 entity=machine,
417 progress_timeout=progress_timeout,
418 total_timeout=total_timeout,
419 db_dict=db_dict,
420 n2vc=self.n2vc,
421 vca_id=self.vca_connection._vca_id,
422 )
423 finally:
424 await self.disconnect_model(model)
425 await self.disconnect_controller(controller)
426
427 self.log.debug(
428 "Machine {} ready at {} in model {}".format(
429 machine.entity_id, machine.dns_name, model_name
430 )
431 )
432 return machine, new
433
434 async def provision_machine(
435 self,
436 model_name: str,
437 hostname: str,
438 username: str,
439 private_key_path: str,
440 db_dict: dict = None,
441 progress_timeout: float = None,
442 total_timeout: float = None,
443 ) -> str:
444 """
445 Manually provisioning of a machine
446
447 :param: model_name: Model name
448 :param: hostname: IP to access the machine
449 :param: username: Username to login to the machine
450 :param: private_key_path: Local path for the private key
451 :param: db_dict: Dictionary with data of the DB to write the updates
452 :param: progress_timeout: Maximum time between two updates in the model
453 :param: total_timeout: Timeout for the entity to be active
454
455 :return: (Entity): Machine id
456 """
457 self.log.debug(
458 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
459 model_name, hostname, username
460 )
461 )
462
463 # Get controller
464 controller = await self.get_controller()
465
466 # Get model
467 model = await self.get_model(controller, model_name)
468
469 try:
470 # Get provisioner
471 provisioner = AsyncSSHProvisioner(
472 host=hostname,
473 user=username,
474 private_key_path=private_key_path,
475 log=self.log,
476 )
477
478 # Provision machine
479 params = await provisioner.provision_machine()
480
481 params.jobs = ["JobHostUnits"]
482
483 self.log.debug("Adding machine to model")
484 connection = model.connection()
485 client_facade = client.ClientFacade.from_connection(connection)
486
487 results = await client_facade.AddMachines(params=[params])
488 error = results.machines[0].error
489
490 if error:
491 msg = "Error adding machine: {}".format(error.message)
492 self.log.error(msg=msg)
493 raise ValueError(msg)
494
495 machine_id = results.machines[0].machine
496
497 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
498 asyncio.ensure_future(
499 provisioner.install_agent(
500 connection=connection,
501 nonce=params.nonce,
502 machine_id=machine_id,
503 proxy=self.vca_connection.data.api_proxy,
504 series=params.series,
505 )
506 )
507
508 machine = None
509 for _ in range(10):
510 machine_list = await model.get_machines()
511 if machine_id in machine_list:
512 self.log.debug("Machine {} found in model!".format(machine_id))
513 machine = model.machines.get(machine_id)
514 break
515 await asyncio.sleep(2)
516
517 if machine is None:
518 msg = "Machine {} not found in model".format(machine_id)
519 self.log.error(msg=msg)
520 raise JujuMachineNotFound(msg)
521
522 self.log.debug(
523 "Wait until machine {} is ready in model {}".format(
524 machine.entity_id, model_name
525 )
526 )
527 await JujuModelWatcher.wait_for(
528 model=model,
529 entity=machine,
530 progress_timeout=progress_timeout,
531 total_timeout=total_timeout,
532 db_dict=db_dict,
533 n2vc=self.n2vc,
534 vca_id=self.vca_connection._vca_id,
535 )
536 except Exception as e:
537 raise e
538 finally:
539 await self.disconnect_model(model)
540 await self.disconnect_controller(controller)
541
542 self.log.debug(
543 "Machine provisioned {} in model {}".format(machine_id, model_name)
544 )
545
546 return machine_id
547
548 async def deploy(
549 self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
550 ):
551 """
552 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
553
554 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
555 :param: model_name: Model name
556 :param: wait: Indicates whether to wait or not until all applications are active
557 :param: timeout: Time in seconds to wait until all applications are active
558 """
559 controller = await self.get_controller()
560 model = await self.get_model(controller, model_name)
561 try:
562 await model.deploy(uri, trust=True)
563 if wait:
564 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
565 self.log.debug("All units active in model {}".format(model_name))
566 finally:
567 await self.disconnect_model(model)
568 await self.disconnect_controller(controller)
569
570 async def add_unit(
571 self,
572 application_name: str,
573 model_name: str,
574 machine_id: str,
575 db_dict: dict = None,
576 progress_timeout: float = None,
577 total_timeout: float = None,
578 ):
579 """Add unit
580
581 :param: application_name: Application name
582 :param: model_name: Model name
583 :param: machine_id Machine id
584 :param: db_dict: Dictionary with data of the DB to write the updates
585 :param: progress_timeout: Maximum time between two updates in the model
586 :param: total_timeout: Timeout for the entity to be active
587
588 :return: None
589 """
590
591 model = None
592 controller = await self.get_controller()
593 try:
594 model = await self.get_model(controller, model_name)
595 application = self._get_application(model, application_name)
596
597 if application is not None:
598
599 # Checks if the given machine id in the model,
600 # otherwise function raises an error
601 _machine, _series = self._get_machine_info(model, machine_id)
602
603 self.log.debug(
604 "Adding unit (machine {}) to application {} in model ~{}".format(
605 machine_id, application_name, model_name
606 )
607 )
608
609 await application.add_unit(to=machine_id)
610
611 await JujuModelWatcher.wait_for(
612 model=model,
613 entity=application,
614 progress_timeout=progress_timeout,
615 total_timeout=total_timeout,
616 db_dict=db_dict,
617 n2vc=self.n2vc,
618 vca_id=self.vca_connection._vca_id,
619 )
620 self.log.debug(
621 "Unit is added to application {} in model {}".format(
622 application_name, model_name
623 )
624 )
625 else:
626 raise JujuApplicationNotFound(
627 "Application {} not exists".format(application_name)
628 )
629 finally:
630 if model:
631 await self.disconnect_model(model)
632 await self.disconnect_controller(controller)
633
634 async def destroy_unit(
635 self,
636 application_name: str,
637 model_name: str,
638 machine_id: str,
639 total_timeout: float = None,
640 ):
641 """Destroy unit
642
643 :param: application_name: Application name
644 :param: model_name: Model name
645 :param: machine_id Machine id
646 :param: total_timeout: Timeout for the entity to be active
647
648 :return: None
649 """
650
651 model = None
652 controller = await self.get_controller()
653 try:
654 model = await self.get_model(controller, model_name)
655 application = self._get_application(model, application_name)
656
657 if application is None:
658 raise JujuApplicationNotFound(
659 "Application not found: {} (model={})".format(
660 application_name, model_name
661 )
662 )
663
664 unit = self._get_unit(application, machine_id)
665 if not unit:
666 raise JujuError(
667 "A unit with machine id {} not in available units".format(
668 machine_id
669 )
670 )
671
672 unit_name = unit.name
673
674 self.log.debug(
675 "Destroying unit {} from application {} in model {}".format(
676 unit_name, application_name, model_name
677 )
678 )
679 await application.destroy_unit(unit_name)
680
681 self.log.debug(
682 "Waiting for unit {} to be destroyed in application {} (model={})...".format(
683 unit_name, application_name, model_name
684 )
685 )
686
687 # TODO: Add functionality in the Juju watcher to replace this kind of blocks
688 if total_timeout is None:
689 total_timeout = 3600
690 end = time.time() + total_timeout
691 while time.time() < end:
692 if not self._get_unit(application, machine_id):
693 self.log.debug(
694 "The unit {} was destroyed in application {} (model={}) ".format(
695 unit_name, application_name, model_name
696 )
697 )
698 return
699 await asyncio.sleep(5)
700 self.log.debug(
701 "Unit {} is destroyed from application {} in model {}".format(
702 unit_name, application_name, model_name
703 )
704 )
705 finally:
706 if model:
707 await self.disconnect_model(model)
708 await self.disconnect_controller(controller)
709
710 async def deploy_charm(
711 self,
712 application_name: str,
713 path: str,
714 model_name: str,
715 machine_id: str,
716 db_dict: dict = None,
717 progress_timeout: float = None,
718 total_timeout: float = None,
719 config: dict = None,
720 series: str = None,
721 num_units: int = 1,
722 ):
723 """Deploy charm
724
725 :param: application_name: Application name
726 :param: path: Local path to the charm
727 :param: model_name: Model name
728 :param: machine_id ID of the machine
729 :param: db_dict: Dictionary with data of the DB to write the updates
730 :param: progress_timeout: Maximum time between two updates in the model
731 :param: total_timeout: Timeout for the entity to be active
732 :param: config: Config for the charm
733 :param: series: Series of the charm
734 :param: num_units: Number of units
735
736 :return: (juju.application.Application): Juju application
737 """
738 self.log.debug(
739 "Deploying charm {} to machine {} in model ~{}".format(
740 application_name, machine_id, model_name
741 )
742 )
743 self.log.debug("charm: {}".format(path))
744
745 # Get controller
746 controller = await self.get_controller()
747
748 # Get model
749 model = await self.get_model(controller, model_name)
750
751 try:
752 if application_name not in model.applications:
753
754 if machine_id is not None:
755 machine, series = self._get_machine_info(model, machine_id)
756
757 application = await model.deploy(
758 entity_url=path,
759 application_name=application_name,
760 channel="stable",
761 num_units=1,
762 series=series,
763 to=machine_id,
764 config=config,
765 )
766
767 self.log.debug(
768 "Wait until application {} is ready in model {}".format(
769 application_name, model_name
770 )
771 )
772 if num_units > 1:
773 for _ in range(num_units - 1):
774 m, _ = await self.create_machine(model_name, wait=False)
775 await application.add_unit(to=m.entity_id)
776
777 await JujuModelWatcher.wait_for(
778 model=model,
779 entity=application,
780 progress_timeout=progress_timeout,
781 total_timeout=total_timeout,
782 db_dict=db_dict,
783 n2vc=self.n2vc,
784 vca_id=self.vca_connection._vca_id,
785 )
786 self.log.debug(
787 "Application {} is ready in model {}".format(
788 application_name, model_name
789 )
790 )
791 else:
792 raise JujuApplicationExists(
793 "Application {} exists".format(application_name)
794 )
795 except juju.errors.JujuError as e:
796 if "already exists" in e.message:
797 raise JujuApplicationExists(
798 "Application {} exists".format(application_name)
799 )
800 else:
801 raise e
802 finally:
803 await self.disconnect_model(model)
804 await self.disconnect_controller(controller)
805
806 return application
807
808 async def upgrade_charm(
809 self,
810 application_name: str,
811 path: str,
812 model_name: str,
813 total_timeout: float = None,
814 **kwargs,
815 ):
816 """Upgrade Charm
817
818 :param: application_name: Application name
819 :param: model_name: Model name
820 :param: path: Local path to the charm
821 :param: total_timeout: Timeout for the entity to be active
822
823 :return: (str, str): (output and status)
824 """
825
826 self.log.debug(
827 "Upgrading charm {} in model {} from path {}".format(
828 application_name, model_name, path
829 )
830 )
831
832 await self.resolve_application(
833 model_name=model_name, application_name=application_name
834 )
835
836 # Get controller
837 controller = await self.get_controller()
838
839 # Get model
840 model = await self.get_model(controller, model_name)
841
842 try:
843 # Get application
844 application = self._get_application(
845 model,
846 application_name=application_name,
847 )
848 if application is None:
849 raise JujuApplicationNotFound(
850 "Cannot find application {} to upgrade".format(application_name)
851 )
852
853 await application.refresh(path=path)
854
855 self.log.debug(
856 "Wait until charm upgrade is completed for application {} (model={})".format(
857 application_name, model_name
858 )
859 )
860
861 await JujuModelWatcher.ensure_units_idle(
862 model=model, application=application
863 )
864
865 if application.status == "error":
866 error_message = "Unknown"
867 for unit in application.units:
868 if (
869 unit.workload_status == "error"
870 and unit.workload_status_message != ""
871 ):
872 error_message = unit.workload_status_message
873
874 message = "Application {} failed update in {}: {}".format(
875 application_name, model_name, error_message
876 )
877 self.log.error(message)
878 raise JujuError(message=message)
879
880 self.log.debug(
881 "Application {} is ready in model {}".format(
882 application_name, model_name
883 )
884 )
885
886 finally:
887 await self.disconnect_model(model)
888 await self.disconnect_controller(controller)
889
890 return application
891
892 async def resolve_application(self, model_name: str, application_name: str):
893
894 controller = await self.get_controller()
895 model = await self.get_model(controller, model_name)
896
897 try:
898 application = self._get_application(
899 model,
900 application_name=application_name,
901 )
902 if application is None:
903 raise JujuApplicationNotFound(
904 "Cannot find application {} to resolve".format(application_name)
905 )
906
907 while application.status == "error":
908 for unit in application.units:
909 if unit.workload_status == "error":
910 self.log.debug(
911 "Model {}, Application {}, Unit {} in error state, resolving".format(
912 model_name, application_name, unit.entity_id
913 )
914 )
915 try:
916 await unit.resolved(retry=False)
917 except Exception:
918 pass
919
920 await asyncio.sleep(1)
921
922 finally:
923 await self.disconnect_model(model)
924 await self.disconnect_controller(controller)
925
926 async def scale_application(
927 self,
928 model_name: str,
929 application_name: str,
930 scale: int = 1,
931 total_timeout: float = None,
932 ):
933 """
934 Scale application (K8s)
935
936 :param: model_name: Model name
937 :param: application_name: Application name
938 :param: scale: Scale to which to set this application
939 :param: total_timeout: Timeout for the entity to be active
940 """
941
942 model = None
943 controller = await self.get_controller()
944 try:
945 model = await self.get_model(controller, model_name)
946
947 self.log.debug(
948 "Scaling application {} in model {}".format(
949 application_name, model_name
950 )
951 )
952 application = self._get_application(model, application_name)
953 if application is None:
954 raise JujuApplicationNotFound("Cannot scale application")
955 await application.scale(scale=scale)
956 # Wait until application is scaled in model
957 self.log.debug(
958 "Waiting for application {} to be scaled in model {}...".format(
959 application_name, model_name
960 )
961 )
962 if total_timeout is None:
963 total_timeout = 1800
964 end = time.time() + total_timeout
965 while time.time() < end:
966 application_scale = self._get_application_count(model, application_name)
967 # Before calling wait_for_model function,
968 # wait until application unit count and scale count are equal.
969 # Because there is a delay before scaling triggers in Juju model.
970 if application_scale == scale:
971 await JujuModelWatcher.wait_for_model(
972 model=model, timeout=total_timeout
973 )
974 self.log.debug(
975 "Application {} is scaled in model {}".format(
976 application_name, model_name
977 )
978 )
979 return
980 await asyncio.sleep(5)
981 raise Exception(
982 "Timeout waiting for application {} in model {} to be scaled".format(
983 application_name, model_name
984 )
985 )
986 finally:
987 if model:
988 await self.disconnect_model(model)
989 await self.disconnect_controller(controller)
990
991 def _get_application_count(self, model: Model, application_name: str) -> int:
992 """Get number of units of the application
993
994 :param: model: Model object
995 :param: application_name: Application name
996
997 :return: int (or None if application doesn't exist)
998 """
999 application = self._get_application(model, application_name)
1000 if application is not None:
1001 return len(application.units)
1002
1003 def _get_application(self, model: Model, application_name: str) -> Application:
1004 """Get application
1005
1006 :param: model: Model object
1007 :param: application_name: Application name
1008
1009 :return: juju.application.Application (or None if it doesn't exist)
1010 """
1011 if model.applications and application_name in model.applications:
1012 return model.applications[application_name]
1013
1014 def _get_unit(self, application: Application, machine_id: str) -> Unit:
1015 """Get unit
1016
1017 :param: application: Application object
1018 :param: machine_id: Machine id
1019
1020 :return: Unit
1021 """
1022 unit = None
1023 for u in application.units:
1024 if u.machine_id == machine_id:
1025 unit = u
1026 break
1027 return unit
1028
1029 def _get_machine_info(
1030 self,
1031 model,
1032 machine_id: str,
1033 ) -> (str, str):
1034 """Get machine info
1035
1036 :param: model: Model object
1037 :param: machine_id: Machine id
1038
1039 :return: (str, str): (machine, series)
1040 """
1041 if machine_id not in model.machines:
1042 msg = "Machine {} not found in model".format(machine_id)
1043 self.log.error(msg=msg)
1044 raise JujuMachineNotFound(msg)
1045 machine = model.machines[machine_id]
1046 return machine, machine.series
1047
1048 async def execute_action(
1049 self,
1050 application_name: str,
1051 model_name: str,
1052 action_name: str,
1053 db_dict: dict = None,
1054 machine_id: str = None,
1055 progress_timeout: float = None,
1056 total_timeout: float = None,
1057 **kwargs,
1058 ):
1059 """Execute action
1060
1061 :param: application_name: Application name
1062 :param: model_name: Model name
1063 :param: action_name: Name of the action
1064 :param: db_dict: Dictionary with data of the DB to write the updates
1065 :param: machine_id Machine id
1066 :param: progress_timeout: Maximum time between two updates in the model
1067 :param: total_timeout: Timeout for the entity to be active
1068
1069 :return: (str, str): (output and status)
1070 """
1071 self.log.debug(
1072 "Executing action {} using params {}".format(action_name, kwargs)
1073 )
1074 # Get controller
1075 controller = await self.get_controller()
1076
1077 # Get model
1078 model = await self.get_model(controller, model_name)
1079
1080 try:
1081 # Get application
1082 application = self._get_application(
1083 model,
1084 application_name=application_name,
1085 )
1086 if application is None:
1087 raise JujuApplicationNotFound("Cannot execute action")
1088 # Racing condition:
1089 # Ocassionally, self._get_leader_unit() will return None
1090 # because the leader elected hook has not been triggered yet.
1091 # Therefore, we are doing some retries. If it happens again,
1092 # re-open bug 1236
1093 if machine_id is None:
1094 unit = await self._get_leader_unit(application)
1095 self.log.debug(
1096 "Action {} is being executed on the leader unit {}".format(
1097 action_name, unit.name
1098 )
1099 )
1100 else:
1101 unit = self._get_unit(application, machine_id)
1102 if not unit:
1103 raise JujuError(
1104 "A unit with machine id {} not in available units".format(
1105 machine_id
1106 )
1107 )
1108 self.log.debug(
1109 "Action {} is being executed on {} unit".format(
1110 action_name, unit.name
1111 )
1112 )
1113
1114 actions = await application.get_actions()
1115
1116 if action_name not in actions:
1117 raise JujuActionNotFound(
1118 "Action {} not in available actions".format(action_name)
1119 )
1120
1121 action = await unit.run_action(action_name, **kwargs)
1122
1123 self.log.debug(
1124 "Wait until action {} is completed in application {} (model={})".format(
1125 action_name, application_name, model_name
1126 )
1127 )
1128 await JujuModelWatcher.wait_for(
1129 model=model,
1130 entity=action,
1131 progress_timeout=progress_timeout,
1132 total_timeout=total_timeout,
1133 db_dict=db_dict,
1134 n2vc=self.n2vc,
1135 vca_id=self.vca_connection._vca_id,
1136 )
1137
1138 output = await model.get_action_output(action_uuid=action.entity_id)
1139 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
1140 status = (
1141 status[action.entity_id] if action.entity_id in status else "failed"
1142 )
1143
1144 self.log.debug(
1145 "Action {} completed with status {} in application {} (model={})".format(
1146 action_name, action.status, application_name, model_name
1147 )
1148 )
1149 finally:
1150 await self.disconnect_model(model)
1151 await self.disconnect_controller(controller)
1152
1153 return output, status
1154
1155 async def get_actions(self, application_name: str, model_name: str) -> dict:
1156 """Get list of actions
1157
1158 :param: application_name: Application name
1159 :param: model_name: Model name
1160
1161 :return: Dict with this format
1162 {
1163 "action_name": "Description of the action",
1164 ...
1165 }
1166 """
1167 self.log.debug(
1168 "Getting list of actions for application {}".format(application_name)
1169 )
1170
1171 # Get controller
1172 controller = await self.get_controller()
1173
1174 # Get model
1175 model = await self.get_model(controller, model_name)
1176
1177 try:
1178 # Get application
1179 application = self._get_application(
1180 model,
1181 application_name=application_name,
1182 )
1183
1184 # Return list of actions
1185 return await application.get_actions()
1186
1187 finally:
1188 # Disconnect from model and controller
1189 await self.disconnect_model(model)
1190 await self.disconnect_controller(controller)
1191
1192 async def get_metrics(self, model_name: str, application_name: str) -> dict:
1193 """Get the metrics collected by the VCA.
1194
1195 :param model_name The name or unique id of the network service
1196 :param application_name The name of the application
1197 """
1198 if not model_name or not application_name:
1199 raise Exception("model_name and application_name must be non-empty strings")
1200 metrics = {}
1201 controller = await self.get_controller()
1202 model = await self.get_model(controller, model_name)
1203 try:
1204 application = self._get_application(model, application_name)
1205 if application is not None:
1206 metrics = await application.get_metrics()
1207 finally:
1208 self.disconnect_model(model)
1209 self.disconnect_controller(controller)
1210 return metrics
1211
1212 async def add_relation(
1213 self,
1214 model_name: str,
1215 endpoint_1: str,
1216 endpoint_2: str,
1217 ):
1218 """Add relation
1219
1220 :param: model_name: Model name
1221 :param: endpoint_1 First endpoint name
1222 ("app:endpoint" format or directly the saas name)
1223 :param: endpoint_2: Second endpoint name (^ same format)
1224 """
1225
1226 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
1227
1228 # Get controller
1229 controller = await self.get_controller()
1230
1231 # Get model
1232 model = await self.get_model(controller, model_name)
1233
1234 # Add relation
1235 try:
1236 await model.add_relation(endpoint_1, endpoint_2)
1237 except juju.errors.JujuAPIError as e:
1238 if "not found" in e.message:
1239 self.log.warning("Relation not found: {}".format(e.message))
1240 return
1241 if "already exists" in e.message:
1242 self.log.warning("Relation already exists: {}".format(e.message))
1243 return
1244 # another exception, raise it
1245 raise e
1246 finally:
1247 await self.disconnect_model(model)
1248 await self.disconnect_controller(controller)
1249
1250 async def offer(self, endpoint: RelationEndpoint) -> Offer:
1251 """
1252 Create an offer from a RelationEndpoint
1253
1254 :param: endpoint: Relation endpoint
1255
1256 :return: Offer object
1257 """
1258 model_name = endpoint.model_name
1259 offer_name = f"{endpoint.application_name}-{endpoint.endpoint_name}"
1260 controller = await self.get_controller()
1261 model = None
1262 try:
1263 model = await self.get_model(controller, model_name)
1264 await model.create_offer(endpoint.endpoint, offer_name=offer_name)
1265 offer_list = await self._list_offers(model_name, offer_name=offer_name)
1266 if offer_list:
1267 return Offer(offer_list[0].offer_url)
1268 else:
1269 raise Exception("offer was not created")
1270 except juju.errors.JujuError as e:
1271 if "application offer already exists" not in e.message:
1272 raise e
1273 finally:
1274 if model:
1275 self.disconnect_model(model)
1276 self.disconnect_controller(controller)
1277
1278 async def consume(
1279 self,
1280 model_name: str,
1281 offer: Offer,
1282 provider_libjuju: "Libjuju",
1283 ) -> str:
1284 """
1285 Consumes a remote offer in the model. Relations can be created later using "juju relate".
1286
1287 :param: model_name: Model name
1288 :param: offer: Offer object to consume
1289 :param: provider_libjuju: Libjuju object of the provider endpoint
1290
1291 :raises ParseError if there's a problem parsing the offer_url
1292 :raises JujuError if remote offer includes and endpoint
1293 :raises JujuAPIError if the operation is not successful
1294
1295 :returns: Saas name. It is the application name in the model that reference the remote application.
1296 """
1297 saas_name = f'{offer.name}-{offer.model_name.replace("-", "")}'
1298 if offer.vca_id:
1299 saas_name = f"{saas_name}-{offer.vca_id}"
1300 controller = await self.get_controller()
1301 model = None
1302 provider_controller = None
1303 try:
1304 model = await controller.get_model(model_name)
1305 provider_controller = await provider_libjuju.get_controller()
1306 await model.consume(
1307 offer.url, application_alias=saas_name, controller=provider_controller
1308 )
1309 return saas_name
1310 finally:
1311 if model:
1312 await self.disconnect_model(model)
1313 if provider_controller:
1314 await provider_libjuju.disconnect_controller(provider_controller)
1315 await self.disconnect_controller(controller)
1316
1317 async def destroy_model(self, model_name: str, total_timeout: float = 1800):
1318 """
1319 Destroy model
1320
1321 :param: model_name: Model name
1322 :param: total_timeout: Timeout
1323 """
1324
1325 controller = await self.get_controller()
1326 model = None
1327 try:
1328 if not await self.model_exists(model_name, controller=controller):
1329 self.log.warn(f"Model {model_name} doesn't exist")
1330 return
1331
1332 self.log.debug(f"Getting model {model_name} to be destroyed")
1333 model = await self.get_model(controller, model_name)
1334 self.log.debug(f"Destroying manual machines in model {model_name}")
1335 # Destroy machines that are manually provisioned
1336 # and still are in pending state
1337 await self._destroy_pending_machines(model, only_manual=True)
1338 await self.disconnect_model(model)
1339
1340 await self._destroy_model(
1341 model_name,
1342 controller,
1343 timeout=total_timeout,
1344 )
1345 except Exception as e:
1346 if not await self.model_exists(model_name, controller=controller):
1347 self.log.warn(
1348 f"Failed deleting model {model_name}: model doesn't exist"
1349 )
1350 return
1351 self.log.warn(f"Failed deleting model {model_name}: {e}")
1352 raise e
1353 finally:
1354 if model:
1355 await self.disconnect_model(model)
1356 await self.disconnect_controller(controller)
1357
1358 async def _destroy_model(
1359 self, model_name: str, controller: Controller, timeout: float = 1800
1360 ):
1361 """
1362 Destroy model from controller
1363
1364 :param: model: Model name to be removed
1365 :param: controller: Controller object
1366 :param: timeout: Timeout in seconds
1367 """
1368 self.log.debug(f"Destroying model {model_name}")
1369
1370 async def _destroy_model_loop(model_name: str, controller: Controller):
1371 while await self.model_exists(model_name, controller=controller):
1372 await controller.destroy_model(
1373 model_name, destroy_storage=True, force=True, max_wait=0
1374 )
1375 await asyncio.sleep(5)
1376
1377 try:
1378 await asyncio.wait_for(
1379 _destroy_model_loop(model_name, controller), timeout=timeout
1380 )
1381 except asyncio.TimeoutError:
1382 raise Exception(
1383 "Timeout waiting for model {} to be destroyed".format(model_name)
1384 )
1385 except juju.errors.JujuError as e:
1386 if any("has been removed" in error for error in e.errors):
1387 return
1388 raise e
1389
1390 async def destroy_application(
1391 self, model_name: str, application_name: str, total_timeout: float
1392 ):
1393 """
1394 Destroy application
1395
1396 :param: model_name: Model name
1397 :param: application_name: Application name
1398 :param: total_timeout: Timeout
1399 """
1400
1401 controller = await self.get_controller()
1402 model = None
1403
1404 try:
1405 model = await self.get_model(controller, model_name)
1406 self.log.debug(
1407 "Destroying application {} in model {}".format(
1408 application_name, model_name
1409 )
1410 )
1411 application = self._get_application(model, application_name)
1412 if application:
1413 await application.destroy()
1414 else:
1415 self.log.warning("Application not found: {}".format(application_name))
1416
1417 self.log.debug(
1418 "Waiting for application {} to be destroyed in model {}...".format(
1419 application_name, model_name
1420 )
1421 )
1422 if total_timeout is None:
1423 total_timeout = 3600
1424 end = time.time() + total_timeout
1425 while time.time() < end:
1426 if not self._get_application(model, application_name):
1427 self.log.debug(
1428 "The application {} was destroyed in model {} ".format(
1429 application_name, model_name
1430 )
1431 )
1432 return
1433 await asyncio.sleep(5)
1434 raise Exception(
1435 "Timeout waiting for application {} to be destroyed in model {}".format(
1436 application_name, model_name
1437 )
1438 )
1439 finally:
1440 if model is not None:
1441 await self.disconnect_model(model)
1442 await self.disconnect_controller(controller)
1443
1444 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
1445 """
1446 Destroy pending machines in a given model
1447
1448 :param: only_manual: Bool that indicates only manually provisioned
1449 machines should be destroyed (if True), or that
1450 all pending machines should be destroyed
1451 """
1452 status = await model.get_status()
1453 for machine_id in status.machines:
1454 machine_status = status.machines[machine_id]
1455 if machine_status.agent_status.status == "pending":
1456 if only_manual and not machine_status.instance_id.startswith("manual:"):
1457 break
1458 machine = model.machines[machine_id]
1459 await machine.destroy(force=True)
1460
1461 async def configure_application(
1462 self, model_name: str, application_name: str, config: dict = None
1463 ):
1464 """Configure application
1465
1466 :param: model_name: Model name
1467 :param: application_name: Application name
1468 :param: config: Config to apply to the charm
1469 """
1470 self.log.debug("Configuring application {}".format(application_name))
1471
1472 if config:
1473 controller = await self.get_controller()
1474 model = None
1475 try:
1476 model = await self.get_model(controller, model_name)
1477 application = self._get_application(
1478 model,
1479 application_name=application_name,
1480 )
1481 await application.set_config(config)
1482 finally:
1483 if model:
1484 await self.disconnect_model(model)
1485 await self.disconnect_controller(controller)
1486
1487 def handle_exception(self, loop, context):
1488 # All unhandled exceptions by libjuju are handled here.
1489 pass
1490
1491 async def health_check(self, interval: float = 300.0):
1492 """
1493 Health check to make sure controller and controller_model connections are OK
1494
1495 :param: interval: Time in seconds between checks
1496 """
1497 controller = None
1498 while True:
1499 try:
1500 controller = await self.get_controller()
1501 # self.log.debug("VCA is alive")
1502 except Exception as e:
1503 self.log.error("Health check to VCA failed: {}".format(e))
1504 finally:
1505 await self.disconnect_controller(controller)
1506 await asyncio.sleep(interval)
1507
1508 async def list_models(self, contains: str = None) -> [str]:
1509 """List models with certain names
1510
1511 :param: contains: String that is contained in model name
1512
1513 :retur: [models] Returns list of model names
1514 """
1515
1516 controller = await self.get_controller()
1517 try:
1518 models = await controller.list_models()
1519 if contains:
1520 models = [model for model in models if contains in model]
1521 return models
1522 finally:
1523 await self.disconnect_controller(controller)
1524
1525 async def _list_offers(
1526 self, model_name: str, offer_name: str = None
1527 ) -> QueryApplicationOffersResults:
1528 """
1529 List offers within a model
1530
1531 :param: model_name: Model name
1532 :param: offer_name: Offer name to filter.
1533
1534 :return: Returns application offers results in the model
1535 """
1536
1537 controller = await self.get_controller()
1538 try:
1539 offers = (await controller.list_offers(model_name)).results
1540 if offer_name:
1541 matching_offer = []
1542 for offer in offers:
1543 if offer.offer_name == offer_name:
1544 matching_offer.append(offer)
1545 break
1546 offers = matching_offer
1547 return offers
1548 finally:
1549 await self.disconnect_controller(controller)
1550
1551 async def add_k8s(
1552 self,
1553 name: str,
1554 rbac_id: str,
1555 token: str,
1556 client_cert_data: str,
1557 configuration: Configuration,
1558 storage_class: str,
1559 credential_name: str = None,
1560 ):
1561 """
1562 Add a Kubernetes cloud to the controller
1563
1564 Similar to the `juju add-k8s` command in the CLI
1565
1566 :param: name: Name for the K8s cloud
1567 :param: configuration: Kubernetes configuration object
1568 :param: storage_class: Storage Class to use in the cloud
1569 :param: credential_name: Storage Class to use in the cloud
1570 """
1571
1572 if not storage_class:
1573 raise Exception("storage_class must be a non-empty string")
1574 if not name:
1575 raise Exception("name must be a non-empty string")
1576 if not configuration:
1577 raise Exception("configuration must be provided")
1578
1579 endpoint = configuration.host
1580 credential = self.get_k8s_cloud_credential(
1581 configuration,
1582 client_cert_data,
1583 token,
1584 )
1585 credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
1586 cloud = client.Cloud(
1587 type_="kubernetes",
1588 auth_types=[credential.auth_type],
1589 endpoint=endpoint,
1590 ca_certificates=[client_cert_data],
1591 config={
1592 "operator-storage": storage_class,
1593 "workload-storage": storage_class,
1594 },
1595 )
1596
1597 return await self.add_cloud(
1598 name, cloud, credential, credential_name=credential_name
1599 )
1600
1601 def get_k8s_cloud_credential(
1602 self,
1603 configuration: Configuration,
1604 client_cert_data: str,
1605 token: str = None,
1606 ) -> client.CloudCredential:
1607 attrs = {}
1608 # TODO: Test with AKS
1609 key = None # open(configuration.key_file, "r").read()
1610 username = configuration.username
1611 password = configuration.password
1612
1613 if client_cert_data:
1614 attrs["ClientCertificateData"] = client_cert_data
1615 if key:
1616 attrs["ClientKeyData"] = key
1617 if token:
1618 if username or password:
1619 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1620 attrs["Token"] = token
1621
1622 auth_type = None
1623 if key:
1624 auth_type = "oauth2"
1625 if client_cert_data:
1626 auth_type = "oauth2withcert"
1627 if not token:
1628 raise JujuInvalidK8sConfiguration(
1629 "missing token for auth type {}".format(auth_type)
1630 )
1631 elif username:
1632 if not password:
1633 self.log.debug(
1634 "credential for user {} has empty password".format(username)
1635 )
1636 attrs["username"] = username
1637 attrs["password"] = password
1638 if client_cert_data:
1639 auth_type = "userpasswithcert"
1640 else:
1641 auth_type = "userpass"
1642 elif client_cert_data and token:
1643 auth_type = "certificate"
1644 else:
1645 raise JujuInvalidK8sConfiguration("authentication method not supported")
1646 return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1647
1648 async def add_cloud(
1649 self,
1650 name: str,
1651 cloud: Cloud,
1652 credential: CloudCredential = None,
1653 credential_name: str = None,
1654 ) -> Cloud:
1655 """
1656 Add cloud to the controller
1657
1658 :param: name: Name of the cloud to be added
1659 :param: cloud: Cloud object
1660 :param: credential: CloudCredentials object for the cloud
1661 :param: credential_name: Credential name.
1662 If not defined, cloud of the name will be used.
1663 """
1664 controller = await self.get_controller()
1665 try:
1666 _ = await controller.add_cloud(name, cloud)
1667 if credential:
1668 await controller.add_credential(
1669 credential_name or name, credential=credential, cloud=name
1670 )
1671 # Need to return the object returned by the controller.add_cloud() function
1672 # I'm returning the original value now until this bug is fixed:
1673 # https://github.com/juju/python-libjuju/issues/443
1674 return cloud
1675 finally:
1676 await self.disconnect_controller(controller)
1677
1678 async def remove_cloud(self, name: str):
1679 """
1680 Remove cloud
1681
1682 :param: name: Name of the cloud to be removed
1683 """
1684 controller = await self.get_controller()
1685 try:
1686 await controller.remove_cloud(name)
1687 except juju.errors.JujuError as e:
1688 if len(e.errors) == 1 and f'cloud "{name}" not found' == e.errors[0]:
1689 self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
1690 else:
1691 raise e
1692 finally:
1693 await self.disconnect_controller(controller)
1694
1695 @retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
1696 async def _get_leader_unit(self, application: Application) -> Unit:
1697 unit = None
1698 for u in application.units:
1699 if await u.is_leader_from_status():
1700 unit = u
1701 break
1702 if not unit:
1703 raise Exception()
1704 return unit
1705
1706 async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
1707 """
1708 Get cloud credentials
1709
1710 :param: cloud: Cloud object. The returned credentials will be from this cloud.
1711
1712 :return: List of credentials object associated to the specified cloud
1713
1714 """
1715 controller = await self.get_controller()
1716 try:
1717 facade = client.CloudFacade.from_connection(controller.connection())
1718 cloud_cred_tag = tag.credential(
1719 cloud.name, self.vca_connection.data.user, cloud.credential_name
1720 )
1721 params = [client.Entity(cloud_cred_tag)]
1722 return (await facade.Credential(params)).results
1723 finally:
1724 await self.disconnect_controller(controller)
1725
1726 async def check_application_exists(self, model_name, application_name) -> bool:
1727 """Check application exists
1728
1729 :param: model_name: Model Name
1730 :param: application_name: Application Name
1731
1732 :return: Boolean
1733 """
1734
1735 model = None
1736 controller = await self.get_controller()
1737 try:
1738 model = await self.get_model(controller, model_name)
1739 self.log.debug(
1740 "Checking if application {} exists in model {}".format(
1741 application_name, model_name
1742 )
1743 )
1744 return self._get_application(model, application_name) is not None
1745 finally:
1746 if model:
1747 await self.disconnect_model(model)
1748 await self.disconnect_controller(controller)