Code Coverage

Cobertura Coverage Report > n2vc >

libjuju.py

Trend

File Coverage summary

NameClassesLinesConditionals
libjuju.py
100%
1/1
74%
569/766
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
libjuju.py
74%
569/766
N/A

Source

n2vc/libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #     Unless required by applicable law or agreed to in writing, software
10 #     distributed under the License is distributed on an "AS IS" BASIS,
11 #     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #     See the License for the specific language governing permissions and
13 #     limitations under the License.
14
15 1 import asyncio
16 1 import logging
17 1 import os
18 1 import typing
19 1 import yaml
20
21 1 import time
22
23 1 import juju.errors
24 1 from juju.bundle import BundleHandler
25 1 from juju.model import Model
26 1 from juju.machine import Machine
27 1 from juju.application import Application
28 1 from juju.unit import Unit
29 1 from juju.url import URL
30 1 from juju.version import DEFAULT_ARCHITECTURE
31 1 from juju.client._definitions import (
32     FullStatus,
33     QueryApplicationOffersResults,
34     Cloud,
35     CloudCredential,
36 )
37 1 from juju.controller import Controller
38 1 from juju.client import client
39 1 from juju import tag
40
41 1 from n2vc.definitions import Offer, RelationEndpoint
42 1 from n2vc.juju_watcher import JujuModelWatcher
43 1 from n2vc.provisioner import AsyncSSHProvisioner
44 1 from n2vc.n2vc_conn import N2VCConnector
45 1 from n2vc.exceptions import (
46     JujuMachineNotFound,
47     JujuApplicationNotFound,
48     JujuLeaderUnitNotFound,
49     JujuActionNotFound,
50     JujuControllerFailedConnecting,
51     JujuApplicationExists,
52     JujuInvalidK8sConfiguration,
53     JujuError,
54 )
55 1 from n2vc.vca.cloud import Cloud as VcaCloud
56 1 from n2vc.vca.connection import Connection
57 1 from kubernetes.client.configuration import Configuration
58 1 from retrying_async import retry
59
60
61 1 RBAC_LABEL_KEY_NAME = "rbac-id"
62
63
64 1 @asyncio.coroutine
65 1 def retry_callback(attempt, exc, args, kwargs, delay=0.5, *, loop):
66     # Specifically overridden from upstream implementation so it can
67     # continue to work with Python 3.10
68 1     yield from asyncio.sleep(attempt * delay)
69 1     return retry
70
71
72 1 class Libjuju:
73 1     def __init__(
74         self,
75         vca_connection: Connection,
76         log: logging.Logger = None,
77         n2vc: N2VCConnector = None,
78     ):
79         """
80         Constructor
81
82         :param: vca_connection:         n2vc.vca.connection object
83         :param: log:                    Logger
84         :param: n2vc:                   N2VC object
85         """
86
87 1         self.log = log or logging.getLogger("Libjuju")
88 1         self.n2vc = n2vc
89 1         self.vca_connection = vca_connection
90
91 1         self.creating_model = asyncio.Lock()
92
93 1         if self.vca_connection.is_default:
94 1             self.health_check_task = self._create_health_check_task()
95
96 1     def _create_health_check_task(self):
97 1         return asyncio.get_event_loop().create_task(self.health_check())
98
99 1     async def get_controller(self, timeout: float = 60.0) -> Controller:
100         """
101         Get controller
102
103         :param: timeout: Time in seconds to wait for controller to connect
104         """
105 1         controller = None
106 1         try:
107 1             controller = Controller()
108 1             await asyncio.wait_for(
109                 controller.connect(
110                     endpoint=self.vca_connection.data.endpoints,
111                     username=self.vca_connection.data.user,
112                     password=self.vca_connection.data.secret,
113                     cacert=self.vca_connection.data.cacert,
114                 ),
115                 timeout=timeout,
116             )
117 1             if self.vca_connection.is_default:
118 1                 endpoints = await controller.api_endpoints
119 1                 if not all(
120                     endpoint in self.vca_connection.endpoints for endpoint in endpoints
121                 ):
122 1                     await self.vca_connection.update_endpoints(endpoints)
123 1             return controller
124 1         except asyncio.CancelledError as e:
125 1             raise e
126 1         except Exception as e:
127 1             self.log.error(
128                 "Failed connecting to controller: {}... {}".format(
129                     self.vca_connection.data.endpoints, e
130                 )
131             )
132 1             if controller:
133 1                 await self.disconnect_controller(controller)
134
135 1             raise JujuControllerFailedConnecting(
136                 f"Error connecting to Juju controller: {e}"
137             )
138
139 1     async def disconnect(self):
140         """Disconnect"""
141         # Cancel health check task
142 1         self.health_check_task.cancel()
143 1         self.log.debug("Libjuju disconnected!")
144
145 1     async def disconnect_model(self, model: Model):
146         """
147         Disconnect model
148
149         :param: model: Model that will be disconnected
150         """
151 1         await model.disconnect()
152
153 1     async def disconnect_controller(self, controller: Controller):
154         """
155         Disconnect controller
156
157         :param: controller: Controller that will be disconnected
158         """
159 1         if controller:
160 1             await controller.disconnect()
161
162 1     @retry(attempts=3, delay=5, timeout=None, callback=retry_callback)
163 1     async def add_model(self, model_name: str, cloud: VcaCloud):
164         """
165         Create model
166
167         :param: model_name: Model name
168         :param: cloud: Cloud object
169         """
170
171         # Get controller
172 1         controller = await self.get_controller()
173 1         model = None
174 1         try:
175             # Block until other workers have finished model creation
176 1             while self.creating_model.locked():
177 0                 await asyncio.sleep(0.1)
178
179             # Create the model
180 1             async with self.creating_model:
181 1                 if await self.model_exists(model_name, controller=controller):
182 1                     return
183 1                 self.log.debug("Creating model {}".format(model_name))
184 1                 model = await controller.add_model(
185                     model_name,
186                     config=self.vca_connection.data.model_config,
187                     cloud_name=cloud.name,
188                     credential_name=cloud.credential_name,
189                 )
190 0         except juju.errors.JujuAPIError as e:
191 0             if "already exists" in e.message:
192 0                 pass
193             else:
194 0                 raise e
195         finally:
196 1             if model:
197 1                 await self.disconnect_model(model)
198 1             await self.disconnect_controller(controller)
199
200 1     async def get_executed_actions(self, model_name: str) -> list:
201         """
202         Get executed/history of actions for a model.
203
204         :param: model_name: Model name, str.
205         :return: List of executed actions for a model.
206         """
207 1         model = None
208 1         executed_actions = []
209 1         controller = await self.get_controller()
210 1         try:
211 1             model = await self.get_model(controller, model_name)
212             # Get all unique action names
213 1             actions = {}
214 1             for application in model.applications:
215 1                 application_actions = await self.get_actions(application, model_name)
216 1                 actions.update(application_actions)
217             # Get status of all actions
218 1             for application_action in actions:
219 1                 app_action_status_list = await model.get_action_status(
220                     name=application_action
221                 )
222 1                 for action_id, action_status in app_action_status_list.items():
223 1                     executed_action = {
224                         "id": action_id,
225                         "action": application_action,
226                         "status": action_status,
227                     }
228                     # Get action output by id
229 1                     action_status = await model.get_action_output(executed_action["id"])
230 1                     for k, v in action_status.items():
231 1                         executed_action[k] = v
232 1                     executed_actions.append(executed_action)
233 1         except Exception as e:
234 1             raise JujuError(
235                 "Error in getting executed actions for model: {}. Error: {}".format(
236                     model_name, str(e)
237                 )
238             )
239         finally:
240 1             if model:
241 1                 await self.disconnect_model(model)
242 1             await self.disconnect_controller(controller)
243 1         return executed_actions
244
245 1     async def get_application_configs(
246         self, model_name: str, application_name: str
247     ) -> dict:
248         """
249         Get available configs for an application.
250
251         :param: model_name: Model name, str.
252         :param: application_name: Application name, str.
253
254         :return: A dict which has key - action name, value - action description
255         """
256 1         model = None
257 1         application_configs = {}
258 1         controller = await self.get_controller()
259 1         try:
260 1             model = await self.get_model(controller, model_name)
261 1             application = self._get_application(
262                 model, application_name=application_name
263             )
264 1             application_configs = await application.get_config()
265 1         except Exception as e:
266 1             raise JujuError(
267                 "Error in getting configs for application: {} in model: {}. Error: {}".format(
268                     application_name, model_name, str(e)
269                 )
270             )
271         finally:
272 1             if model:
273 1                 await self.disconnect_model(model)
274 1             await self.disconnect_controller(controller)
275 1         return application_configs
276
277 1     @retry(attempts=3, delay=5, callback=retry_callback)
278 1     async def get_model(self, controller: Controller, model_name: str) -> Model:
279         """
280         Get model from controller
281
282         :param: controller: Controller
283         :param: model_name: Model name
284
285         :return: Model: The created Juju model object
286         """
287 1         return await controller.get_model(model_name)
288
289 1     async def model_exists(
290         self, model_name: str, controller: Controller = None
291     ) -> bool:
292         """
293         Check if model exists
294
295         :param: controller: Controller
296         :param: model_name: Model name
297
298         :return bool
299         """
300 1         need_to_disconnect = False
301
302         # Get controller if not passed
303 1         if not controller:
304 1             controller = await self.get_controller()
305 1             need_to_disconnect = True
306
307         # Check if model exists
308 1         try:
309 1             return model_name in await controller.list_models()
310         finally:
311 1             if need_to_disconnect:
312 1                 await self.disconnect_controller(controller)
313
314 1     async def models_exist(self, model_names: [str]) -> (bool, list):
315         """
316         Check if models exists
317
318         :param: model_names: List of strings with model names
319
320         :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
321         """
322 1         if not model_names:
323 1             raise Exception(
324                 "model_names must be a non-empty array. Given value: {}".format(
325                     model_names
326                 )
327             )
328 1         non_existing_models = []
329 1         models = await self.list_models()
330 1         existing_models = list(set(models).intersection(model_names))
331 1         non_existing_models = list(set(model_names) - set(existing_models))
332
333 1         return (
334             len(non_existing_models) == 0,
335             non_existing_models,
336         )
337
338 1     async def get_model_status(self, model_name: str) -> FullStatus:
339         """
340         Get model status
341
342         :param: model_name: Model name
343
344         :return: Full status object
345         """
346 1         controller = await self.get_controller()
347 1         model = await self.get_model(controller, model_name)
348 1         try:
349 1             return await model.get_status()
350         finally:
351 1             await self.disconnect_model(model)
352 1             await self.disconnect_controller(controller)
353
354 1     async def create_machine(
355         self,
356         model_name: str,
357         machine_id: str = None,
358         db_dict: dict = None,
359         progress_timeout: float = None,
360         total_timeout: float = None,
361         series: str = "bionic",
362         wait: bool = True,
363     ) -> (Machine, bool):
364         """
365         Create machine
366
367         :param: model_name:         Model name
368         :param: machine_id:         Machine id
369         :param: db_dict:            Dictionary with data of the DB to write the updates
370         :param: progress_timeout:   Maximum time between two updates in the model
371         :param: total_timeout:      Timeout for the entity to be active
372         :param: series:             Series of the machine (xenial, bionic, focal, ...)
373         :param: wait:               Wait until machine is ready
374
375         :return: (juju.machine.Machine, bool):  Machine object and a boolean saying
376                                                 if the machine is new or it already existed
377         """
378 1         new = False
379 1         machine = None
380
381 1         self.log.debug(
382             "Creating machine (id={}) in model: {}".format(machine_id, model_name)
383         )
384
385         # Get controller
386 1         controller = await self.get_controller()
387
388         # Get model
389 1         model = await self.get_model(controller, model_name)
390 1         try:
391 1             if machine_id is not None:
392 1                 self.log.debug(
393                     "Searching machine (id={}) in model {}".format(
394                         machine_id, model_name
395                     )
396                 )
397
398                 # Get machines from model and get the machine with machine_id if exists
399 1                 machines = await model.get_machines()
400 1                 if machine_id in machines:
401 1                     self.log.debug(
402                         "Machine (id={}) found in model {}".format(
403                             machine_id, model_name
404                         )
405                     )
406 1                     machine = machines[machine_id]
407                 else:
408 1                     raise JujuMachineNotFound("Machine {} not found".format(machine_id))
409
410 1             if machine is None:
411 1                 self.log.debug("Creating a new machine in model {}".format(model_name))
412
413                 # Create machine
414 1                 machine = await model.add_machine(
415                     spec=None, constraints=None, disks=None, series=series
416                 )
417 1                 new = True
418
419                 # Wait until the machine is ready
420 1                 self.log.debug(
421                     "Wait until machine {} is ready in model {}".format(
422                         machine.entity_id, model_name
423                     )
424                 )
425 1                 if wait:
426 1                     await JujuModelWatcher.wait_for(
427                         model=model,
428                         entity=machine,
429                         progress_timeout=progress_timeout,
430                         total_timeout=total_timeout,
431                         db_dict=db_dict,
432                         n2vc=self.n2vc,
433                         vca_id=self.vca_connection._vca_id,
434                     )
435         finally:
436 1             await self.disconnect_model(model)
437 1             await self.disconnect_controller(controller)
438
439 1         self.log.debug(
440             "Machine {} ready at {} in model {}".format(
441                 machine.entity_id, machine.dns_name, model_name
442             )
443         )
444 1         return machine, new
445
446 1     async def provision_machine(
447         self,
448         model_name: str,
449         hostname: str,
450         username: str,
451         private_key_path: str,
452         db_dict: dict = None,
453         progress_timeout: float = None,
454         total_timeout: float = None,
455     ) -> str:
456         """
457         Manually provisioning of a machine
458
459         :param: model_name:         Model name
460         :param: hostname:           IP to access the machine
461         :param: username:           Username to login to the machine
462         :param: private_key_path:   Local path for the private key
463         :param: db_dict:            Dictionary with data of the DB to write the updates
464         :param: progress_timeout:   Maximum time between two updates in the model
465         :param: total_timeout:      Timeout for the entity to be active
466
467         :return: (Entity): Machine id
468         """
469 0         self.log.debug(
470             "Provisioning machine. model: {}, hostname: {}, username: {}".format(
471                 model_name, hostname, username
472             )
473         )
474
475         # Get controller
476 0         controller = await self.get_controller()
477
478         # Get model
479 0         model = await self.get_model(controller, model_name)
480
481 0         try:
482             # Get provisioner
483 0             provisioner = AsyncSSHProvisioner(
484                 host=hostname,
485                 user=username,
486                 private_key_path=private_key_path,
487                 log=self.log,
488             )
489
490             # Provision machine
491 0             params = await provisioner.provision_machine()
492
493 0             params.jobs = ["JobHostUnits"]
494
495 0             self.log.debug("Adding machine to model")
496 0             connection = model.connection()
497 0             client_facade = client.ClientFacade.from_connection(connection)
498
499 0             results = await client_facade.AddMachines(params=[params])
500 0             error = results.machines[0].error
501
502 0             if error:
503 0                 msg = "Error adding machine: {}".format(error.message)
504 0                 self.log.error(msg=msg)
505 0                 raise ValueError(msg)
506
507 0             machine_id = results.machines[0].machine
508
509 0             self.log.debug("Installing Juju agent into machine {}".format(machine_id))
510 0             asyncio.ensure_future(
511                 provisioner.install_agent(
512                     connection=connection,
513                     nonce=params.nonce,
514                     machine_id=machine_id,
515                     proxy=self.vca_connection.data.api_proxy,
516                     series=params.series,
517                 )
518             )
519
520 0             machine = None
521 0             for _ in range(10):
522 0                 machine_list = await model.get_machines()
523 0                 if machine_id in machine_list:
524 0                     self.log.debug("Machine {} found in model!".format(machine_id))
525 0                     machine = model.machines.get(machine_id)
526 0                     break
527 0                 await asyncio.sleep(2)
528
529 0             if machine is None:
530 0                 msg = "Machine {} not found in model".format(machine_id)
531 0                 self.log.error(msg=msg)
532 0                 raise JujuMachineNotFound(msg)
533
534 0             self.log.debug(
535                 "Wait until machine {} is ready in model {}".format(
536                     machine.entity_id, model_name
537                 )
538             )
539 0             await JujuModelWatcher.wait_for(
540                 model=model,
541                 entity=machine,
542                 progress_timeout=progress_timeout,
543                 total_timeout=total_timeout,
544                 db_dict=db_dict,
545                 n2vc=self.n2vc,
546                 vca_id=self.vca_connection._vca_id,
547             )
548 0         except Exception as e:
549 0             raise e
550         finally:
551 0             await self.disconnect_model(model)
552 0             await self.disconnect_controller(controller)
553
554 0         self.log.debug(
555             "Machine provisioned {} in model {}".format(machine_id, model_name)
556         )
557
558 0         return machine_id
559
560 1     async def deploy(
561         self,
562         uri: str,
563         model_name: str,
564         wait: bool = True,
565         timeout: float = 3600,
566         instantiation_params: dict = None,
567     ):
568         """
569         Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
570
571         :param uri:            Path or Charm Store uri in which the charm or bundle can be found
572         :param model_name:     Model name
573         :param wait:           Indicates whether to wait or not until all applications are active
574         :param timeout:        Time in seconds to wait until all applications are active
575         :param instantiation_params: To be applied as overlay bundle over primary bundle.
576         """
577 1         controller = await self.get_controller()
578 1         model = await self.get_model(controller, model_name)
579 1         overlays = []
580 1         try:
581 1             await self._validate_instantiation_params(uri, model, instantiation_params)
582 1             overlays = self._get_overlays(model_name, instantiation_params)
583 1             await model.deploy(uri, trust=True, overlays=overlays)
584 1             if wait:
585 1                 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
586 1                 self.log.debug("All units active in model {}".format(model_name))
587         finally:
588 1             self._remove_overlay_file(overlays)
589 1             await self.disconnect_model(model)
590 1             await self.disconnect_controller(controller)
591
592 1     async def _validate_instantiation_params(
593         self, uri: str, model, instantiation_params: dict
594     ) -> None:
595         """Checks if all the applications in instantiation_params
596         exist ins the original bundle.
597
598         Raises:
599             JujuApplicationNotFound if there is an invalid app in
600             the instantiation params.
601         """
602 1         overlay_apps = self._get_apps_in_instantiation_params(instantiation_params)
603 1         if not overlay_apps:
604 1             return
605 1         original_apps = await self._get_apps_in_original_bundle(uri, model)
606 1         if not all(app in original_apps for app in overlay_apps):
607 1             raise JujuApplicationNotFound(
608                 "Cannot find application {} in original bundle {}".format(
609                     overlay_apps, original_apps
610                 )
611             )
612
613 1     async def _get_apps_in_original_bundle(self, uri: str, model) -> set:
614         """Bundle is downloaded in BundleHandler.fetch_plan.
615         That method takes care of opening and exception handling.
616
617         Resolve method gets all the information regarding the channel,
618         track, revision, type, source.
619
620         Returns:
621             Set with the names of the applications in original bundle.
622         """
623 1         url = URL.parse(uri)
624 1         architecture = DEFAULT_ARCHITECTURE  # only AMD64 is allowed
625 1         res = await model.deploy_types[str(url.schema)].resolve(
626             url, architecture, entity_url=uri
627         )
628 1         handler = BundleHandler(model, trusted=True, forced=False)
629 1         await handler.fetch_plan(url, res.origin)
630 1         return handler.applications
631
632 1     def _get_apps_in_instantiation_params(self, instantiation_params: dict) -> list:
633         """Extract applications key in instantiation params.
634
635         Returns:
636             List with the names of the applications in instantiation params.
637
638         Raises:
639             JujuError if applications key is not found.
640         """
641 1         if not instantiation_params:
642 1             return []
643 1         try:
644 1             return [key for key in instantiation_params.get("applications")]
645 1         except Exception as e:
646 1             raise JujuError("Invalid overlay format. {}".format(str(e)))
647
648 1     def _get_overlays(self, model_name: str, instantiation_params: dict) -> list:
649         """Creates a temporary overlay file which includes the instantiation params.
650         Only one overlay file is created.
651
652         Returns:
653             List with one overlay filename. Empty list if there are no instantiation params.
654         """
655 1         if not instantiation_params:
656 1             return []
657 1         file_name = model_name + "-overlay.yaml"
658 1         self._write_overlay_file(file_name, instantiation_params)
659 1         return [file_name]
660
661 1     def _write_overlay_file(self, file_name: str, instantiation_params: dict) -> None:
662 1         with open(file_name, "w") as file:
663 1             yaml.dump(instantiation_params, file)
664
665 1     def _remove_overlay_file(self, overlay: list) -> None:
666         """Overlay contains either one or zero file names."""
667 1         if not overlay:
668 1             return
669 1         try:
670 1             filename = overlay[0]
671 1             os.remove(filename)
672 1         except OSError as e:
673 1             self.log.warning(
674                 "Overlay file {} could not be removed: {}".format(filename, e)
675             )
676
677 1     async def add_unit(
678         self,
679         application_name: str,
680         model_name: str,
681         machine_id: str,
682         db_dict: dict = None,
683         progress_timeout: float = None,
684         total_timeout: float = None,
685     ):
686         """Add unit
687
688         :param: application_name:   Application name
689         :param: model_name:         Model name
690         :param: machine_id          Machine id
691         :param: db_dict:            Dictionary with data of the DB to write the updates
692         :param: progress_timeout:   Maximum time between two updates in the model
693         :param: total_timeout:      Timeout for the entity to be active
694
695         :return: None
696         """
697
698 1         model = None
699 1         controller = await self.get_controller()
700 1         try:
701 1             model = await self.get_model(controller, model_name)
702 1             application = self._get_application(model, application_name)
703
704 1             if application is not None:
705                 # Checks if the given machine id in the model,
706                 # otherwise function raises an error
707 1                 _machine, _series = self._get_machine_info(model, machine_id)
708
709 1                 self.log.debug(
710                     "Adding unit (machine {}) to application {} in model ~{}".format(
711                         machine_id, application_name, model_name
712                     )
713                 )
714
715 1                 await application.add_unit(to=machine_id)
716
717 1                 await JujuModelWatcher.wait_for(
718                     model=model,
719                     entity=application,
720                     progress_timeout=progress_timeout,
721                     total_timeout=total_timeout,
722                     db_dict=db_dict,
723                     n2vc=self.n2vc,
724                     vca_id=self.vca_connection._vca_id,
725                 )
726 1                 self.log.debug(
727                     "Unit is added to application {} in model {}".format(
728                         application_name, model_name
729                     )
730                 )
731             else:
732 1                 raise JujuApplicationNotFound(
733                     "Application {} not exists".format(application_name)
734                 )
735         finally:
736 1             if model:
737 1                 await self.disconnect_model(model)
738 1             await self.disconnect_controller(controller)
739
740 1     async def destroy_unit(
741         self,
742         application_name: str,
743         model_name: str,
744         machine_id: str,
745         total_timeout: float = None,
746     ):
747         """Destroy unit
748
749         :param: application_name:   Application name
750         :param: model_name:         Model name
751         :param: machine_id          Machine id
752         :param: total_timeout:      Timeout for the entity to be active
753
754         :return: None
755         """
756
757 1         model = None
758 1         controller = await self.get_controller()
759 1         try:
760 1             model = await self.get_model(controller, model_name)
761 1             application = self._get_application(model, application_name)
762
763 1             if application is None:
764 1                 raise JujuApplicationNotFound(
765                     "Application not found: {} (model={})".format(
766                         application_name, model_name
767                     )
768                 )
769
770 1             unit = self._get_unit(application, machine_id)
771 1             if not unit:
772 1                 raise JujuError(
773                     "A unit with machine id {} not in available units".format(
774                         machine_id
775                     )
776                 )
777
778 1             unit_name = unit.name
779
780 1             self.log.debug(
781                 "Destroying unit {} from application {} in model {}".format(
782                     unit_name, application_name, model_name
783                 )
784             )
785 1             await application.destroy_unit(unit_name)
786
787 1             self.log.debug(
788                 "Waiting for unit {} to be destroyed in application {} (model={})...".format(
789                     unit_name, application_name, model_name
790                 )
791             )
792
793             # TODO: Add functionality in the Juju watcher to replace this kind of blocks
794 1             if total_timeout is None:
795 0                 total_timeout = 3600
796 1             end = time.time() + total_timeout
797 1             while time.time() < end:
798 0                 if not self._get_unit(application, machine_id):
799 0                     self.log.debug(
800                         "The unit {} was destroyed in application {} (model={}) ".format(
801                             unit_name, application_name, model_name
802                         )
803                     )
804 0                     return
805 0                 await asyncio.sleep(5)
806 1             self.log.debug(
807                 "Unit {} is destroyed from application {} in model {}".format(
808                     unit_name, application_name, model_name
809                 )
810             )
811         finally:
812 1             if model:
813 1                 await self.disconnect_model(model)
814 1             await self.disconnect_controller(controller)
815
816 1     async def deploy_charm(
817         self,
818         application_name: str,
819         path: str,
820         model_name: str,
821         machine_id: str,
822         db_dict: dict = None,
823         progress_timeout: float = None,
824         total_timeout: float = None,
825         config: dict = None,
826         series: str = None,
827         num_units: int = 1,
828     ):
829         """Deploy charm
830
831         :param: application_name:   Application name
832         :param: path:               Local path to the charm
833         :param: model_name:         Model name
834         :param: machine_id          ID of the machine
835         :param: db_dict:            Dictionary with data of the DB to write the updates
836         :param: progress_timeout:   Maximum time between two updates in the model
837         :param: total_timeout:      Timeout for the entity to be active
838         :param: config:             Config for the charm
839         :param: series:             Series of the charm
840         :param: num_units:          Number of units
841
842         :return: (juju.application.Application): Juju application
843         """
844 1         self.log.debug(
845             "Deploying charm {} to machine {} in model ~{}".format(
846                 application_name, machine_id, model_name
847             )
848         )
849 1         self.log.debug("charm: {}".format(path))
850
851         # Get controller
852 1         controller = await self.get_controller()
853
854         # Get model
855 1         model = await self.get_model(controller, model_name)
856
857 1         try:
858 1             if application_name not in model.applications:
859 1                 if machine_id is not None:
860 1                     machine, series = self._get_machine_info(model, machine_id)
861
862 1                 application = await model.deploy(
863                     entity_url=path,
864                     application_name=application_name,
865                     channel="stable",
866                     num_units=1,
867                     series=series,
868                     to=machine_id,
869                     config=config,
870                 )
871
872 1                 self.log.debug(
873                     "Wait until application {} is ready in model {}".format(
874                         application_name, model_name
875                     )
876                 )
877 1                 if num_units > 1:
878 1                     for _ in range(num_units - 1):
879 1                         m, _ = await self.create_machine(model_name, wait=False)
880 1                         await application.add_unit(to=m.entity_id)
881
882 1                 await JujuModelWatcher.wait_for(
883                     model=model,
884                     entity=application,
885                     progress_timeout=progress_timeout,
886                     total_timeout=total_timeout,
887                     db_dict=db_dict,
888                     n2vc=self.n2vc,
889                     vca_id=self.vca_connection._vca_id,
890                 )
891 1                 self.log.debug(
892                     "Application {} is ready in model {}".format(
893                         application_name, model_name
894                     )
895                 )
896             else:
897 1                 raise JujuApplicationExists(
898                     "Application {} exists".format(application_name)
899                 )
900 1         except juju.errors.JujuError as e:
901 0             if "already exists" in e.message:
902 0                 raise JujuApplicationExists(
903                     "Application {} exists".format(application_name)
904                 )
905             else:
906 0                 raise e
907         finally:
908 1             await self.disconnect_model(model)
909 1             await self.disconnect_controller(controller)
910
911 1         return application
912
913 1     async def upgrade_charm(
914         self,
915         application_name: str,
916         path: str,
917         model_name: str,
918         total_timeout: float = None,
919         **kwargs,
920     ):
921         """Upgrade Charm
922
923         :param: application_name:   Application name
924         :param: model_name:         Model name
925         :param: path:               Local path to the charm
926         :param: total_timeout:      Timeout for the entity to be active
927
928         :return: (str, str): (output and status)
929         """
930
931 0         self.log.debug(
932             "Upgrading charm {} in model {} from path {}".format(
933                 application_name, model_name, path
934             )
935         )
936
937 0         await self.resolve_application(
938             model_name=model_name, application_name=application_name
939         )
940
941         # Get controller
942 0         controller = await self.get_controller()
943
944         # Get model
945 0         model = await self.get_model(controller, model_name)
946
947 0         try:
948             # Get application
949 0             application = self._get_application(
950                 model,
951                 application_name=application_name,
952             )
953 0             if application is None:
954 0                 raise JujuApplicationNotFound(
955                     "Cannot find application {} to upgrade".format(application_name)
956                 )
957
958 0             await application.refresh(path=path)
959
960 0             self.log.debug(
961                 "Wait until charm upgrade is completed for application {} (model={})".format(
962                     application_name, model_name
963                 )
964             )
965
966 0             await JujuModelWatcher.ensure_units_idle(
967                 model=model, application=application
968             )
969
970 0             if application.status == "error":
971 0                 error_message = "Unknown"
972 0                 for unit in application.units:
973 0                     if (
974                         unit.workload_status == "error"
975                         and unit.workload_status_message != ""
976                     ):
977 0                         error_message = unit.workload_status_message
978
979 0                 message = "Application {} failed update in {}: {}".format(
980                     application_name, model_name, error_message
981                 )
982 0                 self.log.error(message)
983 0                 raise JujuError(message=message)
984
985 0             self.log.debug(
986                 "Application {} is ready in model {}".format(
987                     application_name, model_name
988                 )
989             )
990
991         finally:
992 0             await self.disconnect_model(model)
993 0             await self.disconnect_controller(controller)
994
995 0         return application
996
997 1     async def resolve_application(self, model_name: str, application_name: str):
998 0         controller = await self.get_controller()
999 0         model = await self.get_model(controller, model_name)
1000
1001 0         try:
1002 0             application = self._get_application(
1003                 model,
1004                 application_name=application_name,
1005             )
1006 0             if application is None:
1007 0                 raise JujuApplicationNotFound(
1008                     "Cannot find application {} to resolve".format(application_name)
1009                 )
1010
1011 0             while application.status == "error":
1012 0                 for unit in application.units:
1013 0                     if unit.workload_status == "error":
1014 0                         self.log.debug(
1015                             "Model {}, Application {}, Unit {} in error state, resolving".format(
1016                                 model_name, application_name, unit.entity_id
1017                             )
1018                         )
1019 0                         try:
1020 0                             await unit.resolved(retry=False)
1021 0                         except Exception:
1022 0                             pass
1023
1024 0                 await asyncio.sleep(1)
1025
1026         finally:
1027 0             await self.disconnect_model(model)
1028 0             await self.disconnect_controller(controller)
1029
1030 1     async def resolve(self, model_name: str):
1031 0         controller = await self.get_controller()
1032 0         model = await self.get_model(controller, model_name)
1033 0         all_units_active = False
1034 0         try:
1035 0             while not all_units_active:
1036 0                 all_units_active = True
1037 0                 for application_name, application in model.applications.items():
1038 0                     if application.status == "error":
1039 0                         for unit in application.units:
1040 0                             if unit.workload_status == "error":
1041 0                                 self.log.debug(
1042                                     "Model {}, Application {}, Unit {} in error state, resolving".format(
1043                                         model_name, application_name, unit.entity_id
1044                                     )
1045                                 )
1046 0                                 try:
1047 0                                     await unit.resolved(retry=False)
1048 0                                     all_units_active = False
1049 0                                 except Exception:
1050 0                                     pass
1051
1052 0                 if not all_units_active:
1053 0                     await asyncio.sleep(5)
1054         finally:
1055 0             await self.disconnect_model(model)
1056 0             await self.disconnect_controller(controller)
1057
1058 1     async def scale_application(
1059         self,
1060         model_name: str,
1061         application_name: str,
1062         scale: int = 1,
1063         total_timeout: float = None,
1064     ):
1065         """
1066         Scale application (K8s)
1067
1068         :param: model_name:         Model name
1069         :param: application_name:   Application name
1070         :param: scale:              Scale to which to set this application
1071         :param: total_timeout:      Timeout for the entity to be active
1072         """
1073
1074 1         model = None
1075 1         controller = await self.get_controller()
1076 1         try:
1077 1             model = await self.get_model(controller, model_name)
1078
1079 1             self.log.debug(
1080                 "Scaling application {} in model {}".format(
1081                     application_name, model_name
1082                 )
1083             )
1084 1             application = self._get_application(model, application_name)
1085 1             if application is None:
1086 1                 raise JujuApplicationNotFound("Cannot scale application")
1087 1             await application.scale(scale=scale)
1088             # Wait until application is scaled in model
1089 1             self.log.debug(
1090                 "Waiting for application {} to be scaled in model {}...".format(
1091                     application_name, model_name
1092                 )
1093             )
1094 1             if total_timeout is None:
1095 1                 total_timeout = 1800
1096 1             end = time.time() + total_timeout
1097 1             while time.time() < end:
1098 1                 application_scale = self._get_application_count(model, application_name)
1099                 # Before calling wait_for_model function,
1100                 # wait until application unit count and scale count are equal.
1101                 # Because there is a delay before scaling triggers in Juju model.
1102 1                 if application_scale == scale:
1103 1                     await JujuModelWatcher.wait_for_model(
1104                         model=model, timeout=total_timeout
1105                     )
1106 1                     self.log.debug(
1107                         "Application {} is scaled in model {}".format(
1108                             application_name, model_name
1109                         )
1110                     )
1111 1                     return
1112 0                 await asyncio.sleep(5)
1113 1             raise Exception(
1114                 "Timeout waiting for application {} in model {} to be scaled".format(
1115                     application_name, model_name
1116                 )
1117             )
1118         finally:
1119 1             if model:
1120 1                 await self.disconnect_model(model)
1121 1             await self.disconnect_controller(controller)
1122
1123 1     def _get_application_count(self, model: Model, application_name: str) -> int:
1124         """Get number of units of the application
1125
1126         :param: model:              Model object
1127         :param: application_name:   Application name
1128
1129         :return: int (or None if application doesn't exist)
1130         """
1131 1         application = self._get_application(model, application_name)
1132 1         if application is not None:
1133 1             return len(application.units)
1134
1135 1     def _get_application(self, model: Model, application_name: str) -> Application:
1136         """Get application
1137
1138         :param: model:              Model object
1139         :param: application_name:   Application name
1140
1141         :return: juju.application.Application (or None if it doesn't exist)
1142         """
1143 1         if model.applications and application_name in model.applications:
1144 1             return model.applications[application_name]
1145
1146 1     def _get_unit(self, application: Application, machine_id: str) -> Unit:
1147         """Get unit
1148
1149         :param: application:        Application object
1150         :param: machine_id:         Machine id
1151
1152         :return: Unit
1153         """
1154 1         unit = None
1155 1         for u in application.units:
1156 1             if u.machine_id == machine_id:
1157 1                 unit = u
1158 1                 break
1159 1         return unit
1160
1161 1     def _get_machine_info(
1162         self,
1163         model,
1164         machine_id: str,
1165     ) -> (str, str):
1166         """Get machine info
1167
1168         :param: model:          Model object
1169         :param: machine_id:     Machine id
1170
1171         :return: (str, str): (machine, series)
1172         """
1173 1         if machine_id not in model.machines:
1174 1             msg = "Machine {} not found in model".format(machine_id)
1175 1             self.log.error(msg=msg)
1176 1             raise JujuMachineNotFound(msg)
1177 1         machine = model.machines[machine_id]
1178 1         return machine, machine.series
1179
1180 1     async def execute_action(
1181         self,
1182         application_name: str,
1183         model_name: str,
1184         action_name: str,
1185         db_dict: dict = None,
1186         machine_id: str = None,
1187         progress_timeout: float = None,
1188         total_timeout: float = None,
1189         **kwargs,
1190     ):
1191         """Execute action
1192
1193         :param: application_name:   Application name
1194         :param: model_name:         Model name
1195         :param: action_name:        Name of the action
1196         :param: db_dict:            Dictionary with data of the DB to write the updates
1197         :param: machine_id          Machine id
1198         :param: progress_timeout:   Maximum time between two updates in the model
1199         :param: total_timeout:      Timeout for the entity to be active
1200
1201         :return: (str, str): (output and status)
1202         """
1203 1         self.log.debug(
1204             "Executing action {} using params {}".format(action_name, kwargs)
1205         )
1206         # Get controller
1207 1         controller = await self.get_controller()
1208
1209         # Get model
1210 1         model = await self.get_model(controller, model_name)
1211
1212 1         try:
1213             # Get application
1214 1             application = self._get_application(
1215                 model,
1216                 application_name=application_name,
1217             )
1218 1             if application is None:
1219 1                 raise JujuApplicationNotFound("Cannot execute action")
1220             # Racing condition:
1221             #   Ocassionally, self._get_leader_unit() will return None
1222             #   because the leader elected hook has not been triggered yet.
1223             #   Therefore, we are doing some retries. If it happens again,
1224             #   re-open bug 1236
1225 1             if machine_id is None:
1226 1                 unit = await self._get_leader_unit(application)
1227 1                 self.log.debug(
1228                     "Action {} is being executed on the leader unit {}".format(
1229                         action_name, unit.name
1230                     )
1231                 )
1232             else:
1233 0                 unit = self._get_unit(application, machine_id)
1234 0                 if not unit:
1235 0                     raise JujuError(
1236                         "A unit with machine id {} not in available units".format(
1237                             machine_id
1238                         )
1239                     )
1240 0                 self.log.debug(
1241                     "Action {} is being executed on {} unit".format(
1242                         action_name, unit.name
1243                     )
1244                 )
1245
1246 1             actions = await application.get_actions()
1247
1248 1             if action_name not in actions:
1249 1                 raise JujuActionNotFound(
1250                     "Action {} not in available actions".format(action_name)
1251                 )
1252
1253 1             action = await unit.run_action(action_name, **kwargs)
1254
1255 1             self.log.debug(
1256                 "Wait until action {} is completed in application {} (model={})".format(
1257                     action_name, application_name, model_name
1258                 )
1259             )
1260 1             await JujuModelWatcher.wait_for(
1261                 model=model,
1262                 entity=action,
1263                 progress_timeout=progress_timeout,
1264                 total_timeout=total_timeout,
1265                 db_dict=db_dict,
1266                 n2vc=self.n2vc,
1267                 vca_id=self.vca_connection._vca_id,
1268             )
1269
1270 1             output = await model.get_action_output(action_uuid=action.entity_id)
1271 1             status = await model.get_action_status(uuid_or_prefix=action.entity_id)
1272 1             status = (
1273                 status[action.entity_id] if action.entity_id in status else "failed"
1274             )
1275
1276 1             self.log.debug(
1277                 "Action {} completed with status {} in application {} (model={})".format(
1278                     action_name, action.status, application_name, model_name
1279                 )
1280             )
1281         finally:
1282 1             await self.disconnect_model(model)
1283 1             await self.disconnect_controller(controller)
1284
1285 1         return output, status
1286
1287 1     async def get_actions(self, application_name: str, model_name: str) -> dict:
1288         """Get list of actions
1289
1290         :param: application_name: Application name
1291         :param: model_name: Model name
1292
1293         :return: Dict with this format
1294             {
1295                 "action_name": "Description of the action",
1296                 ...
1297             }
1298         """
1299 1         self.log.debug(
1300             "Getting list of actions for application {}".format(application_name)
1301         )
1302
1303         # Get controller
1304 1         controller = await self.get_controller()
1305
1306         # Get model
1307 1         model = await self.get_model(controller, model_name)
1308
1309 1         try:
1310             # Get application
1311 1             application = self._get_application(
1312                 model,
1313                 application_name=application_name,
1314             )
1315
1316             # Return list of actions
1317 1             return await application.get_actions()
1318
1319         finally:
1320             # Disconnect from model and controller
1321 1             await self.disconnect_model(model)
1322 1             await self.disconnect_controller(controller)
1323
1324 1     async def get_metrics(self, model_name: str, application_name: str) -> dict:
1325         """Get the metrics collected by the VCA.
1326
1327         :param model_name The name or unique id of the network service
1328         :param application_name The name of the application
1329         """
1330 1         if not model_name or not application_name:
1331 1             raise Exception("model_name and application_name must be non-empty strings")
1332 1         metrics = {}
1333 1         controller = await self.get_controller()
1334 1         model = await self.get_model(controller, model_name)
1335 1         try:
1336 1             application = self._get_application(model, application_name)
1337 1             if application is not None:
1338 1                 metrics = await application.get_metrics()
1339         finally:
1340 1             self.disconnect_model(model)
1341 1             self.disconnect_controller(controller)
1342 1         return metrics
1343
1344 1     async def add_relation(
1345         self,
1346         model_name: str,
1347         endpoint_1: str,
1348         endpoint_2: str,
1349     ):
1350         """Add relation
1351
1352         :param: model_name:     Model name
1353         :param: endpoint_1      First endpoint name
1354                                 ("app:endpoint" format or directly the saas name)
1355         :param: endpoint_2:     Second endpoint name (^ same format)
1356         """
1357
1358 1         self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
1359
1360         # Get controller
1361 1         controller = await self.get_controller()
1362
1363         # Get model
1364 1         model = await self.get_model(controller, model_name)
1365
1366         # Add relation
1367 1         try:
1368 1             await model.add_relation(endpoint_1, endpoint_2)
1369 1         except juju.errors.JujuAPIError as e:
1370 1             if self._relation_is_not_found(e):
1371 1                 self.log.warning("Relation not found: {}".format(e.message))
1372 1                 return
1373 1             if self._relation_already_exist(e):
1374 1                 self.log.warning("Relation already exists: {}".format(e.message))
1375 1                 return
1376             # another exception, raise it
1377 1             raise e
1378         finally:
1379 1             await self.disconnect_model(model)
1380 1             await self.disconnect_controller(controller)
1381
1382 1     def _relation_is_not_found(self, juju_error):
1383 1         text = "not found"
1384 1         return (text in juju_error.message) or (
1385             juju_error.error_code and text in juju_error.error_code
1386         )
1387
1388 1     def _relation_already_exist(self, juju_error):
1389 1         text = "already exists"
1390 1         return (text in juju_error.message) or (
1391             juju_error.error_code and text in juju_error.error_code
1392         )
1393
1394 1     async def offer(self, endpoint: RelationEndpoint) -> Offer:
1395         """
1396         Create an offer from a RelationEndpoint
1397
1398         :param: endpoint: Relation endpoint
1399
1400         :return: Offer object
1401         """
1402 1         model_name = endpoint.model_name
1403 1         offer_name = f"{endpoint.application_name}-{endpoint.endpoint_name}"
1404 1         controller = await self.get_controller()
1405 1         model = None
1406 1         try:
1407 1             model = await self.get_model(controller, model_name)
1408 1             await model.create_offer(endpoint.endpoint, offer_name=offer_name)
1409 1             offer_list = await self._list_offers(model_name, offer_name=offer_name)
1410 1             if offer_list:
1411 1                 return Offer(offer_list[0].offer_url)
1412             else:
1413 1                 raise Exception("offer was not created")
1414 1         except juju.errors.JujuError as e:
1415 0             if "application offer already exists" not in e.message:
1416 0                 raise e
1417         finally:
1418 1             if model:
1419 1                 self.disconnect_model(model)
1420 1             self.disconnect_controller(controller)
1421
1422 1     async def consume(
1423         self,
1424         model_name: str,
1425         offer: Offer,
1426         provider_libjuju: "Libjuju",
1427     ) -> str:
1428         """
1429         Consumes a remote offer in the model. Relations can be created later using "juju relate".
1430
1431         :param: model_name:             Model name
1432         :param: offer:                  Offer object to consume
1433         :param: provider_libjuju:       Libjuju object of the provider endpoint
1434
1435         :raises ParseError if there's a problem parsing the offer_url
1436         :raises JujuError if remote offer includes and endpoint
1437         :raises JujuAPIError if the operation is not successful
1438
1439         :returns: Saas name. It is the application name in the model that reference the remote application.
1440         """
1441 1         saas_name = f'{offer.name}-{offer.model_name.replace("-", "")}'
1442 1         if offer.vca_id:
1443 1             saas_name = f"{saas_name}-{offer.vca_id}"
1444 1         controller = await self.get_controller()
1445 1         model = None
1446 1         provider_controller = None
1447 1         try:
1448 1             model = await controller.get_model(model_name)
1449 1             provider_controller = await provider_libjuju.get_controller()
1450 1             await model.consume(
1451                 offer.url, application_alias=saas_name, controller=provider_controller
1452             )
1453 1             return saas_name
1454         finally:
1455 1             if model:
1456 1                 await self.disconnect_model(model)
1457 1             if provider_controller:
1458 1                 await provider_libjuju.disconnect_controller(provider_controller)
1459 1             await self.disconnect_controller(controller)
1460
1461 1     async def destroy_model(self, model_name: str, total_timeout: float = 1800):
1462         """
1463         Destroy model
1464
1465         :param: model_name:     Model name
1466         :param: total_timeout:  Timeout
1467         """
1468
1469 0         controller = await self.get_controller()
1470 0         model = None
1471 0         try:
1472 0             if not await self.model_exists(model_name, controller=controller):
1473 0                 self.log.warn(f"Model {model_name} doesn't exist")
1474 0                 return
1475
1476 0             self.log.debug(f"Getting model {model_name} to be destroyed")
1477 0             model = await self.get_model(controller, model_name)
1478 0             self.log.debug(f"Destroying manual machines in model {model_name}")
1479             # Destroy machines that are manually provisioned
1480             # and still are in pending state
1481 0             await self._destroy_pending_machines(model, only_manual=True)
1482 0             await self.disconnect_model(model)
1483
1484 0             await asyncio.wait_for(
1485                 self._destroy_model(model_name, controller),
1486                 timeout=total_timeout,
1487             )
1488 0         except Exception as e:
1489 0             if not await self.model_exists(model_name, controller=controller):
1490 0                 self.log.warn(
1491                     f"Failed deleting model {model_name}: model doesn't exist"
1492                 )
1493 0                 return
1494 0             self.log.warn(f"Failed deleting model {model_name}: {e}")
1495 0             raise e
1496         finally:
1497 0             if model:
1498 0                 await self.disconnect_model(model)
1499 0             await self.disconnect_controller(controller)
1500
1501 1     async def _destroy_model(
1502         self,
1503         model_name: str,
1504         controller: Controller,
1505     ):
1506         """
1507         Destroy model from controller
1508
1509         :param: model: Model name to be removed
1510         :param: controller: Controller object
1511         :param: timeout: Timeout in seconds
1512         """
1513 0         self.log.debug(f"Destroying model {model_name}")
1514
1515 0         async def _destroy_model_gracefully(model_name: str, controller: Controller):
1516 0             self.log.info(f"Gracefully deleting model {model_name}")
1517 0             resolved = False
1518 0             while model_name in await controller.list_models():
1519 0                 if not resolved:
1520 0                     await self.resolve(model_name)
1521 0                     resolved = True
1522 0                 await controller.destroy_model(model_name, destroy_storage=True)
1523
1524 0                 await asyncio.sleep(5)
1525 0             self.log.info(f"Model {model_name} deleted gracefully")
1526
1527 0         async def _destroy_model_forcefully(model_name: str, controller: Controller):
1528 0             self.log.info(f"Forcefully deleting model {model_name}")
1529 0             while model_name in await controller.list_models():
1530 0                 await controller.destroy_model(
1531                     model_name, destroy_storage=True, force=True, max_wait=60
1532                 )
1533 0                 await asyncio.sleep(5)
1534 0             self.log.info(f"Model {model_name} deleted forcefully")
1535
1536 0         try:
1537 0             try:
1538 0                 await asyncio.wait_for(
1539                     _destroy_model_gracefully(model_name, controller), timeout=120
1540                 )
1541 0             except asyncio.TimeoutError:
1542 0                 await _destroy_model_forcefully(model_name, controller)
1543 0         except juju.errors.JujuError as e:
1544 0             if any("has been removed" in error for error in e.errors):
1545 0                 return
1546 0             if any("model not found" in error for error in e.errors):
1547 0                 return
1548 0             raise e
1549
1550 1     async def destroy_application(
1551         self, model_name: str, application_name: str, total_timeout: float
1552     ):
1553         """
1554         Destroy application
1555
1556         :param: model_name:         Model name
1557         :param: application_name:   Application name
1558         :param: total_timeout:      Timeout
1559         """
1560
1561 1         controller = await self.get_controller()
1562 1         model = None
1563
1564 1         try:
1565 1             model = await self.get_model(controller, model_name)
1566 1             self.log.debug(
1567                 "Destroying application {} in model {}".format(
1568                     application_name, model_name
1569                 )
1570             )
1571 1             application = self._get_application(model, application_name)
1572 1             if application:
1573 0                 await application.destroy()
1574             else:
1575 1                 self.log.warning("Application not found: {}".format(application_name))
1576
1577 1             self.log.debug(
1578                 "Waiting for application {} to be destroyed in model {}...".format(
1579                     application_name, model_name
1580                 )
1581             )
1582 1             if total_timeout is None:
1583 0                 total_timeout = 3600
1584 1             end = time.time() + total_timeout
1585 1             while time.time() < end:
1586 1                 if not self._get_application(model, application_name):
1587 1                     self.log.debug(
1588                         "The application {} was destroyed in model {} ".format(
1589                             application_name, model_name
1590                         )
1591                     )
1592 1                     return
1593 0                 await asyncio.sleep(5)
1594 1             raise Exception(
1595                 "Timeout waiting for application {} to be destroyed in model {}".format(
1596                     application_name, model_name
1597                 )
1598             )
1599         finally:
1600 1             if model is not None:
1601 1                 await self.disconnect_model(model)
1602 1             await self.disconnect_controller(controller)
1603
1604 1     async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
1605         """
1606         Destroy pending machines in a given model
1607
1608         :param: only_manual:    Bool that indicates only manually provisioned
1609                                 machines should be destroyed (if True), or that
1610                                 all pending machines should be destroyed
1611         """
1612 0         status = await model.get_status()
1613 0         for machine_id in status.machines:
1614 0             machine_status = status.machines[machine_id]
1615 0             if machine_status.agent_status.status == "pending":
1616 0                 if only_manual and not machine_status.instance_id.startswith("manual:"):
1617 0                     break
1618 0                 machine = model.machines[machine_id]
1619 0                 await machine.destroy(force=True)
1620
1621 1     async def configure_application(
1622         self, model_name: str, application_name: str, config: dict = None
1623     ):
1624         """Configure application
1625
1626         :param: model_name:         Model name
1627         :param: application_name:   Application name
1628         :param: config:             Config to apply to the charm
1629         """
1630 1         self.log.debug("Configuring application {}".format(application_name))
1631
1632 1         if config:
1633 1             controller = await self.get_controller()
1634 1             model = None
1635 1             try:
1636 1                 model = await self.get_model(controller, model_name)
1637 1                 application = self._get_application(
1638                     model,
1639                     application_name=application_name,
1640                 )
1641 1                 await application.set_config(config)
1642             finally:
1643 1                 if model:
1644 1                     await self.disconnect_model(model)
1645 1                 await self.disconnect_controller(controller)
1646
1647 1     async def health_check(self, interval: float = 300.0):
1648         """
1649         Health check to make sure controller and controller_model connections are OK
1650
1651         :param: interval: Time in seconds between checks
1652         """
1653 1         controller = None
1654 1         while True:
1655 1             try:
1656 1                 controller = await self.get_controller()
1657                 # self.log.debug("VCA is alive")
1658 1             except Exception as e:
1659 0                 self.log.error("Health check to VCA failed: {}".format(e))
1660             finally:
1661 1                 await self.disconnect_controller(controller)
1662 0             await asyncio.sleep(interval)
1663
1664 1     async def list_models(self, contains: str = None) -> [str]:
1665         """List models with certain names
1666
1667         :param: contains:   String that is contained in model name
1668
1669         :retur: [models] Returns list of model names
1670         """
1671
1672 1         controller = await self.get_controller()
1673 1         try:
1674 1             models = await controller.list_models()
1675 1             if contains:
1676 1                 models = [model for model in models if contains in model]
1677 1             return models
1678         finally:
1679 1             await self.disconnect_controller(controller)
1680
1681 1     async def _list_offers(
1682         self, model_name: str, offer_name: str = None
1683     ) -> QueryApplicationOffersResults:
1684         """
1685         List offers within a model
1686
1687         :param: model_name: Model name
1688         :param: offer_name: Offer name to filter.
1689
1690         :return: Returns application offers results in the model
1691         """
1692
1693 1         controller = await self.get_controller()
1694 1         try:
1695 1             offers = (await controller.list_offers(model_name)).results
1696 1             if offer_name:
1697 1                 matching_offer = []
1698 1                 for offer in offers:
1699 1                     if offer.offer_name == offer_name:
1700 1                         matching_offer.append(offer)
1701 1                         break
1702 1                 offers = matching_offer
1703 1             return offers
1704         finally:
1705 1             await self.disconnect_controller(controller)
1706
1707 1     async def add_k8s(
1708         self,
1709         name: str,
1710         rbac_id: str,
1711         token: str,
1712         client_cert_data: str,
1713         configuration: Configuration,
1714         storage_class: str,
1715         credential_name: str = None,
1716     ):
1717         """
1718         Add a Kubernetes cloud to the controller
1719
1720         Similar to the `juju add-k8s` command in the CLI
1721
1722         :param: name:               Name for the K8s cloud
1723         :param: configuration:      Kubernetes configuration object
1724         :param: storage_class:      Storage Class to use in the cloud
1725         :param: credential_name:    Storage Class to use in the cloud
1726         """
1727
1728 1         if not storage_class:
1729 1             raise Exception("storage_class must be a non-empty string")
1730 1         if not name:
1731 1             raise Exception("name must be a non-empty string")
1732 1         if not configuration:
1733 1             raise Exception("configuration must be provided")
1734
1735 1         endpoint = configuration.host
1736 1         credential = self.get_k8s_cloud_credential(
1737             configuration,
1738             client_cert_data,
1739             token,
1740         )
1741 1         credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
1742 1         cloud = client.Cloud(
1743             type_="kubernetes",
1744             auth_types=[credential.auth_type],
1745             endpoint=endpoint,
1746             ca_certificates=[client_cert_data],
1747             config={
1748                 "operator-storage": storage_class,
1749                 "workload-storage": storage_class,
1750             },
1751         )
1752
1753 1         return await self.add_cloud(
1754             name, cloud, credential, credential_name=credential_name
1755         )
1756
1757 1     def get_k8s_cloud_credential(
1758         self,
1759         configuration: Configuration,
1760         client_cert_data: str,
1761         token: str = None,
1762     ) -> client.CloudCredential:
1763 1         attrs = {}
1764         # TODO: Test with AKS
1765 1         key = None  # open(configuration.key_file, "r").read()
1766 1         username = configuration.username
1767 1         password = configuration.password
1768
1769 1         if client_cert_data:
1770 1             attrs["ClientCertificateData"] = client_cert_data
1771 1         if key:
1772 0             attrs["ClientKeyData"] = key
1773 1         if token:
1774 1             if username or password:
1775 1                 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1776 1             attrs["Token"] = token
1777
1778 1         auth_type = None
1779 1         if key:
1780 0             auth_type = "oauth2"
1781 0             if client_cert_data:
1782 0                 auth_type = "oauth2withcert"
1783 0             if not token:
1784 0                 raise JujuInvalidK8sConfiguration(
1785                     "missing token for auth type {}".format(auth_type)
1786                 )
1787 1         elif username:
1788 1             if not password:
1789 1                 self.log.debug(
1790                     "credential for user {} has empty password".format(username)
1791                 )
1792 1             attrs["username"] = username
1793 1             attrs["password"] = password
1794 1             if client_cert_data:
1795 1                 auth_type = "userpasswithcert"
1796             else:
1797 1                 auth_type = "userpass"
1798 1         elif client_cert_data and token:
1799 1             auth_type = "certificate"
1800         else:
1801 1             raise JujuInvalidK8sConfiguration("authentication method not supported")
1802 1         return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1803
1804 1     async def add_cloud(
1805         self,
1806         name: str,
1807         cloud: Cloud,
1808         credential: CloudCredential = None,
1809         credential_name: str = None,
1810     ) -> Cloud:
1811         """
1812         Add cloud to the controller
1813
1814         :param: name:               Name of the cloud to be added
1815         :param: cloud:              Cloud object
1816         :param: credential:         CloudCredentials object for the cloud
1817         :param: credential_name:    Credential name.
1818                                     If not defined, cloud of the name will be used.
1819         """
1820 1         controller = await self.get_controller()
1821 1         try:
1822 1             _ = await controller.add_cloud(name, cloud)
1823 1             if credential:
1824 1                 await controller.add_credential(
1825                     credential_name or name, credential=credential, cloud=name
1826                 )
1827             # Need to return the object returned by the controller.add_cloud() function
1828             # I'm returning the original value now until this bug is fixed:
1829             #   https://github.com/juju/python-libjuju/issues/443
1830 1             return cloud
1831         finally:
1832 1             await self.disconnect_controller(controller)
1833
1834 1     async def remove_cloud(self, name: str):
1835         """
1836         Remove cloud
1837
1838         :param: name:   Name of the cloud to be removed
1839         """
1840 1         controller = await self.get_controller()
1841 1         try:
1842 1             await controller.remove_cloud(name)
1843 1         except juju.errors.JujuError as e:
1844 0             if len(e.errors) == 1 and f'cloud "{name}" not found' == e.errors[0]:
1845 0                 self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
1846             else:
1847 0                 raise e
1848         finally:
1849 1             await self.disconnect_controller(controller)
1850
1851 1     @retry(
1852         attempts=20, delay=5, fallback=JujuLeaderUnitNotFound(), callback=retry_callback
1853     )
1854 1     async def _get_leader_unit(self, application: Application) -> Unit:
1855 1         unit = None
1856 1         for u in application.units:
1857 1             if await u.is_leader_from_status():
1858 1                 unit = u
1859 1                 break
1860 1         if not unit:
1861 1             raise Exception()
1862 1         return unit
1863
1864 1     async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
1865         """
1866         Get cloud credentials
1867
1868         :param: cloud: Cloud object. The returned credentials will be from this cloud.
1869
1870         :return: List of credentials object associated to the specified cloud
1871
1872         """
1873 0         controller = await self.get_controller()
1874 0         try:
1875 0             facade = client.CloudFacade.from_connection(controller.connection())
1876 0             cloud_cred_tag = tag.credential(
1877                 cloud.name, self.vca_connection.data.user, cloud.credential_name
1878             )
1879 0             params = [client.Entity(cloud_cred_tag)]
1880 0             return (await facade.Credential(params)).results
1881         finally:
1882 0             await self.disconnect_controller(controller)
1883
1884 1     async def check_application_exists(self, model_name, application_name) -> bool:
1885         """Check application exists
1886
1887         :param: model_name:         Model Name
1888         :param: application_name:   Application Name
1889
1890         :return: Boolean
1891         """
1892
1893 1         model = None
1894 1         controller = await self.get_controller()
1895 1         try:
1896 1             model = await self.get_model(controller, model_name)
1897 1             self.log.debug(
1898                 "Checking if application {} exists in model {}".format(
1899                     application_name, model_name
1900                 )
1901             )
1902 1             return self._get_application(model, application_name) is not None
1903         finally:
1904 1             if model:
1905 1                 await self.disconnect_model(model)
1906 1             await self.disconnect_controller(controller)