Code Coverage

Cobertura Coverage Report > n2vc >

libjuju.py

Trend

File Coverage summary

NameClassesLinesConditionals
libjuju.py
100%
1/1
72%
515/712
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
libjuju.py
72%
515/712
N/A

Source

n2vc/libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #     Unless required by applicable law or agreed to in writing, software
10 #     distributed under the License is distributed on an "AS IS" BASIS,
11 #     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #     See the License for the specific language governing permissions and
13 #     limitations under the License.
14
15 1 import asyncio
16 1 import logging
17 1 import typing
18
19 1 import time
20
21 1 import juju.errors
22 1 from juju.model import Model
23 1 from juju.machine import Machine
24 1 from juju.application import Application
25 1 from juju.unit import Unit
26 1 from juju.client._definitions import (
27     FullStatus,
28     QueryApplicationOffersResults,
29     Cloud,
30     CloudCredential,
31 )
32 1 from juju.controller import Controller
33 1 from juju.client import client
34 1 from juju import tag
35
36 1 from n2vc.definitions import Offer, RelationEndpoint
37 1 from n2vc.juju_watcher import JujuModelWatcher
38 1 from n2vc.provisioner import AsyncSSHProvisioner
39 1 from n2vc.n2vc_conn import N2VCConnector
40 1 from n2vc.exceptions import (
41     JujuMachineNotFound,
42     JujuApplicationNotFound,
43     JujuLeaderUnitNotFound,
44     JujuActionNotFound,
45     JujuControllerFailedConnecting,
46     JujuApplicationExists,
47     JujuInvalidK8sConfiguration,
48     JujuError,
49 )
50 1 from n2vc.vca.cloud import Cloud as VcaCloud
51 1 from n2vc.vca.connection import Connection
52 1 from kubernetes.client.configuration import Configuration
53 1 from retrying_async import retry
54
55
56 1 RBAC_LABEL_KEY_NAME = "rbac-id"
57
58
59 1 class Libjuju:
60 1     def __init__(
61         self,
62         vca_connection: Connection,
63         loop: asyncio.AbstractEventLoop = None,
64         log: logging.Logger = None,
65         n2vc: N2VCConnector = None,
66     ):
67         """
68         Constructor
69
70         :param: vca_connection:         n2vc.vca.connection object
71         :param: loop:                   Asyncio loop
72         :param: log:                    Logger
73         :param: n2vc:                   N2VC object
74         """
75
76 1         self.log = log or logging.getLogger("Libjuju")
77 1         self.n2vc = n2vc
78 1         self.vca_connection = vca_connection
79
80 1         self.loop = loop or asyncio.get_event_loop()
81 1         self.loop.set_exception_handler(self.handle_exception)
82 1         self.creating_model = asyncio.Lock(loop=self.loop)
83
84 1         if self.vca_connection.is_default:
85 1             self.health_check_task = self._create_health_check_task()
86
87 1     def _create_health_check_task(self):
88 1         return self.loop.create_task(self.health_check())
89
90 1     async def get_controller(self, timeout: float = 60.0) -> Controller:
91         """
92         Get controller
93
94         :param: timeout: Time in seconds to wait for controller to connect
95         """
96 1         controller = None
97 1         try:
98 1             controller = Controller()
99 1             await asyncio.wait_for(
100                 controller.connect(
101                     endpoint=self.vca_connection.data.endpoints,
102                     username=self.vca_connection.data.user,
103                     password=self.vca_connection.data.secret,
104                     cacert=self.vca_connection.data.cacert,
105                 ),
106                 timeout=timeout,
107             )
108 1             if self.vca_connection.is_default:
109 1                 endpoints = await controller.api_endpoints
110 1                 if not all(
111                     endpoint in self.vca_connection.endpoints for endpoint in endpoints
112                 ):
113 1                     await self.vca_connection.update_endpoints(endpoints)
114 1             return controller
115 1         except asyncio.CancelledError as e:
116 1             raise e
117 1         except Exception as e:
118 1             self.log.error(
119                 "Failed connecting to controller: {}... {}".format(
120                     self.vca_connection.data.endpoints, e
121                 )
122             )
123 1             if controller:
124 1                 await self.disconnect_controller(controller)
125
126 1             raise JujuControllerFailedConnecting(
127                 f"Error connecting to Juju controller: {e}"
128             )
129
130 1     async def disconnect(self):
131         """Disconnect"""
132         # Cancel health check task
133 1         self.health_check_task.cancel()
134 1         self.log.debug("Libjuju disconnected!")
135
136 1     async def disconnect_model(self, model: Model):
137         """
138         Disconnect model
139
140         :param: model: Model that will be disconnected
141         """
142 1         await model.disconnect()
143
144 1     async def disconnect_controller(self, controller: Controller):
145         """
146         Disconnect controller
147
148         :param: controller: Controller that will be disconnected
149         """
150 1         if controller:
151 1             await controller.disconnect()
152
153 1     @retry(attempts=3, delay=5, timeout=None)
154 1     async def add_model(self, model_name: str, cloud: VcaCloud):
155         """
156         Create model
157
158         :param: model_name: Model name
159         :param: cloud: Cloud object
160         """
161
162         # Get controller
163 1         controller = await self.get_controller()
164 1         model = None
165 1         try:
166             # Block until other workers have finished model creation
167 1             while self.creating_model.locked():
168 0                 await asyncio.sleep(0.1)
169
170             # Create the model
171 1             async with self.creating_model:
172 1                 if await self.model_exists(model_name, controller=controller):
173 1                     return
174 1                 self.log.debug("Creating model {}".format(model_name))
175 1                 model = await controller.add_model(
176                     model_name,
177                     config=self.vca_connection.data.model_config,
178                     cloud_name=cloud.name,
179                     credential_name=cloud.credential_name,
180                 )
181 0         except juju.errors.JujuAPIError as e:
182 0             if "already exists" in e.message:
183 0                 pass
184             else:
185 0                 raise e
186         finally:
187 1             if model:
188 1                 await self.disconnect_model(model)
189 1             await self.disconnect_controller(controller)
190
191 1     async def get_executed_actions(self, model_name: str) -> list:
192         """
193         Get executed/history of actions for a model.
194
195         :param: model_name: Model name, str.
196         :return: List of executed actions for a model.
197         """
198 1         model = None
199 1         executed_actions = []
200 1         controller = await self.get_controller()
201 1         try:
202 1             model = await self.get_model(controller, model_name)
203             # Get all unique action names
204 1             actions = {}
205 1             for application in model.applications:
206 1                 application_actions = await self.get_actions(application, model_name)
207 1                 actions.update(application_actions)
208             # Get status of all actions
209 1             for application_action in actions:
210 1                 app_action_status_list = await model.get_action_status(
211                     name=application_action
212                 )
213 1                 for action_id, action_status in app_action_status_list.items():
214 1                     executed_action = {
215                         "id": action_id,
216                         "action": application_action,
217                         "status": action_status,
218                     }
219                     # Get action output by id
220 1                     action_status = await model.get_action_output(executed_action["id"])
221 1                     for k, v in action_status.items():
222 1                         executed_action[k] = v
223 1                     executed_actions.append(executed_action)
224 1         except Exception as e:
225 1             raise JujuError(
226                 "Error in getting executed actions for model: {}. Error: {}".format(
227                     model_name, str(e)
228                 )
229             )
230         finally:
231 1             if model:
232 1                 await self.disconnect_model(model)
233 1             await self.disconnect_controller(controller)
234 1         return executed_actions
235
236 1     async def get_application_configs(
237         self, model_name: str, application_name: str
238     ) -> dict:
239         """
240         Get available configs for an application.
241
242         :param: model_name: Model name, str.
243         :param: application_name: Application name, str.
244
245         :return: A dict which has key - action name, value - action description
246         """
247 1         model = None
248 1         application_configs = {}
249 1         controller = await self.get_controller()
250 1         try:
251 1             model = await self.get_model(controller, model_name)
252 1             application = self._get_application(
253                 model, application_name=application_name
254             )
255 1             application_configs = await application.get_config()
256 1         except Exception as e:
257 1             raise JujuError(
258                 "Error in getting configs for application: {} in model: {}. Error: {}".format(
259                     application_name, model_name, str(e)
260                 )
261             )
262         finally:
263 1             if model:
264 1                 await self.disconnect_model(model)
265 1             await self.disconnect_controller(controller)
266 1         return application_configs
267
268 1     @retry(attempts=3, delay=5)
269 1     async def get_model(self, controller: Controller, model_name: str) -> Model:
270         """
271         Get model from controller
272
273         :param: controller: Controller
274         :param: model_name: Model name
275
276         :return: Model: The created Juju model object
277         """
278 1         return await controller.get_model(model_name)
279
280 1     async def model_exists(
281         self, model_name: str, controller: Controller = None
282     ) -> bool:
283         """
284         Check if model exists
285
286         :param: controller: Controller
287         :param: model_name: Model name
288
289         :return bool
290         """
291 1         need_to_disconnect = False
292
293         # Get controller if not passed
294 1         if not controller:
295 1             controller = await self.get_controller()
296 1             need_to_disconnect = True
297
298         # Check if model exists
299 1         try:
300 1             return model_name in await controller.list_models()
301         finally:
302 1             if need_to_disconnect:
303 1                 await self.disconnect_controller(controller)
304
305 1     async def models_exist(self, model_names: [str]) -> (bool, list):
306         """
307         Check if models exists
308
309         :param: model_names: List of strings with model names
310
311         :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
312         """
313 1         if not model_names:
314 1             raise Exception(
315                 "model_names must be a non-empty array. Given value: {}".format(
316                     model_names
317                 )
318             )
319 1         non_existing_models = []
320 1         models = await self.list_models()
321 1         existing_models = list(set(models).intersection(model_names))
322 1         non_existing_models = list(set(model_names) - set(existing_models))
323
324 1         return (
325             len(non_existing_models) == 0,
326             non_existing_models,
327         )
328
329 1     async def get_model_status(self, model_name: str) -> FullStatus:
330         """
331         Get model status
332
333         :param: model_name: Model name
334
335         :return: Full status object
336         """
337 1         controller = await self.get_controller()
338 1         model = await self.get_model(controller, model_name)
339 1         try:
340 1             return await model.get_status()
341         finally:
342 1             await self.disconnect_model(model)
343 1             await self.disconnect_controller(controller)
344
345 1     async def create_machine(
346         self,
347         model_name: str,
348         machine_id: str = None,
349         db_dict: dict = None,
350         progress_timeout: float = None,
351         total_timeout: float = None,
352         series: str = "bionic",
353         wait: bool = True,
354     ) -> (Machine, bool):
355         """
356         Create machine
357
358         :param: model_name:         Model name
359         :param: machine_id:         Machine id
360         :param: db_dict:            Dictionary with data of the DB to write the updates
361         :param: progress_timeout:   Maximum time between two updates in the model
362         :param: total_timeout:      Timeout for the entity to be active
363         :param: series:             Series of the machine (xenial, bionic, focal, ...)
364         :param: wait:               Wait until machine is ready
365
366         :return: (juju.machine.Machine, bool):  Machine object and a boolean saying
367                                                 if the machine is new or it already existed
368         """
369 1         new = False
370 1         machine = None
371
372 1         self.log.debug(
373             "Creating machine (id={}) in model: {}".format(machine_id, model_name)
374         )
375
376         # Get controller
377 1         controller = await self.get_controller()
378
379         # Get model
380 1         model = await self.get_model(controller, model_name)
381 1         try:
382 1             if machine_id is not None:
383 1                 self.log.debug(
384                     "Searching machine (id={}) in model {}".format(
385                         machine_id, model_name
386                     )
387                 )
388
389                 # Get machines from model and get the machine with machine_id if exists
390 1                 machines = await model.get_machines()
391 1                 if machine_id in machines:
392 1                     self.log.debug(
393                         "Machine (id={}) found in model {}".format(
394                             machine_id, model_name
395                         )
396                     )
397 1                     machine = machines[machine_id]
398                 else:
399 1                     raise JujuMachineNotFound("Machine {} not found".format(machine_id))
400
401 1             if machine is None:
402 1                 self.log.debug("Creating a new machine in model {}".format(model_name))
403
404                 # Create machine
405 1                 machine = await model.add_machine(
406                     spec=None, constraints=None, disks=None, series=series
407                 )
408 1                 new = True
409
410                 # Wait until the machine is ready
411 1                 self.log.debug(
412                     "Wait until machine {} is ready in model {}".format(
413                         machine.entity_id, model_name
414                     )
415                 )
416 1                 if wait:
417 1                     await JujuModelWatcher.wait_for(
418                         model=model,
419                         entity=machine,
420                         progress_timeout=progress_timeout,
421                         total_timeout=total_timeout,
422                         db_dict=db_dict,
423                         n2vc=self.n2vc,
424                         vca_id=self.vca_connection._vca_id,
425                     )
426         finally:
427 1             await self.disconnect_model(model)
428 1             await self.disconnect_controller(controller)
429
430 1         self.log.debug(
431             "Machine {} ready at {} in model {}".format(
432                 machine.entity_id, machine.dns_name, model_name
433             )
434         )
435 1         return machine, new
436
437 1     async def provision_machine(
438         self,
439         model_name: str,
440         hostname: str,
441         username: str,
442         private_key_path: str,
443         db_dict: dict = None,
444         progress_timeout: float = None,
445         total_timeout: float = None,
446     ) -> str:
447         """
448         Manually provisioning of a machine
449
450         :param: model_name:         Model name
451         :param: hostname:           IP to access the machine
452         :param: username:           Username to login to the machine
453         :param: private_key_path:   Local path for the private key
454         :param: db_dict:            Dictionary with data of the DB to write the updates
455         :param: progress_timeout:   Maximum time between two updates in the model
456         :param: total_timeout:      Timeout for the entity to be active
457
458         :return: (Entity): Machine id
459         """
460 0         self.log.debug(
461             "Provisioning machine. model: {}, hostname: {}, username: {}".format(
462                 model_name, hostname, username
463             )
464         )
465
466         # Get controller
467 0         controller = await self.get_controller()
468
469         # Get model
470 0         model = await self.get_model(controller, model_name)
471
472 0         try:
473             # Get provisioner
474 0             provisioner = AsyncSSHProvisioner(
475                 host=hostname,
476                 user=username,
477                 private_key_path=private_key_path,
478                 log=self.log,
479             )
480
481             # Provision machine
482 0             params = await provisioner.provision_machine()
483
484 0             params.jobs = ["JobHostUnits"]
485
486 0             self.log.debug("Adding machine to model")
487 0             connection = model.connection()
488 0             client_facade = client.ClientFacade.from_connection(connection)
489
490 0             results = await client_facade.AddMachines(params=[params])
491 0             error = results.machines[0].error
492
493 0             if error:
494 0                 msg = "Error adding machine: {}".format(error.message)
495 0                 self.log.error(msg=msg)
496 0                 raise ValueError(msg)
497
498 0             machine_id = results.machines[0].machine
499
500 0             self.log.debug("Installing Juju agent into machine {}".format(machine_id))
501 0             asyncio.ensure_future(
502                 provisioner.install_agent(
503                     connection=connection,
504                     nonce=params.nonce,
505                     machine_id=machine_id,
506                     proxy=self.vca_connection.data.api_proxy,
507                     series=params.series,
508                 )
509             )
510
511 0             machine = None
512 0             for _ in range(10):
513 0                 machine_list = await model.get_machines()
514 0                 if machine_id in machine_list:
515 0                     self.log.debug("Machine {} found in model!".format(machine_id))
516 0                     machine = model.machines.get(machine_id)
517 0                     break
518 0                 await asyncio.sleep(2)
519
520 0             if machine is None:
521 0                 msg = "Machine {} not found in model".format(machine_id)
522 0                 self.log.error(msg=msg)
523 0                 raise JujuMachineNotFound(msg)
524
525 0             self.log.debug(
526                 "Wait until machine {} is ready in model {}".format(
527                     machine.entity_id, model_name
528                 )
529             )
530 0             await JujuModelWatcher.wait_for(
531                 model=model,
532                 entity=machine,
533                 progress_timeout=progress_timeout,
534                 total_timeout=total_timeout,
535                 db_dict=db_dict,
536                 n2vc=self.n2vc,
537                 vca_id=self.vca_connection._vca_id,
538             )
539 0         except Exception as e:
540 0             raise e
541         finally:
542 0             await self.disconnect_model(model)
543 0             await self.disconnect_controller(controller)
544
545 0         self.log.debug(
546             "Machine provisioned {} in model {}".format(machine_id, model_name)
547         )
548
549 0         return machine_id
550
551 1     async def deploy(
552         self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
553     ):
554         """
555         Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
556
557         :param: uri:            Path or Charm Store uri in which the charm or bundle can be found
558         :param: model_name:     Model name
559         :param: wait:           Indicates whether to wait or not until all applications are active
560         :param: timeout:        Time in seconds to wait until all applications are active
561         """
562 1         controller = await self.get_controller()
563 1         model = await self.get_model(controller, model_name)
564 1         try:
565 1             await model.deploy(uri, trust=True)
566 1             if wait:
567 1                 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
568 1                 self.log.debug("All units active in model {}".format(model_name))
569         finally:
570 1             await self.disconnect_model(model)
571 1             await self.disconnect_controller(controller)
572
573 1     async def add_unit(
574         self,
575         application_name: str,
576         model_name: str,
577         machine_id: str,
578         db_dict: dict = None,
579         progress_timeout: float = None,
580         total_timeout: float = None,
581     ):
582         """Add unit
583
584         :param: application_name:   Application name
585         :param: model_name:         Model name
586         :param: machine_id          Machine id
587         :param: db_dict:            Dictionary with data of the DB to write the updates
588         :param: progress_timeout:   Maximum time between two updates in the model
589         :param: total_timeout:      Timeout for the entity to be active
590
591         :return: None
592         """
593
594 1         model = None
595 1         controller = await self.get_controller()
596 1         try:
597 1             model = await self.get_model(controller, model_name)
598 1             application = self._get_application(model, application_name)
599
600 1             if application is not None:
601                 # Checks if the given machine id in the model,
602                 # otherwise function raises an error
603 1                 _machine, _series = self._get_machine_info(model, machine_id)
604
605 1                 self.log.debug(
606                     "Adding unit (machine {}) to application {} in model ~{}".format(
607                         machine_id, application_name, model_name
608                     )
609                 )
610
611 1                 await application.add_unit(to=machine_id)
612
613 1                 await JujuModelWatcher.wait_for(
614                     model=model,
615                     entity=application,
616                     progress_timeout=progress_timeout,
617                     total_timeout=total_timeout,
618                     db_dict=db_dict,
619                     n2vc=self.n2vc,
620                     vca_id=self.vca_connection._vca_id,
621                 )
622 1                 self.log.debug(
623                     "Unit is added to application {} in model {}".format(
624                         application_name, model_name
625                     )
626                 )
627             else:
628 1                 raise JujuApplicationNotFound(
629                     "Application {} not exists".format(application_name)
630                 )
631         finally:
632 1             if model:
633 1                 await self.disconnect_model(model)
634 1             await self.disconnect_controller(controller)
635
636 1     async def destroy_unit(
637         self,
638         application_name: str,
639         model_name: str,
640         machine_id: str,
641         total_timeout: float = None,
642     ):
643         """Destroy unit
644
645         :param: application_name:   Application name
646         :param: model_name:         Model name
647         :param: machine_id          Machine id
648         :param: total_timeout:      Timeout for the entity to be active
649
650         :return: None
651         """
652
653 1         model = None
654 1         controller = await self.get_controller()
655 1         try:
656 1             model = await self.get_model(controller, model_name)
657 1             application = self._get_application(model, application_name)
658
659 1             if application is None:
660 1                 raise JujuApplicationNotFound(
661                     "Application not found: {} (model={})".format(
662                         application_name, model_name
663                     )
664                 )
665
666 1             unit = self._get_unit(application, machine_id)
667 1             if not unit:
668 1                 raise JujuError(
669                     "A unit with machine id {} not in available units".format(
670                         machine_id
671                     )
672                 )
673
674 1             unit_name = unit.name
675
676 1             self.log.debug(
677                 "Destroying unit {} from application {} in model {}".format(
678                     unit_name, application_name, model_name
679                 )
680             )
681 1             await application.destroy_unit(unit_name)
682
683 1             self.log.debug(
684                 "Waiting for unit {} to be destroyed in application {} (model={})...".format(
685                     unit_name, application_name, model_name
686                 )
687             )
688
689             # TODO: Add functionality in the Juju watcher to replace this kind of blocks
690 1             if total_timeout is None:
691 0                 total_timeout = 3600
692 1             end = time.time() + total_timeout
693 1             while time.time() < end:
694 0                 if not self._get_unit(application, machine_id):
695 0                     self.log.debug(
696                         "The unit {} was destroyed in application {} (model={}) ".format(
697                             unit_name, application_name, model_name
698                         )
699                     )
700 0                     return
701 0                 await asyncio.sleep(5)
702 1             self.log.debug(
703                 "Unit {} is destroyed from application {} in model {}".format(
704                     unit_name, application_name, model_name
705                 )
706             )
707         finally:
708 1             if model:
709 1                 await self.disconnect_model(model)
710 1             await self.disconnect_controller(controller)
711
712 1     async def deploy_charm(
713         self,
714         application_name: str,
715         path: str,
716         model_name: str,
717         machine_id: str,
718         db_dict: dict = None,
719         progress_timeout: float = None,
720         total_timeout: float = None,
721         config: dict = None,
722         series: str = None,
723         num_units: int = 1,
724     ):
725         """Deploy charm
726
727         :param: application_name:   Application name
728         :param: path:               Local path to the charm
729         :param: model_name:         Model name
730         :param: machine_id          ID of the machine
731         :param: db_dict:            Dictionary with data of the DB to write the updates
732         :param: progress_timeout:   Maximum time between two updates in the model
733         :param: total_timeout:      Timeout for the entity to be active
734         :param: config:             Config for the charm
735         :param: series:             Series of the charm
736         :param: num_units:          Number of units
737
738         :return: (juju.application.Application): Juju application
739         """
740 1         self.log.debug(
741             "Deploying charm {} to machine {} in model ~{}".format(
742                 application_name, machine_id, model_name
743             )
744         )
745 1         self.log.debug("charm: {}".format(path))
746
747         # Get controller
748 1         controller = await self.get_controller()
749
750         # Get model
751 1         model = await self.get_model(controller, model_name)
752
753 1         try:
754 1             if application_name not in model.applications:
755 1                 if machine_id is not None:
756 1                     machine, series = self._get_machine_info(model, machine_id)
757
758 1                 application = await model.deploy(
759                     entity_url=path,
760                     application_name=application_name,
761                     channel="stable",
762                     num_units=1,
763                     series=series,
764                     to=machine_id,
765                     config=config,
766                 )
767
768 1                 self.log.debug(
769                     "Wait until application {} is ready in model {}".format(
770                         application_name, model_name
771                     )
772                 )
773 1                 if num_units > 1:
774 1                     for _ in range(num_units - 1):
775 1                         m, _ = await self.create_machine(model_name, wait=False)
776 1                         await application.add_unit(to=m.entity_id)
777
778 1                 await JujuModelWatcher.wait_for(
779                     model=model,
780                     entity=application,
781                     progress_timeout=progress_timeout,
782                     total_timeout=total_timeout,
783                     db_dict=db_dict,
784                     n2vc=self.n2vc,
785                     vca_id=self.vca_connection._vca_id,
786                 )
787 1                 self.log.debug(
788                     "Application {} is ready in model {}".format(
789                         application_name, model_name
790                     )
791                 )
792             else:
793 1                 raise JujuApplicationExists(
794                     "Application {} exists".format(application_name)
795                 )
796 1         except juju.errors.JujuError as e:
797 0             if "already exists" in e.message:
798 0                 raise JujuApplicationExists(
799                     "Application {} exists".format(application_name)
800                 )
801             else:
802 0                 raise e
803         finally:
804 1             await self.disconnect_model(model)
805 1             await self.disconnect_controller(controller)
806
807 1         return application
808
809 1     async def upgrade_charm(
810         self,
811         application_name: str,
812         path: str,
813         model_name: str,
814         total_timeout: float = None,
815         **kwargs,
816     ):
817         """Upgrade Charm
818
819         :param: application_name:   Application name
820         :param: model_name:         Model name
821         :param: path:               Local path to the charm
822         :param: total_timeout:      Timeout for the entity to be active
823
824         :return: (str, str): (output and status)
825         """
826
827 0         self.log.debug(
828             "Upgrading charm {} in model {} from path {}".format(
829                 application_name, model_name, path
830             )
831         )
832
833 0         await self.resolve_application(
834             model_name=model_name, application_name=application_name
835         )
836
837         # Get controller
838 0         controller = await self.get_controller()
839
840         # Get model
841 0         model = await self.get_model(controller, model_name)
842
843 0         try:
844             # Get application
845 0             application = self._get_application(
846                 model,
847                 application_name=application_name,
848             )
849 0             if application is None:
850 0                 raise JujuApplicationNotFound(
851                     "Cannot find application {} to upgrade".format(application_name)
852                 )
853
854 0             await application.refresh(path=path)
855
856 0             self.log.debug(
857                 "Wait until charm upgrade is completed for application {} (model={})".format(
858                     application_name, model_name
859                 )
860             )
861
862 0             await JujuModelWatcher.ensure_units_idle(
863                 model=model, application=application
864             )
865
866 0             if application.status == "error":
867 0                 error_message = "Unknown"
868 0                 for unit in application.units:
869 0                     if (
870                         unit.workload_status == "error"
871                         and unit.workload_status_message != ""
872                     ):
873 0                         error_message = unit.workload_status_message
874
875 0                 message = "Application {} failed update in {}: {}".format(
876                     application_name, model_name, error_message
877                 )
878 0                 self.log.error(message)
879 0                 raise JujuError(message=message)
880
881 0             self.log.debug(
882                 "Application {} is ready in model {}".format(
883                     application_name, model_name
884                 )
885             )
886
887         finally:
888 0             await self.disconnect_model(model)
889 0             await self.disconnect_controller(controller)
890
891 0         return application
892
893 1     async def resolve_application(self, model_name: str, application_name: str):
894 0         controller = await self.get_controller()
895 0         model = await self.get_model(controller, model_name)
896
897 0         try:
898 0             application = self._get_application(
899                 model,
900                 application_name=application_name,
901             )
902 0             if application is None:
903 0                 raise JujuApplicationNotFound(
904                     "Cannot find application {} to resolve".format(application_name)
905                 )
906
907 0             while application.status == "error":
908 0                 for unit in application.units:
909 0                     if unit.workload_status == "error":
910 0                         self.log.debug(
911                             "Model {}, Application {}, Unit {} in error state, resolving".format(
912                                 model_name, application_name, unit.entity_id
913                             )
914                         )
915 0                         try:
916 0                             await unit.resolved(retry=False)
917 0                         except Exception:
918 0                             pass
919
920 0                 await asyncio.sleep(1)
921
922         finally:
923 0             await self.disconnect_model(model)
924 0             await self.disconnect_controller(controller)
925
926 1     async def resolve(self, model_name: str):
927 0         controller = await self.get_controller()
928 0         model = await self.get_model(controller, model_name)
929 0         all_units_active = False
930 0         try:
931 0             while not all_units_active:
932 0                 all_units_active = True
933 0                 for application_name, application in model.applications.items():
934 0                     if application.status == "error":
935 0                         for unit in application.units:
936 0                             if unit.workload_status == "error":
937 0                                 self.log.debug(
938                                     "Model {}, Application {}, Unit {} in error state, resolving".format(
939                                         model_name, application_name, unit.entity_id
940                                     )
941                                 )
942 0                                 try:
943 0                                     await unit.resolved(retry=False)
944 0                                     all_units_active = False
945 0                                 except Exception:
946 0                                     pass
947
948 0                 if not all_units_active:
949 0                     await asyncio.sleep(5)
950         finally:
951 0             await self.disconnect_model(model)
952 0             await self.disconnect_controller(controller)
953
954 1     async def scale_application(
955         self,
956         model_name: str,
957         application_name: str,
958         scale: int = 1,
959         total_timeout: float = None,
960     ):
961         """
962         Scale application (K8s)
963
964         :param: model_name:         Model name
965         :param: application_name:   Application name
966         :param: scale:              Scale to which to set this application
967         :param: total_timeout:      Timeout for the entity to be active
968         """
969
970 1         model = None
971 1         controller = await self.get_controller()
972 1         try:
973 1             model = await self.get_model(controller, model_name)
974
975 1             self.log.debug(
976                 "Scaling application {} in model {}".format(
977                     application_name, model_name
978                 )
979             )
980 1             application = self._get_application(model, application_name)
981 1             if application is None:
982 1                 raise JujuApplicationNotFound("Cannot scale application")
983 1             await application.scale(scale=scale)
984             # Wait until application is scaled in model
985 1             self.log.debug(
986                 "Waiting for application {} to be scaled in model {}...".format(
987                     application_name, model_name
988                 )
989             )
990 1             if total_timeout is None:
991 1                 total_timeout = 1800
992 1             end = time.time() + total_timeout
993 1             while time.time() < end:
994 1                 application_scale = self._get_application_count(model, application_name)
995                 # Before calling wait_for_model function,
996                 # wait until application unit count and scale count are equal.
997                 # Because there is a delay before scaling triggers in Juju model.
998 1                 if application_scale == scale:
999 1                     await JujuModelWatcher.wait_for_model(
1000                         model=model, timeout=total_timeout
1001                     )
1002 1                     self.log.debug(
1003                         "Application {} is scaled in model {}".format(
1004                             application_name, model_name
1005                         )
1006                     )
1007 1                     return
1008 0                 await asyncio.sleep(5)
1009 1             raise Exception(
1010                 "Timeout waiting for application {} in model {} to be scaled".format(
1011                     application_name, model_name
1012                 )
1013             )
1014         finally:
1015 1             if model:
1016 1                 await self.disconnect_model(model)
1017 1             await self.disconnect_controller(controller)
1018
1019 1     def _get_application_count(self, model: Model, application_name: str) -> int:
1020         """Get number of units of the application
1021
1022         :param: model:              Model object
1023         :param: application_name:   Application name
1024
1025         :return: int (or None if application doesn't exist)
1026         """
1027 1         application = self._get_application(model, application_name)
1028 1         if application is not None:
1029 1             return len(application.units)
1030
1031 1     def _get_application(self, model: Model, application_name: str) -> Application:
1032         """Get application
1033
1034         :param: model:              Model object
1035         :param: application_name:   Application name
1036
1037         :return: juju.application.Application (or None if it doesn't exist)
1038         """
1039 1         if model.applications and application_name in model.applications:
1040 1             return model.applications[application_name]
1041
1042 1     def _get_unit(self, application: Application, machine_id: str) -> Unit:
1043         """Get unit
1044
1045         :param: application:        Application object
1046         :param: machine_id:         Machine id
1047
1048         :return: Unit
1049         """
1050 1         unit = None
1051 1         for u in application.units:
1052 1             if u.machine_id == machine_id:
1053 1                 unit = u
1054 1                 break
1055 1         return unit
1056
1057 1     def _get_machine_info(
1058         self,
1059         model,
1060         machine_id: str,
1061     ) -> (str, str):
1062         """Get machine info
1063
1064         :param: model:          Model object
1065         :param: machine_id:     Machine id
1066
1067         :return: (str, str): (machine, series)
1068         """
1069 1         if machine_id not in model.machines:
1070 1             msg = "Machine {} not found in model".format(machine_id)
1071 1             self.log.error(msg=msg)
1072 1             raise JujuMachineNotFound(msg)
1073 1         machine = model.machines[machine_id]
1074 1         return machine, machine.series
1075
1076 1     async def execute_action(
1077         self,
1078         application_name: str,
1079         model_name: str,
1080         action_name: str,
1081         db_dict: dict = None,
1082         machine_id: str = None,
1083         progress_timeout: float = None,
1084         total_timeout: float = None,
1085         **kwargs,
1086     ):
1087         """Execute action
1088
1089         :param: application_name:   Application name
1090         :param: model_name:         Model name
1091         :param: action_name:        Name of the action
1092         :param: db_dict:            Dictionary with data of the DB to write the updates
1093         :param: machine_id          Machine id
1094         :param: progress_timeout:   Maximum time between two updates in the model
1095         :param: total_timeout:      Timeout for the entity to be active
1096
1097         :return: (str, str): (output and status)
1098         """
1099 1         self.log.debug(
1100             "Executing action {} using params {}".format(action_name, kwargs)
1101         )
1102         # Get controller
1103 1         controller = await self.get_controller()
1104
1105         # Get model
1106 1         model = await self.get_model(controller, model_name)
1107
1108 1         try:
1109             # Get application
1110 1             application = self._get_application(
1111                 model,
1112                 application_name=application_name,
1113             )
1114 1             if application is None:
1115 1                 raise JujuApplicationNotFound("Cannot execute action")
1116             # Racing condition:
1117             #   Ocassionally, self._get_leader_unit() will return None
1118             #   because the leader elected hook has not been triggered yet.
1119             #   Therefore, we are doing some retries. If it happens again,
1120             #   re-open bug 1236
1121 1             if machine_id is None:
1122 1                 unit = await self._get_leader_unit(application)
1123 1                 self.log.debug(
1124                     "Action {} is being executed on the leader unit {}".format(
1125                         action_name, unit.name
1126                     )
1127                 )
1128             else:
1129 0                 unit = self._get_unit(application, machine_id)
1130 0                 if not unit:
1131 0                     raise JujuError(
1132                         "A unit with machine id {} not in available units".format(
1133                             machine_id
1134                         )
1135                     )
1136 0                 self.log.debug(
1137                     "Action {} is being executed on {} unit".format(
1138                         action_name, unit.name
1139                     )
1140                 )
1141
1142 1             actions = await application.get_actions()
1143
1144 1             if action_name not in actions:
1145 1                 raise JujuActionNotFound(
1146                     "Action {} not in available actions".format(action_name)
1147                 )
1148
1149 1             action = await unit.run_action(action_name, **kwargs)
1150
1151 1             self.log.debug(
1152                 "Wait until action {} is completed in application {} (model={})".format(
1153                     action_name, application_name, model_name
1154                 )
1155             )
1156 1             await JujuModelWatcher.wait_for(
1157                 model=model,
1158                 entity=action,
1159                 progress_timeout=progress_timeout,
1160                 total_timeout=total_timeout,
1161                 db_dict=db_dict,
1162                 n2vc=self.n2vc,
1163                 vca_id=self.vca_connection._vca_id,
1164             )
1165
1166 1             output = await model.get_action_output(action_uuid=action.entity_id)
1167 1             status = await model.get_action_status(uuid_or_prefix=action.entity_id)
1168 1             status = (
1169                 status[action.entity_id] if action.entity_id in status else "failed"
1170             )
1171
1172 1             self.log.debug(
1173                 "Action {} completed with status {} in application {} (model={})".format(
1174                     action_name, action.status, application_name, model_name
1175                 )
1176             )
1177         finally:
1178 1             await self.disconnect_model(model)
1179 1             await self.disconnect_controller(controller)
1180
1181 1         return output, status
1182
1183 1     async def get_actions(self, application_name: str, model_name: str) -> dict:
1184         """Get list of actions
1185
1186         :param: application_name: Application name
1187         :param: model_name: Model name
1188
1189         :return: Dict with this format
1190             {
1191                 "action_name": "Description of the action",
1192                 ...
1193             }
1194         """
1195 1         self.log.debug(
1196             "Getting list of actions for application {}".format(application_name)
1197         )
1198
1199         # Get controller
1200 1         controller = await self.get_controller()
1201
1202         # Get model
1203 1         model = await self.get_model(controller, model_name)
1204
1205 1         try:
1206             # Get application
1207 1             application = self._get_application(
1208                 model,
1209                 application_name=application_name,
1210             )
1211
1212             # Return list of actions
1213 1             return await application.get_actions()
1214
1215         finally:
1216             # Disconnect from model and controller
1217 1             await self.disconnect_model(model)
1218 1             await self.disconnect_controller(controller)
1219
1220 1     async def get_metrics(self, model_name: str, application_name: str) -> dict:
1221         """Get the metrics collected by the VCA.
1222
1223         :param model_name The name or unique id of the network service
1224         :param application_name The name of the application
1225         """
1226 1         if not model_name or not application_name:
1227 1             raise Exception("model_name and application_name must be non-empty strings")
1228 1         metrics = {}
1229 1         controller = await self.get_controller()
1230 1         model = await self.get_model(controller, model_name)
1231 1         try:
1232 1             application = self._get_application(model, application_name)
1233 1             if application is not None:
1234 1                 metrics = await application.get_metrics()
1235         finally:
1236 1             self.disconnect_model(model)
1237 1             self.disconnect_controller(controller)
1238 1         return metrics
1239
1240 1     async def add_relation(
1241         self,
1242         model_name: str,
1243         endpoint_1: str,
1244         endpoint_2: str,
1245     ):
1246         """Add relation
1247
1248         :param: model_name:     Model name
1249         :param: endpoint_1      First endpoint name
1250                                 ("app:endpoint" format or directly the saas name)
1251         :param: endpoint_2:     Second endpoint name (^ same format)
1252         """
1253
1254 1         self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
1255
1256         # Get controller
1257 1         controller = await self.get_controller()
1258
1259         # Get model
1260 1         model = await self.get_model(controller, model_name)
1261
1262         # Add relation
1263 1         try:
1264 1             await model.add_relation(endpoint_1, endpoint_2)
1265 1         except juju.errors.JujuAPIError as e:
1266 1             if "not found" in e.message:
1267 1                 self.log.warning("Relation not found: {}".format(e.message))
1268 1                 return
1269 1             if "already exists" in e.message:
1270 1                 self.log.warning("Relation already exists: {}".format(e.message))
1271 1                 return
1272             # another exception, raise it
1273 1             raise e
1274         finally:
1275 1             await self.disconnect_model(model)
1276 1             await self.disconnect_controller(controller)
1277
1278 1     async def offer(self, endpoint: RelationEndpoint) -> Offer:
1279         """
1280         Create an offer from a RelationEndpoint
1281
1282         :param: endpoint: Relation endpoint
1283
1284         :return: Offer object
1285         """
1286 1         model_name = endpoint.model_name
1287 1         offer_name = f"{endpoint.application_name}-{endpoint.endpoint_name}"
1288 1         controller = await self.get_controller()
1289 1         model = None
1290 1         try:
1291 1             model = await self.get_model(controller, model_name)
1292 1             await model.create_offer(endpoint.endpoint, offer_name=offer_name)
1293 1             offer_list = await self._list_offers(model_name, offer_name=offer_name)
1294 1             if offer_list:
1295 1                 return Offer(offer_list[0].offer_url)
1296             else:
1297 1                 raise Exception("offer was not created")
1298 1         except juju.errors.JujuError as e:
1299 0             if "application offer already exists" not in e.message:
1300 0                 raise e
1301         finally:
1302 1             if model:
1303 1                 self.disconnect_model(model)
1304 1             self.disconnect_controller(controller)
1305
1306 1     async def consume(
1307         self,
1308         model_name: str,
1309         offer: Offer,
1310         provider_libjuju: "Libjuju",
1311     ) -> str:
1312         """
1313         Consumes a remote offer in the model. Relations can be created later using "juju relate".
1314
1315         :param: model_name:             Model name
1316         :param: offer:                  Offer object to consume
1317         :param: provider_libjuju:       Libjuju object of the provider endpoint
1318
1319         :raises ParseError if there's a problem parsing the offer_url
1320         :raises JujuError if remote offer includes and endpoint
1321         :raises JujuAPIError if the operation is not successful
1322
1323         :returns: Saas name. It is the application name in the model that reference the remote application.
1324         """
1325 1         saas_name = f'{offer.name}-{offer.model_name.replace("-", "")}'
1326 1         if offer.vca_id:
1327 1             saas_name = f"{saas_name}-{offer.vca_id}"
1328 1         controller = await self.get_controller()
1329 1         model = None
1330 1         provider_controller = None
1331 1         try:
1332 1             model = await controller.get_model(model_name)
1333 1             provider_controller = await provider_libjuju.get_controller()
1334 1             await model.consume(
1335                 offer.url, application_alias=saas_name, controller=provider_controller
1336             )
1337 1             return saas_name
1338         finally:
1339 1             if model:
1340 1                 await self.disconnect_model(model)
1341 1             if provider_controller:
1342 1                 await provider_libjuju.disconnect_controller(provider_controller)
1343 1             await self.disconnect_controller(controller)
1344
1345 1     async def destroy_model(self, model_name: str, total_timeout: float = 1800):
1346         """
1347         Destroy model
1348
1349         :param: model_name:     Model name
1350         :param: total_timeout:  Timeout
1351         """
1352
1353 0         controller = await self.get_controller()
1354 0         model = None
1355 0         try:
1356 0             if not await self.model_exists(model_name, controller=controller):
1357 0                 self.log.warn(f"Model {model_name} doesn't exist")
1358 0                 return
1359
1360 0             self.log.debug(f"Getting model {model_name} to be destroyed")
1361 0             model = await self.get_model(controller, model_name)
1362 0             self.log.debug(f"Destroying manual machines in model {model_name}")
1363             # Destroy machines that are manually provisioned
1364             # and still are in pending state
1365 0             await self._destroy_pending_machines(model, only_manual=True)
1366 0             await self.disconnect_model(model)
1367
1368 0             await asyncio.wait_for(
1369                 self._destroy_model(model_name, controller),
1370                 timeout=total_timeout,
1371             )
1372 0         except Exception as e:
1373 0             if not await self.model_exists(model_name, controller=controller):
1374 0                 self.log.warn(
1375                     f"Failed deleting model {model_name}: model doesn't exist"
1376                 )
1377 0                 return
1378 0             self.log.warn(f"Failed deleting model {model_name}: {e}")
1379 0             raise e
1380         finally:
1381 0             if model:
1382 0                 await self.disconnect_model(model)
1383 0             await self.disconnect_controller(controller)
1384
1385 1     async def _destroy_model(
1386         self,
1387         model_name: str,
1388         controller: Controller,
1389     ):
1390         """
1391         Destroy model from controller
1392
1393         :param: model: Model name to be removed
1394         :param: controller: Controller object
1395         :param: timeout: Timeout in seconds
1396         """
1397 0         self.log.debug(f"Destroying model {model_name}")
1398
1399 0         async def _destroy_model_gracefully(model_name: str, controller: Controller):
1400 0             self.log.info(f"Gracefully deleting model {model_name}")
1401 0             resolved = False
1402 0             while model_name in await controller.list_models():
1403 0                 if not resolved:
1404 0                     await self.resolve(model_name)
1405 0                     resolved = True
1406 0                 await controller.destroy_model(model_name, destroy_storage=True)
1407
1408 0                 await asyncio.sleep(5)
1409 0             self.log.info(f"Model {model_name} deleted gracefully")
1410
1411 0         async def _destroy_model_forcefully(model_name: str, controller: Controller):
1412 0             self.log.info(f"Forcefully deleting model {model_name}")
1413 0             while model_name in await controller.list_models():
1414 0                 await controller.destroy_model(
1415                     model_name, destroy_storage=True, force=True, max_wait=60
1416                 )
1417 0                 await asyncio.sleep(5)
1418 0             self.log.info(f"Model {model_name} deleted forcefully")
1419
1420 0         try:
1421 0             try:
1422 0                 await asyncio.wait_for(
1423                     _destroy_model_gracefully(model_name, controller), timeout=120
1424                 )
1425 0             except asyncio.TimeoutError:
1426 0                 await _destroy_model_forcefully(model_name, controller)
1427 0         except juju.errors.JujuError as e:
1428 0             if any("has been removed" in error for error in e.errors):
1429 0                 return
1430 0             if any("model not found" in error for error in e.errors):
1431 0                 return
1432 0             raise e
1433
1434 1     async def destroy_application(
1435         self, model_name: str, application_name: str, total_timeout: float
1436     ):
1437         """
1438         Destroy application
1439
1440         :param: model_name:         Model name
1441         :param: application_name:   Application name
1442         :param: total_timeout:      Timeout
1443         """
1444
1445 1         controller = await self.get_controller()
1446 1         model = None
1447
1448 1         try:
1449 1             model = await self.get_model(controller, model_name)
1450 1             self.log.debug(
1451                 "Destroying application {} in model {}".format(
1452                     application_name, model_name
1453                 )
1454             )
1455 1             application = self._get_application(model, application_name)
1456 1             if application:
1457 0                 await application.destroy()
1458             else:
1459 1                 self.log.warning("Application not found: {}".format(application_name))
1460
1461 1             self.log.debug(
1462                 "Waiting for application {} to be destroyed in model {}...".format(
1463                     application_name, model_name
1464                 )
1465             )
1466 1             if total_timeout is None:
1467 0                 total_timeout = 3600
1468 1             end = time.time() + total_timeout
1469 1             while time.time() < end:
1470 1                 if not self._get_application(model, application_name):
1471 1                     self.log.debug(
1472                         "The application {} was destroyed in model {} ".format(
1473                             application_name, model_name
1474                         )
1475                     )
1476 1                     return
1477 0                 await asyncio.sleep(5)
1478 1             raise Exception(
1479                 "Timeout waiting for application {} to be destroyed in model {}".format(
1480                     application_name, model_name
1481                 )
1482             )
1483         finally:
1484 1             if model is not None:
1485 1                 await self.disconnect_model(model)
1486 1             await self.disconnect_controller(controller)
1487
1488 1     async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
1489         """
1490         Destroy pending machines in a given model
1491
1492         :param: only_manual:    Bool that indicates only manually provisioned
1493                                 machines should be destroyed (if True), or that
1494                                 all pending machines should be destroyed
1495         """
1496 0         status = await model.get_status()
1497 0         for machine_id in status.machines:
1498 0             machine_status = status.machines[machine_id]
1499 0             if machine_status.agent_status.status == "pending":
1500 0                 if only_manual and not machine_status.instance_id.startswith("manual:"):
1501 0                     break
1502 0                 machine = model.machines[machine_id]
1503 0                 await machine.destroy(force=True)
1504
1505 1     async def configure_application(
1506         self, model_name: str, application_name: str, config: dict = None
1507     ):
1508         """Configure application
1509
1510         :param: model_name:         Model name
1511         :param: application_name:   Application name
1512         :param: config:             Config to apply to the charm
1513         """
1514 1         self.log.debug("Configuring application {}".format(application_name))
1515
1516 1         if config:
1517 1             controller = await self.get_controller()
1518 1             model = None
1519 1             try:
1520 1                 model = await self.get_model(controller, model_name)
1521 1                 application = self._get_application(
1522                     model,
1523                     application_name=application_name,
1524                 )
1525 1                 await application.set_config(config)
1526             finally:
1527 1                 if model:
1528 1                     await self.disconnect_model(model)
1529 1                 await self.disconnect_controller(controller)
1530
1531 1     def handle_exception(self, loop, context):
1532         # All unhandled exceptions by libjuju are handled here.
1533 1         pass
1534
1535 1     async def health_check(self, interval: float = 300.0):
1536         """
1537         Health check to make sure controller and controller_model connections are OK
1538
1539         :param: interval: Time in seconds between checks
1540         """
1541 1         controller = None
1542         while True:
1543 1             try:
1544 1                 controller = await self.get_controller()
1545                 # self.log.debug("VCA is alive")
1546 1             except Exception as e:
1547 0                 self.log.error("Health check to VCA failed: {}".format(e))
1548             finally:
1549 1                 await self.disconnect_controller(controller)
1550 0             await asyncio.sleep(interval)
1551
1552 1     async def list_models(self, contains: str = None) -> [str]:
1553         """List models with certain names
1554
1555         :param: contains:   String that is contained in model name
1556
1557         :retur: [models] Returns list of model names
1558         """
1559
1560 1         controller = await self.get_controller()
1561 1         try:
1562 1             models = await controller.list_models()
1563 1             if contains:
1564 1                 models = [model for model in models if contains in model]
1565 1             return models
1566         finally:
1567 1             await self.disconnect_controller(controller)
1568
1569 1     async def _list_offers(
1570         self, model_name: str, offer_name: str = None
1571     ) -> QueryApplicationOffersResults:
1572         """
1573         List offers within a model
1574
1575         :param: model_name: Model name
1576         :param: offer_name: Offer name to filter.
1577
1578         :return: Returns application offers results in the model
1579         """
1580
1581 1         controller = await self.get_controller()
1582 1         try:
1583 1             offers = (await controller.list_offers(model_name)).results
1584 1             if offer_name:
1585 1                 matching_offer = []
1586 1                 for offer in offers:
1587 1                     if offer.offer_name == offer_name:
1588 1                         matching_offer.append(offer)
1589 1                         break
1590 1                 offers = matching_offer
1591 1             return offers
1592         finally:
1593 1             await self.disconnect_controller(controller)
1594
1595 1     async def add_k8s(
1596         self,
1597         name: str,
1598         rbac_id: str,
1599         token: str,
1600         client_cert_data: str,
1601         configuration: Configuration,
1602         storage_class: str,
1603         credential_name: str = None,
1604     ):
1605         """
1606         Add a Kubernetes cloud to the controller
1607
1608         Similar to the `juju add-k8s` command in the CLI
1609
1610         :param: name:               Name for the K8s cloud
1611         :param: configuration:      Kubernetes configuration object
1612         :param: storage_class:      Storage Class to use in the cloud
1613         :param: credential_name:    Storage Class to use in the cloud
1614         """
1615
1616 1         if not storage_class:
1617 1             raise Exception("storage_class must be a non-empty string")
1618 1         if not name:
1619 1             raise Exception("name must be a non-empty string")
1620 1         if not configuration:
1621 1             raise Exception("configuration must be provided")
1622
1623 1         endpoint = configuration.host
1624 1         credential = self.get_k8s_cloud_credential(
1625             configuration,
1626             client_cert_data,
1627             token,
1628         )
1629 1         credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
1630 1         cloud = client.Cloud(
1631             type_="kubernetes",
1632             auth_types=[credential.auth_type],
1633             endpoint=endpoint,
1634             ca_certificates=[client_cert_data],
1635             config={
1636                 "operator-storage": storage_class,
1637                 "workload-storage": storage_class,
1638             },
1639         )
1640
1641 1         return await self.add_cloud(
1642             name, cloud, credential, credential_name=credential_name
1643         )
1644
1645 1     def get_k8s_cloud_credential(
1646         self,
1647         configuration: Configuration,
1648         client_cert_data: str,
1649         token: str = None,
1650     ) -> client.CloudCredential:
1651 1         attrs = {}
1652         # TODO: Test with AKS
1653 1         key = None  # open(configuration.key_file, "r").read()
1654 1         username = configuration.username
1655 1         password = configuration.password
1656
1657 1         if client_cert_data:
1658 1             attrs["ClientCertificateData"] = client_cert_data
1659 1         if key:
1660 0             attrs["ClientKeyData"] = key
1661 1         if token:
1662 1             if username or password:
1663 1                 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1664 1             attrs["Token"] = token
1665
1666 1         auth_type = None
1667 1         if key:
1668 0             auth_type = "oauth2"
1669 0             if client_cert_data:
1670 0                 auth_type = "oauth2withcert"
1671 0             if not token:
1672 0                 raise JujuInvalidK8sConfiguration(
1673                     "missing token for auth type {}".format(auth_type)
1674                 )
1675 1         elif username:
1676 1             if not password:
1677 1                 self.log.debug(
1678                     "credential for user {} has empty password".format(username)
1679                 )
1680 1             attrs["username"] = username
1681 1             attrs["password"] = password
1682 1             if client_cert_data:
1683 1                 auth_type = "userpasswithcert"
1684             else:
1685 1                 auth_type = "userpass"
1686 1         elif client_cert_data and token:
1687 1             auth_type = "certificate"
1688         else:
1689 1             raise JujuInvalidK8sConfiguration("authentication method not supported")
1690 1         return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1691
1692 1     async def add_cloud(
1693         self,
1694         name: str,
1695         cloud: Cloud,
1696         credential: CloudCredential = None,
1697         credential_name: str = None,
1698     ) -> Cloud:
1699         """
1700         Add cloud to the controller
1701
1702         :param: name:               Name of the cloud to be added
1703         :param: cloud:              Cloud object
1704         :param: credential:         CloudCredentials object for the cloud
1705         :param: credential_name:    Credential name.
1706                                     If not defined, cloud of the name will be used.
1707         """
1708 1         controller = await self.get_controller()
1709 1         try:
1710 1             _ = await controller.add_cloud(name, cloud)
1711 1             if credential:
1712 1                 await controller.add_credential(
1713                     credential_name or name, credential=credential, cloud=name
1714                 )
1715             # Need to return the object returned by the controller.add_cloud() function
1716             # I'm returning the original value now until this bug is fixed:
1717             #   https://github.com/juju/python-libjuju/issues/443
1718 1             return cloud
1719         finally:
1720 1             await self.disconnect_controller(controller)
1721
1722 1     async def remove_cloud(self, name: str):
1723         """
1724         Remove cloud
1725
1726         :param: name:   Name of the cloud to be removed
1727         """
1728 1         controller = await self.get_controller()
1729 1         try:
1730 1             await controller.remove_cloud(name)
1731 1         except juju.errors.JujuError as e:
1732 0             if len(e.errors) == 1 and f'cloud "{name}" not found' == e.errors[0]:
1733 0                 self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
1734             else:
1735 0                 raise e
1736         finally:
1737 1             await self.disconnect_controller(controller)
1738
1739 1     @retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
1740 1     async def _get_leader_unit(self, application: Application) -> Unit:
1741 1         unit = None
1742 1         for u in application.units:
1743 1             if await u.is_leader_from_status():
1744 1                 unit = u
1745 1                 break
1746 1         if not unit:
1747 1             raise Exception()
1748 1         return unit
1749
1750 1     async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
1751         """
1752         Get cloud credentials
1753
1754         :param: cloud: Cloud object. The returned credentials will be from this cloud.
1755
1756         :return: List of credentials object associated to the specified cloud
1757
1758         """
1759 0         controller = await self.get_controller()
1760 0         try:
1761 0             facade = client.CloudFacade.from_connection(controller.connection())
1762 0             cloud_cred_tag = tag.credential(
1763                 cloud.name, self.vca_connection.data.user, cloud.credential_name
1764             )
1765 0             params = [client.Entity(cloud_cred_tag)]
1766 0             return (await facade.Credential(params)).results
1767         finally:
1768 0             await self.disconnect_controller(controller)
1769
1770 1     async def check_application_exists(self, model_name, application_name) -> bool:
1771         """Check application exists
1772
1773         :param: model_name:         Model Name
1774         :param: application_name:   Application Name
1775
1776         :return: Boolean
1777         """
1778
1779 1         model = None
1780 1         controller = await self.get_controller()
1781 1         try:
1782 1             model = await self.get_model(controller, model_name)
1783 1             self.log.debug(
1784                 "Checking if application {} exists in model {}".format(
1785                     application_name, model_name
1786                 )
1787             )
1788 1             return self._get_application(model, application_name) is not None
1789         finally:
1790 1             if model:
1791 1                 await self.disconnect_model(model)
1792 1             await self.disconnect_controller(controller)