Pin black version in tox.ini to 23.12.1
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 import os
18 import typing
19 import yaml
20
21 import time
22
23 import juju.errors
24 from juju.bundle import BundleHandler
25 from juju.model import Model
26 from juju.machine import Machine
27 from juju.application import Application
28 from juju.unit import Unit
29 from juju.url import URL
30 from juju.version import DEFAULT_ARCHITECTURE
31 from juju.client._definitions import (
32 FullStatus,
33 QueryApplicationOffersResults,
34 Cloud,
35 CloudCredential,
36 )
37 from juju.controller import Controller
38 from juju.client import client
39 from juju import tag
40
41 from n2vc.definitions import Offer, RelationEndpoint
42 from n2vc.juju_watcher import JujuModelWatcher
43 from n2vc.provisioner import AsyncSSHProvisioner
44 from n2vc.n2vc_conn import N2VCConnector
45 from n2vc.exceptions import (
46 JujuMachineNotFound,
47 JujuApplicationNotFound,
48 JujuLeaderUnitNotFound,
49 JujuActionNotFound,
50 JujuControllerFailedConnecting,
51 JujuApplicationExists,
52 JujuInvalidK8sConfiguration,
53 JujuError,
54 )
55 from n2vc.vca.cloud import Cloud as VcaCloud
56 from n2vc.vca.connection import Connection
57 from kubernetes.client.configuration import Configuration
58 from retrying_async import retry
59
60
61 RBAC_LABEL_KEY_NAME = "rbac-id"
62
63
async def retry_callback(attempt, exc, args, kwargs, delay=0.5, *, loop):
    """
    Back-off callback handed to the ``retry`` decorator.

    Specifically overridden from the upstream implementation. It is written
    as a native coroutine because the generator-based ``asyncio.coroutine``
    decorator no longer exists since Python 3.11.

    :param: attempt: Attempt counter supplied by the caller
    :param: exc: Exception of the failed attempt (unused)
    :param: args: Positional arguments of the retried call (unused)
    :param: kwargs: Keyword arguments of the retried call (unused)
    :param: delay: Base delay in seconds, multiplied by ``attempt``
    :param: loop: Event loop supplied by the caller (unused)

    :return: The ``retry`` object (presumably the signal to try again;
             confirm against retrying_async)
    """
    # Linear back-off: the wait grows with the attempt counter
    await asyncio.sleep(attempt * delay)
    return retry
70
71
72 class Libjuju:
73 def __init__(
74 self,
75 vca_connection: Connection,
76 log: logging.Logger = None,
77 n2vc: N2VCConnector = None,
78 ):
79 """
80 Constructor
81
82 :param: vca_connection: n2vc.vca.connection object
83 :param: log: Logger
84 :param: n2vc: N2VC object
85 """
86
87 self.log = log or logging.getLogger("Libjuju")
88 self.n2vc = n2vc
89 self.vca_connection = vca_connection
90
91 self.creating_model = asyncio.Lock()
92
93 if self.vca_connection.is_default:
94 self.health_check_task = self._create_health_check_task()
95
96 def _create_health_check_task(self):
97 return asyncio.get_event_loop().create_task(self.health_check())
98
async def get_controller(self, timeout: float = 60.0) -> Controller:
    """
    Connect to the Juju controller of this VCA connection

    :param: timeout: Time in seconds to wait for controller to connect

    :return: Controller: The connected controller
    :raises: JujuControllerFailedConnecting: If connecting fails for any
             reason other than the task being cancelled
    """
    vca = self.vca_connection
    controller = None
    try:
        controller = Controller()
        connecting = controller.connect(
            endpoint=vca.data.endpoints,
            username=vca.data.user,
            password=vca.data.secret,
            cacert=vca.data.cacert,
        )
        await asyncio.wait_for(connecting, timeout=timeout)
        if vca.is_default:
            # Store the endpoints reported by the controller when at least
            # one of them is not part of the VCA connection yet
            endpoints = await controller.api_endpoints
            if any(endpoint not in vca.endpoints for endpoint in endpoints):
                await vca.update_endpoints(endpoints)
        return controller
    except asyncio.CancelledError as e:
        # Cancellation is propagated untouched
        raise e
    except Exception as e:
        failure = "Failed connecting to controller: {}... {}".format(
            vca.data.endpoints, e
        )
        self.log.error(failure)
        # Drop the half-open connection before reporting the failure
        if controller:
            await self.disconnect_controller(controller)

        raise JujuControllerFailedConnecting(
            f"Error connecting to Juju controller: {e}"
        )
138
async def disconnect(self):
    """
    Disconnect

    Cancels the health check task when there is one. Safe to call on
    instances that never started it.
    """
    # __init__ only creates the task for the default VCA connection, so the
    # attribute may not exist at all
    health_check_task = getattr(self, "health_check_task", None)
    if health_check_task is not None:
        health_check_task.cancel()
    self.log.debug("Libjuju disconnected!")
144
async def disconnect_model(self, model: Model):
    """
    Disconnect model

    :param: model: Model that will be disconnected. None is accepted and
                   ignored, mirroring disconnect_controller(), so that
                   cleanup code can call this unconditionally.
    """
    if model is not None:
        await model.disconnect()
152
async def disconnect_controller(self, controller: Controller):
    """
    Close the connection of a controller

    :param: controller: Controller that will be disconnected; a falsy value
                        (e.g. None) is ignored
    """
    if not controller:
        return
    await controller.disconnect()
161
@retry(attempts=3, delay=5, timeout=None, callback=retry_callback)
async def add_model(self, model_name: str, cloud: VcaCloud):
    """
    Create model

    Nothing is created when a model with that name already exists. An
    "already exists" error reported by the Juju API is ignored as well.

    :param: model_name: Model name
    :param: cloud: Cloud object

    :raises: juju.errors.JujuAPIError: For any other API error
    """

    # Get controller
    controller = await self.get_controller()
    model = None
    try:
        # The lock serialises model creation between workers. Waiting on the
        # lock itself is enough: "async with" suspends until it is free, so
        # no polling of locked() is needed beforehand.
        async with self.creating_model:
            if await self.model_exists(model_name, controller=controller):
                return
            self.log.debug("Creating model {}".format(model_name))
            model = await controller.add_model(
                model_name,
                config=self.vca_connection.data.model_config,
                cloud_name=cloud.name,
                credential_name=cloud.credential_name,
            )
    except juju.errors.JujuAPIError as e:
        # The model having been created in the meantime is not an error
        if "already exists" not in e.message:
            raise
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
199
async def get_executed_actions(self, model_name: str) -> list:
    """
    Get the history of executed actions of a model.

    :param: model_name: Model name, str.
    :return: List of dicts, one per executed action, holding "id", "action",
             "status" plus every key of the action output.
    :raises: JujuError: If anything fails while collecting the actions.
    """
    controller = await self.get_controller()
    model = None
    history = []
    try:
        model = await self.get_model(controller, model_name)

        # Gather the action names of every application; using them as dict
        # keys keeps each name only once
        known_actions = {}
        for app_name in model.applications:
            known_actions.update(await self.get_actions(app_name, model_name))

        # One entry per execution, enriched with the output of that execution
        for action_name in known_actions:
            statuses = await model.get_action_status(name=action_name)
            for action_id, status in statuses.items():
                entry = {
                    "id": action_id,
                    "action": action_name,
                    "status": status,
                }
                output = await model.get_action_output(entry["id"])
                entry.update(output.items())
                history.append(entry)
    except Exception as e:
        raise JujuError(
            "Error in getting executed actions for model: {}. Error: {}".format(
                model_name, str(e)
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return history
244
async def get_application_configs(
    self, model_name: str, application_name: str
) -> dict:
    """
    Get the configuration of an application.

    :param: model_name: Model name, str.
    :param: application_name: Application name, str.

    :return: Whatever the application's get_config() returns
    :raises: JujuError: If the configuration cannot be retrieved
    """
    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        target = self._get_application(model, application_name=application_name)
        return await target.get_config()
    except Exception as e:
        raise JujuError(
            "Error in getting configs for application: {} in model: {}. Error: {}".format(
                application_name, model_name, str(e)
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
276
@retry(attempts=3, delay=5, callback=retry_callback)
async def get_model(self, controller: Controller, model_name: str) -> Model:
    """
    Fetch a model from a controller

    :param: controller: Controller
    :param: model_name: Model name

    :return: Model: The Juju model object returned by the controller
    """
    model = await controller.get_model(model_name)
    return model
288
async def model_exists(
    self, model_name: str, controller: Controller = None
) -> bool:
    """
    Tell whether a model is listed by the controller

    :param: model_name: Model name
    :param: controller: Controller; when omitted, a controller is obtained
                        for this call and disconnected afterwards

    :return bool
    """
    # A controller obtained here is owned by this call and must be released
    owns_controller = not controller
    if owns_controller:
        controller = await self.get_controller()

    try:
        listed_models = await controller.list_models()
        return model_name in listed_models
    finally:
        if owns_controller:
            await self.disconnect_controller(controller)
313
async def models_exist(
    self, model_names: typing.List[str]
) -> typing.Tuple[bool, typing.List[str]]:
    """
    Check if models exists

    :param: model_names: List of strings with model names

    :return (bool, list[str]): (True if all models exists, List of model names
            that don't exist, without duplicates and in the order given)
    :raises: Exception: If model_names is empty
    """
    if not model_names:
        raise Exception(
            "model_names must be a non-empty array. Given value: {}".format(
                model_names
            )
        )
    existing_models = set(await self.list_models())
    # dict.fromkeys() removes duplicates while keeping the caller's order,
    # which makes the result deterministic
    non_existing_models = [
        name for name in dict.fromkeys(model_names) if name not in existing_models
    ]

    return (not non_existing_models, non_existing_models)
337
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    model = None
    try:
        # The model is obtained inside the try block so that the controller
        # is disconnected even when the model cannot be obtained
        model = await self.get_model(controller, model_name)
        return await model.get_status()
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
353
async def create_machine(
    self,
    model_name: str,
    machine_id: str = None,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    series: str = "bionic",
    wait: bool = True,
) -> typing.Tuple[Machine, bool]:
    """
    Create machine

    When machine_id is given, the machine with that id is looked up instead
    of creating a new one.

    :param: model_name: Model name
    :param: machine_id: Machine id
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: series: Series of the machine (xenial, bionic, focal, ...)
    :param: wait: Wait until machine is ready

    :return: (juju.machine.Machine, bool): Machine object and a boolean saying
             if the machine is new or it already existed
    :raises: JujuMachineNotFound: If machine_id is given but not in the model
    """
    new = False
    machine = None
    model = None

    self.log.debug(
        "Creating machine (id={}) in model: {}".format(machine_id, model_name)
    )

    # Get controller
    controller = await self.get_controller()
    try:
        # The model is obtained inside the try block so that the controller
        # is disconnected even when the model cannot be obtained
        model = await self.get_model(controller, model_name)

        if machine_id is not None:
            self.log.debug(
                "Searching machine (id={}) in model {}".format(
                    machine_id, model_name
                )
            )

            # Get machines from model and get the machine with machine_id if exists
            machines = await model.get_machines()
            if machine_id not in machines:
                raise JujuMachineNotFound("Machine {} not found".format(machine_id))
            self.log.debug(
                "Machine (id={}) found in model {}".format(machine_id, model_name)
            )
            machine = machines[machine_id]

        if machine is None:
            self.log.debug("Creating a new machine in model {}".format(model_name))

            # Create machine
            machine = await model.add_machine(
                spec=None, constraints=None, disks=None, series=series
            )
            new = True

            # Wait until the machine is ready
            self.log.debug(
                "Wait until machine {} is ready in model {}".format(
                    machine.entity_id, model_name
                )
            )
            if wait:
                await JujuModelWatcher.wait_for(
                    model=model,
                    entity=machine,
                    progress_timeout=progress_timeout,
                    total_timeout=total_timeout,
                    db_dict=db_dict,
                    n2vc=self.n2vc,
                    vca_id=self.vca_connection._vca_id,
                )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine {} ready at {} in model {}".format(
            machine.entity_id, machine.dns_name, model_name
        )
    )
    return machine, new
445
async def provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """
    Manually provisioning of a machine

    :param: model_name: Model name
    :param: hostname: IP to access the machine
    :param: username: Username to login to the machine
    :param: private_key_path: Local path for the private key
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: (Entity): Machine id
    :raises: ValueError: If the Juju API reports an error adding the machine
    :raises: JujuMachineNotFound: If the machine does not show up in the model
    """
    self.log.debug(
        "Provisioning machine. model: {}, hostname: {}, username: {}".format(
            model_name, hostname, username
        )
    )

    # Get controller
    controller = await self.get_controller()
    model = None

    try:
        # The model is obtained inside the try block so that the controller
        # is disconnected even when the model cannot be obtained
        model = await self.get_model(controller, model_name)

        # Get provisioner
        provisioner = AsyncSSHProvisioner(
            host=hostname,
            user=username,
            private_key_path=private_key_path,
            log=self.log,
        )

        # Provision machine
        params = await provisioner.provision_machine()

        params.jobs = ["JobHostUnits"]

        self.log.debug("Adding machine to model")
        connection = model.connection()
        client_facade = client.ClientFacade.from_connection(connection)

        results = await client_facade.AddMachines(params=[params])
        error = results.machines[0].error

        if error:
            msg = "Error adding machine: {}".format(error.message)
            self.log.error(msg=msg)
            raise ValueError(msg)

        machine_id = results.machines[0].machine

        self.log.debug("Installing Juju agent into machine {}".format(machine_id))
        # The agent installation runs in the background. A reference to the
        # task is kept for the rest of this call because the event loop only
        # holds weak references to tasks.
        install_task = asyncio.ensure_future(  # noqa: F841
            provisioner.install_agent(
                connection=connection,
                nonce=params.nonce,
                machine_id=machine_id,
                proxy=self.vca_connection.data.api_proxy,
                series=params.series,
            )
        )

        # Poll up to 10 times, 2 seconds apart, for the machine to show up
        machine = None
        for _ in range(10):
            machine_list = await model.get_machines()
            if machine_id in machine_list:
                self.log.debug("Machine {} found in model!".format(machine_id))
                machine = model.machines.get(machine_id)
                break
            await asyncio.sleep(2)

        if machine is None:
            msg = "Machine {} not found in model".format(machine_id)
            self.log.error(msg=msg)
            raise JujuMachineNotFound(msg)

        self.log.debug(
            "Wait until machine {} is ready in model {}".format(
                machine.entity_id, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=machine,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    self.log.debug(
        "Machine provisioned {} in model {}".format(machine_id, model_name)
    )

    return machine_id
559
async def deploy(
    self,
    uri: str,
    model_name: str,
    wait: bool = True,
    timeout: float = 3600,
    instantiation_params: dict = None,
):
    """
    Deploy bundle or charm: Similar to the juju CLI command `juju deploy`

    :param uri: Path or Charm Store uri in which the charm or bundle can be found
    :param model_name: Model name
    :param wait: Indicates whether to wait or not until all applications are active
    :param timeout: Time in seconds to wait until all applications are active
    :param instantiation_params: To be applied as overlay bundle over primary bundle.
    """
    controller = await self.get_controller()
    model = None
    overlays = []
    try:
        # The model is obtained inside the try block so that the controller
        # is disconnected even when the model cannot be obtained
        model = await self.get_model(controller, model_name)
        await self._validate_instantiation_params(uri, model, instantiation_params)
        overlays = self._get_overlays(model_name, instantiation_params)
        await model.deploy(uri, trust=True, overlays=overlays)
        if wait:
            await JujuModelWatcher.wait_for_model(model, timeout=timeout)
            self.log.debug("All units active in model {}".format(model_name))
    finally:
        # The overlay file is only needed while deploying
        self._remove_overlay_file(overlays)
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
591
592 async def _validate_instantiation_params(
593 self, uri: str, model, instantiation_params: dict
594 ) -> None:
595 """Checks if all the applications in instantiation_params
596 exist ins the original bundle.
597
598 Raises:
599 JujuApplicationNotFound if there is an invalid app in
600 the instantiation params.
601 """
602 overlay_apps = self._get_apps_in_instantiation_params(instantiation_params)
603 if not overlay_apps:
604 return
605 original_apps = await self._get_apps_in_original_bundle(uri, model)
606 if not all(app in original_apps for app in overlay_apps):
607 raise JujuApplicationNotFound(
608 "Cannot find application {} in original bundle {}".format(
609 overlay_apps, original_apps
610 )
611 )
612
async def _get_apps_in_original_bundle(self, uri: str, model) -> set:
    """Return the applications of the bundle found at uri.

    The bundle is downloaded in BundleHandler.fetch_plan, which takes care
    of opening and exception handling. The resolve method gets all the
    information regarding the channel, track, revision, type, source.

    Returns:
        Set with the names of the applications in original bundle.
    """
    parsed_url = URL.parse(uri)
    resolver = model.deploy_types[str(parsed_url.schema)]
    # only AMD64 is allowed
    resolved = await resolver.resolve(
        parsed_url, DEFAULT_ARCHITECTURE, entity_url=uri
    )
    bundle_handler = BundleHandler(model, trusted=True, forced=False)
    await bundle_handler.fetch_plan(parsed_url, resolved.origin)
    return bundle_handler.applications
631
632 def _get_apps_in_instantiation_params(self, instantiation_params: dict) -> list:
633 """Extract applications key in instantiation params.
634
635 Returns:
636 List with the names of the applications in instantiation params.
637
638 Raises:
639 JujuError if applications key is not found.
640 """
641 if not instantiation_params:
642 return []
643 try:
644 return [key for key in instantiation_params.get("applications")]
645 except Exception as e:
646 raise JujuError("Invalid overlay format. {}".format(str(e)))
647
648 def _get_overlays(self, model_name: str, instantiation_params: dict) -> list:
649 """Creates a temporary overlay file which includes the instantiation params.
650 Only one overlay file is created.
651
652 Returns:
653 List with one overlay filename. Empty list if there are no instantiation params.
654 """
655 if not instantiation_params:
656 return []
657 file_name = model_name + "-overlay.yaml"
658 self._write_overlay_file(file_name, instantiation_params)
659 return [file_name]
660
def _write_overlay_file(self, file_name: str, instantiation_params: dict) -> None:
    """Dump the instantiation params as YAML into file_name, overwriting it."""
    with open(file_name, "w") as overlay_file:
        yaml.dump(instantiation_params, stream=overlay_file)
664
665 def _remove_overlay_file(self, overlay: list) -> None:
666 """Overlay contains either one or zero file names."""
667 if not overlay:
668 return
669 try:
670 filename = overlay[0]
671 os.remove(filename)
672 except OSError as e:
673 self.log.warning(
674 "Overlay file {} could not be removed: {}".format(filename, e)
675 )
676
async def add_unit(
    self,
    application_name: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
):
    """Add a unit of an existing application to a machine

    :param: application_name: Application name
    :param: model_name: Model name
    :param: machine_id Machine id
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: None
    :raises: JujuApplicationNotFound: If the application is not in the model
    """

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)
        if application is None:
            raise JujuApplicationNotFound(
                "Application {} not exists".format(application_name)
            )

        # Called only for its check: it raises an error when the given
        # machine id is not in the model. The result itself is not used.
        _found_machine, _found_series = self._get_machine_info(model, machine_id)

        adding_message = "Adding unit (machine {}) to application {} in model ~{}".format(
            machine_id, application_name, model_name
        )
        self.log.debug(adding_message)

        await application.add_unit(to=machine_id)

        await JujuModelWatcher.wait_for(
            model=model,
            entity=application,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
        added_message = "Unit is added to application {} in model {}".format(
            application_name, model_name
        )
        self.log.debug(added_message)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
739
async def destroy_unit(
    self,
    application_name: str,
    model_name: str,
    machine_id: str,
    total_timeout: float = None,
):
    """Destroy unit

    Destroys the unit of the application placed on the given machine and
    polls until it is gone. When the timeout expires first, a warning is
    logged and the call still returns normally.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: machine_id Machine id
    :param: total_timeout: Seconds to wait for the unit to disappear
                           (3600 when omitted)

    :return: None
    :raises: JujuApplicationNotFound: If the application is not in the model
    :raises: JujuError: If the application has no unit on that machine
    """

    model = None
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)

        if application is None:
            raise JujuApplicationNotFound(
                "Application not found: {} (model={})".format(
                    application_name, model_name
                )
            )

        unit = self._get_unit(application, machine_id)
        if not unit:
            raise JujuError(
                "A unit with machine id {} not in available units".format(
                    machine_id
                )
            )

        unit_name = unit.name

        self.log.debug(
            "Destroying unit {} from application {} in model {}".format(
                unit_name, application_name, model_name
            )
        )
        await application.destroy_unit(unit_name)

        self.log.debug(
            "Waiting for unit {} to be destroyed in application {} (model={})...".format(
                unit_name, application_name, model_name
            )
        )

        # TODO: Add functionality in the Juju watcher to replace this kind of blocks
        if total_timeout is None:
            total_timeout = 3600
        # A monotonic clock is not affected by system clock adjustments
        deadline = time.monotonic() + total_timeout
        while time.monotonic() < deadline:
            if not self._get_unit(application, machine_id):
                self.log.debug(
                    "The unit {} was destroyed in application {} (model={}) ".format(
                        unit_name, application_name, model_name
                    )
                )
                return
            await asyncio.sleep(5)
        # Reaching this point means the unit is still there: report the
        # timeout instead of claiming that the unit was destroyed
        self.log.warning(
            "Timeout waiting for unit {} to be destroyed in application {} (model={})".format(
                unit_name, application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
815
async def deploy_charm(
    self,
    application_name: str,
    path: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    series: str = None,
    num_units: int = 1,
):
    """Deploy charm

    The application is deployed with one unit. For every further unit a
    new machine is created and a unit is added to it.

    :param: application_name: Application name
    :param: path: Local path to the charm
    :param: model_name: Model name
    :param: machine_id ID of the machine
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: config: Config for the charm
    :param: series: Series of the charm; replaced by the series reported for
                    the machine when machine_id is given
    :param: num_units: Number of units

    :return: (juju.application.Application): Juju application
    :raises: JujuApplicationExists: If the application is already deployed
    """
    self.log.debug(
        "Deploying charm {} to machine {} in model ~{}".format(
            application_name, machine_id, model_name
        )
    )
    self.log.debug("charm: {}".format(path))

    # Get controller
    controller = await self.get_controller()
    model = None

    try:
        # The model is obtained inside the try block so that the controller
        # is disconnected even when the model cannot be obtained
        model = await self.get_model(controller, model_name)

        if application_name in model.applications:
            raise JujuApplicationExists(
                "Application {} exists".format(application_name)
            )

        if machine_id is not None:
            _machine, series = self._get_machine_info(model, machine_id)

        application = await model.deploy(
            entity_url=path,
            application_name=application_name,
            channel="stable",
            num_units=1,
            series=series,
            to=machine_id,
            config=config,
        )

        self.log.debug(
            "Wait until application {} is ready in model {}".format(
                application_name, model_name
            )
        )
        # The first unit was created by deploy(); the remaining ones get a
        # new machine each (the range is empty when num_units <= 1)
        for _ in range(num_units - 1):
            extra_machine, _is_new = await self.create_machine(
                model_name, wait=False
            )
            await application.add_unit(to=extra_machine.entity_id)

        await JujuModelWatcher.wait_for(
            model=model,
            entity=application,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )
        self.log.debug(
            "Application {} is ready in model {}".format(
                application_name, model_name
            )
        )
    except juju.errors.JujuError as e:
        if "already exists" in e.message:
            raise JujuApplicationExists(
                "Application {} exists".format(application_name)
            ) from e
        raise
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return application
912
async def upgrade_charm(
    self,
    application_name: str,
    path: str,
    model_name: str,
    total_timeout: float = None,
    **kwargs,
):
    """Upgrade Charm

    Units in error state are resolved first, then the application is
    refreshed from the given path and its units are awaited to be idle.

    :param: application_name: Application name
    :param: path: Local path to the charm
    :param: model_name: Model name
    :param: total_timeout: Timeout for the entity to be active (not used by
                           this method)
    :param: kwargs: Accepted and ignored

    :return: (juju.application.Application): The upgraded application
    :raises: JujuApplicationNotFound: If the application is not in the model
    :raises: JujuError: If the application is in error state after the upgrade
    """

    self.log.debug(
        "Upgrading charm {} in model {} from path {}".format(
            application_name, model_name, path
        )
    )

    await self.resolve_application(
        model_name=model_name, application_name=application_name
    )

    # Get controller
    controller = await self.get_controller()
    model = None

    try:
        # The model is obtained inside the try block so that the controller
        # is disconnected even when the model cannot be obtained
        model = await self.get_model(controller, model_name)

        # Get application
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound(
                "Cannot find application {} to upgrade".format(application_name)
            )

        await application.refresh(path=path)

        self.log.debug(
            "Wait until charm upgrade is completed for application {} (model={})".format(
                application_name, model_name
            )
        )

        await JujuModelWatcher.ensure_units_idle(
            model=model, application=application
        )

        if application.status == "error":
            # Report the message of a failing unit when there is one
            error_message = "Unknown"
            for unit in application.units:
                if (
                    unit.workload_status == "error"
                    and unit.workload_status_message != ""
                ):
                    error_message = unit.workload_status_message

            message = "Application {} failed update in {}: {}".format(
                application_name, model_name, error_message
            )
            self.log.error(message)
            raise JujuError(message=message)

        self.log.debug(
            "Application {} is ready in model {}".format(
                application_name, model_name
            )
        )

    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)

    return application
996
async def resolve_application(self, model_name: str, application_name: str):
    """Resolve the units in error state of an application

    Keeps marking the failing units as resolved, once per second, for as
    long as the application status is "error".

    :param: model_name: Model name
    :param: application_name: Application name

    :raises: JujuApplicationNotFound: If the application is not in the model
    """
    controller = await self.get_controller()
    model = None

    try:
        # The model is obtained inside the try block so that the controller
        # is disconnected even when the model cannot be obtained
        model = await self.get_model(controller, model_name)

        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound(
                "Cannot find application {} to resolve".format(application_name)
            )

        while application.status == "error":
            for unit in application.units:
                if unit.workload_status == "error":
                    self.log.debug(
                        "Model {}, Application {}, Unit {} in error state, resolving".format(
                            model_name, application_name, unit.entity_id
                        )
                    )
                    try:
                        await unit.resolved(retry=False)
                    except Exception as e:
                        # Best effort: the unit is tried again on the next
                        # pass, but the failure is left in the log
                        self.log.debug(
                            "Failed resolving unit {}: {}".format(unit.entity_id, e)
                        )

            await asyncio.sleep(1)

    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1029
async def resolve(self, model_name: str):
    """Resolve the units in error state of every application in a model

    Repeats every 5 seconds for as long as a pass marked at least one unit
    as resolved.

    :param: model_name: Model name
    """
    controller = await self.get_controller()
    model = None
    all_units_active = False
    try:
        # The model is obtained inside the try block so that the controller
        # is disconnected even when the model cannot be obtained
        model = await self.get_model(controller, model_name)

        while not all_units_active:
            all_units_active = True
            for application_name, application in model.applications.items():
                if application.status == "error":
                    for unit in application.units:
                        if unit.workload_status == "error":
                            self.log.debug(
                                "Model {}, Application {}, Unit {} in error state, resolving".format(
                                    model_name, application_name, unit.entity_id
                                )
                            )
                            try:
                                await unit.resolved(retry=False)
                                # Something was resolved: check again later
                                all_units_active = False
                            except Exception as e:
                                # Best effort: a unit that cannot be resolved
                                # does not trigger another pass, but the
                                # failure is left in the log
                                self.log.debug(
                                    "Failed resolving unit {}: {}".format(
                                        unit.entity_id, e
                                    )
                                )

            if not all_units_active:
                await asyncio.sleep(5)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1057
async def scale_application(
    self,
    model_name: str,
    application_name: str,
    scale: int = 1,
    total_timeout: float = None,
):
    """
    Set the scale of an application (K8s) and wait until it is applied

    :param: model_name: Model name
    :param: application_name: Application name
    :param: scale: Scale to which to set this application
    :param: total_timeout: Timeout for the entity to be active
                           (1800 seconds when omitted)

    :raises: JujuApplicationNotFound: If the application is not in the model
    :raises: Exception: If the unit count does not reach the scale in time
    """

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)

        self.log.debug(
            "Scaling application {} in model {}".format(
                application_name, model_name
            )
        )
        application = self._get_application(model, application_name)
        if application is None:
            raise JujuApplicationNotFound("Cannot scale application")
        await application.scale(scale=scale)

        # Wait until application is scaled in model
        self.log.debug(
            "Waiting for application {} to be scaled in model {}...".format(
                application_name, model_name
            )
        )
        if total_timeout is None:
            total_timeout = 1800
        deadline = time.time() + total_timeout
        while time.time() < deadline:
            unit_count = self._get_application_count(model, application_name)
            # There is a delay before scaling triggers in the Juju model, so
            # the model is only awaited once the unit count equals the scale
            if unit_count != scale:
                await asyncio.sleep(5)
                continue
            await JujuModelWatcher.wait_for_model(
                model=model, timeout=total_timeout
            )
            self.log.debug(
                "Application {} is scaled in model {}".format(
                    application_name, model_name
                )
            )
            return
        raise Exception(
            "Timeout waiting for application {} in model {} to be scaled".format(
                application_name, model_name
            )
        )
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1122
def _get_application_count(self, model: Model, application_name: str) -> int:
    """Count the units of an application

    :param: model: Model object
    :param: application_name: Application name

    :return: Number of units, or None if the application doesn't exist
    """
    app = self._get_application(model, application_name)
    return None if app is None else len(app.units)
1134
def _get_application(self, model: Model, application_name: str) -> Application:
    """Look up an application in a model

    :param: model: Model object
    :param: application_name: Application name

    :return: juju.application.Application, or None when the model has no
             applications or none with that name
    """
    apps = model.applications
    if not apps or application_name not in apps:
        return None
    return apps[application_name]
1145
def _get_unit(self, application: Application, machine_id: str) -> Unit:
    """Find the unit of an application that runs on a given machine

    :param: application: Application object
    :param: machine_id: Machine id

    :return: First unit whose machine_id matches, or None if there is none
    """
    return next(
        (
            candidate
            for candidate in application.units
            if candidate.machine_id == machine_id
        ),
        None,
    )
1160
1161 def _get_machine_info(
1162 self,
1163 model,
1164 machine_id: str,
1165 ) -> (str, str):
1166 """Get machine info
1167
1168 :param: model: Model object
1169 :param: machine_id: Machine id
1170
1171 :return: (str, str): (machine, series)
1172 """
1173 if machine_id not in model.machines:
1174 msg = "Machine {} not found in model".format(machine_id)
1175 self.log.error(msg=msg)
1176 raise JujuMachineNotFound(msg)
1177 machine = model.machines[machine_id]
1178 return machine, machine.series
1179
async def execute_action(
    self,
    application_name: str,
    model_name: str,
    action_name: str,
    db_dict: dict = None,
    machine_id: str = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs,
):
    """Run an action on a unit of an application and wait for it to finish

    The action runs on the leader unit unless machine_id is given, in which
    case it runs on the unit placed on that machine.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: action_name: Name of the action
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: machine_id: Machine id of the unit that must run the action
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: kwargs: Parameters forwarded to the action

    :raises JujuApplicationNotFound: if the application is not in the model
    :raises JujuError: if no unit is placed on machine_id
    :raises JujuActionNotFound: if the application does not offer the action

    :return: (output, status): action output and its status ("failed" when
             the status of the action is not reported by the model)
    """
    self.log.debug(
        "Executing action {} using params {}".format(action_name, kwargs)
    )
    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)

    try:
        application = self._get_application(
            model,
            application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound("Cannot execute action")

        if machine_id is not None:
            unit = self._get_unit(application, machine_id)
            if not unit:
                raise JujuError(
                    "A unit with machine id {} not in available units".format(
                        machine_id
                    )
                )
            self.log.debug(
                "Action {} is being executed on {} unit".format(
                    action_name, unit.name
                )
            )
        else:
            # Race condition:
            # Occasionally the leader cannot be found because the
            # leader-elected hook has not been triggered yet, which is why
            # self._get_leader_unit() does some retries. If it happens
            # again, re-open bug 1236
            unit = await self._get_leader_unit(application)
            self.log.debug(
                "Action {} is being executed on the leader unit {}".format(
                    action_name, unit.name
                )
            )

        available_actions = await application.get_actions()
        if action_name not in available_actions:
            raise JujuActionNotFound(
                "Action {} not in available actions".format(action_name)
            )

        action = await unit.run_action(action_name, **kwargs)

        self.log.debug(
            "Wait until action {} is completed in application {} (model={})".format(
                action_name, application_name, model_name
            )
        )
        await JujuModelWatcher.wait_for(
            model=model,
            entity=action,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
            vca_id=self.vca_connection._vca_id,
        )

        action_id = action.entity_id
        output = await model.get_action_output(action_uuid=action_id)
        statuses = await model.get_action_status(uuid_or_prefix=action_id)
        # An action missing from the reported statuses counts as failed
        status = "failed" if action_id not in statuses else statuses[action_id]

        self.log.debug(
            "Action {} completed with status {} in application {} (model={})".format(
                action_name, action.status, application_name, model_name
            )
        )
        return output, status
    finally:
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1286
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get the actions offered by an application

    :param: application_name: Application name
    :param: model_name: Model name

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }
    """
    self.log.debug(
        "Getting list of actions for application {}".format(application_name)
    )

    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)

    try:
        app = self._get_application(
            model,
            application_name=application_name,
        )
        actions = await app.get_actions()
        return actions
    finally:
        # Always release the model and controller connections
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1323
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param: model_name: The name or unique id of the network service
    :param: application_name: The name of the application

    :raises Exception: if model_name or application_name is empty

    :return: Result of application.get_metrics(), or an empty dict when the
             application does not exist in the model
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    controller = await self.get_controller()
    model = None
    try:
        # Getting the model inside the try guarantees that the controller
        # is released even if the model cannot be obtained
        model = await self.get_model(controller, model_name)
        application = self._get_application(model, application_name)
        if application is not None:
            metrics = await application.get_metrics()
    finally:
        # disconnect_* are coroutines: they must be awaited, otherwise the
        # connections are never actually closed
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return metrics
1343
async def add_relation(
    self,
    model_name: str,
    endpoint_1: str,
    endpoint_2: str,
):
    """Relate two endpoints of a model

    A relation that is reported as "not found" or as "already exists" by
    Juju is only logged as a warning; any other API error is raised.

    :param: model_name: Model name
    :param: endpoint_1: First endpoint name
                        ("app:endpoint" format or directly the saas name)
    :param: endpoint_2: Second endpoint name (^ same format)
    """

    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)

    try:
        await model.add_relation(endpoint_1, endpoint_2)
    except juju.errors.JujuAPIError as error:
        if self._relation_is_not_found(error):
            reason = "Relation not found: {}"
        elif self._relation_already_exist(error):
            reason = "Relation already exists: {}"
        else:
            # Unexpected API error: let the caller handle it
            raise
        self.log.warning(reason.format(error.message))
    finally:
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1381
1382 def _relation_is_not_found(self, juju_error):
1383 text = "not found"
1384 return (text in juju_error.message) or (
1385 juju_error.error_code and text in juju_error.error_code
1386 )
1387
1388 def _relation_already_exist(self, juju_error):
1389 text = "already exists"
1390 return (text in juju_error.message) or (
1391 juju_error.error_code and text in juju_error.error_code
1392 )
1393
async def offer(self, endpoint: RelationEndpoint) -> Offer:
    """
    Create an offer from a RelationEndpoint

    The offer is named "<application_name>-<endpoint_name>".

    :param: endpoint: Relation endpoint

    :raises Exception: if the offer is not listed after being created
    :raises juju.errors.JujuError: on any Juju error other than
                                   "application offer already exists"

    :return: Offer object. NOTE: None is returned (implicitly) when Juju
             reports that the application offer already exists.
    """
    model_name = endpoint.model_name
    offer_name = f"{endpoint.application_name}-{endpoint.endpoint_name}"
    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        await model.create_offer(endpoint.endpoint, offer_name=offer_name)
        offer_list = await self._list_offers(model_name, offer_name=offer_name)
        if not offer_list:
            raise Exception("offer was not created")
        return Offer(offer_list[0].offer_url)
    except juju.errors.JujuError as e:
        # An already existing offer is tolerated, anything else is raised
        if "application offer already exists" not in e.message:
            raise
    finally:
        # disconnect_* are coroutines: without "await" they never run and
        # the model/controller connections are leaked
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1421
async def consume(
    self,
    model_name: str,
    offer: Offer,
    provider_libjuju: "Libjuju",
) -> str:
    """
    Consume a remote offer in the model. Relations can be created later using "juju relate".

    The saas name is built as "<offer name>-<offer model name without dashes>",
    followed by "-<vca_id>" when the offer has a vca_id.

    :param: model_name: Model name
    :param: offer: Offer object to consume
    :param: provider_libjuju: Libjuju object of the provider endpoint

    :raises ParseError if there's a problem parsing the offer_url
    :raises JujuError if remote offer includes an endpoint
    :raises JujuAPIError if the operation is not successful

    :returns: Saas name. It is the application name in the model that references the remote application.
    """
    alias = f'{offer.name}-{offer.model_name.replace("-", "")}'
    if offer.vca_id:
        alias = f"{alias}-{offer.vca_id}"

    controller = await self.get_controller()
    provider_controller = None
    model = None
    try:
        model = await controller.get_model(model_name)
        provider_controller = await provider_libjuju.get_controller()
        await model.consume(
            offer.url, application_alias=alias, controller=provider_controller
        )
        return alias
    finally:
        # Release only what was actually obtained
        if model:
            await self.disconnect_model(model)
        if provider_controller:
            await provider_libjuju.disconnect_controller(provider_controller)
        await self.disconnect_controller(controller)
1460
async def destroy_model(self, model_name: str, total_timeout: float = 1800):
    """
    Destroy model

    Manually provisioned machines that are still pending are destroyed
    first, then the model itself is destroyed. Nothing is done (apart from
    logging a warning) if the model doesn't exist.

    :param: model_name: Model name
    :param: total_timeout: Timeout in seconds for the model destruction

    :raises Exception: whatever made the destruction fail (including
                       asyncio.TimeoutError), unless the model no longer
                       exists after the failure
    """

    controller = await self.get_controller()
    model = None
    try:
        if not await self.model_exists(model_name, controller=controller):
            self.log.warning(f"Model {model_name} doesn't exist")
            return

        self.log.debug(f"Getting model {model_name} to be destroyed")
        model = await self.get_model(controller, model_name)
        self.log.debug(f"Destroying manual machines in model {model_name}")
        # Destroy machines that are manually provisioned
        # and still are in pending state
        await self._destroy_pending_machines(model, only_manual=True)
        await self.disconnect_model(model)
        # Already disconnected: avoid disconnecting it again in "finally"
        model = None

        await asyncio.wait_for(
            self._destroy_model(model_name, controller),
            timeout=total_timeout,
        )
    except Exception as e:
        # A failure is irrelevant if the model is gone anyway
        if not await self.model_exists(model_name, controller=controller):
            self.log.warning(
                f"Failed deleting model {model_name}: model doesn't exist"
            )
            return
        self.log.warning(f"Failed deleting model {model_name}: {e}")
        raise
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1500
async def _destroy_model(
    self,
    model_name: str,
    controller: Controller,
):
    """
    Destroy model from controller

    A graceful destruction is attempted for 120 seconds; if it times out,
    the model is destroyed forcefully. Juju errors saying that the model
    "has been removed" or "model not found" are ignored.

    :param: model_name: Name of the model to be removed
    :param: controller: Controller object
    """
    self.log.debug(f"Destroying model {model_name}")

    async def _graceful():
        self.log.info(f"Gracefully deleting model {model_name}")
        already_resolved = False
        # Keep asking for the destruction until the model is not listed
        while model_name in await controller.list_models():
            if not already_resolved:
                # Units in error are resolved only once
                await self.resolve(model_name)
                already_resolved = True
            await controller.destroy_model(model_name, destroy_storage=True)

            await asyncio.sleep(5)
        self.log.info(f"Model {model_name} deleted gracefully")

    async def _forced():
        self.log.info(f"Forcefully deleting model {model_name}")
        while model_name in await controller.list_models():
            await controller.destroy_model(
                model_name, destroy_storage=True, force=True, max_wait=60
            )
            await asyncio.sleep(5)
        self.log.info(f"Model {model_name} deleted forcefully")

    try:
        try:
            await asyncio.wait_for(_graceful(), timeout=120)
        except asyncio.TimeoutError:
            await _forced()
    except juju.errors.JujuError as error:
        ignorable = ("has been removed", "model not found")
        if any(text in message for message in error.errors for text in ignorable):
            return
        raise
1549
async def destroy_application(
    self, model_name: str, application_name: str, total_timeout: float
):
    """
    Destroy an application and wait until it disappears from the model

    A missing application is only logged as a warning.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: total_timeout: Seconds to wait for the application to be
                           destroyed (3600 is used when None is given)

    :raises Exception: if the application is still in the model when the
                       timeout expires
    """

    controller = await self.get_controller()
    model = None

    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Destroying application {} in model {}".format(
                application_name, model_name
            )
        )
        app = self._get_application(model, application_name)
        if not app:
            self.log.warning("Application not found: {}".format(application_name))
        else:
            await app.destroy()

        self.log.debug(
            "Waiting for application {} to be destroyed in model {}...".format(
                application_name, model_name
            )
        )
        timeout = 3600 if total_timeout is None else total_timeout
        deadline = time.time() + timeout
        while time.time() < deadline:
            if self._get_application(model, application_name):
                # Still there: poll again in a while
                await asyncio.sleep(5)
                continue
            self.log.debug(
                "The application {} was destroyed in model {} ".format(
                    application_name, model_name
                )
            )
            return
        raise Exception(
            "Timeout waiting for application {} to be destroyed in model {}".format(
                application_name, model_name
            )
        )
    finally:
        if model is not None:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1603
async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
    """
    Destroy pending machines in a given model

    :param: model: Model object whose machines are inspected
    :param: only_manual: Bool that indicates only manually provisioned
                         machines should be destroyed (if True), or that
                         all pending machines should be destroyed
    """
    status = await model.get_status()
    for machine_id in status.machines:
        machine_status = status.machines[machine_id]
        if machine_status.agent_status.status != "pending":
            continue
        if only_manual and not machine_status.instance_id.startswith("manual:"):
            # Skip just this machine: stopping the loop here would leave
            # the manual machines listed after it undestroyed
            continue
        machine = model.machines[machine_id]
        await machine.destroy(force=True)
1620
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Apply a configuration to an application

    Nothing is done (no connection is even opened) when config is empty.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm
    """
    self.log.debug("Configuring application {}".format(application_name))

    if not config:
        return

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        app = self._get_application(
            model,
            application_name=application_name,
        )
        await app.set_config(config)
    finally:
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)
1646
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Runs forever: on every iteration a controller connection is obtained
    and released again; failures obtaining it are logged as errors.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset on every iteration, so that a failed check never
        # disconnects again the (already released) controller of a
        # previous iteration
        controller = None
        try:
            controller = await self.get_controller()
            # self.log.debug("VCA is alive")
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            if controller is not None:
                await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
1663
async def list_models(self, contains: str = None) -> [str]:
    """List the models of the controller, optionally filtered by name

    :param: contains: String that must be contained in the model name.
                      All the models are returned if it is empty.

    :return: [models] Returns list of model names
    """

    controller = await self.get_controller()
    try:
        names = await controller.list_models()
        if not contains:
            return names
        return [name for name in names if contains in name]
    finally:
        await self.disconnect_controller(controller)
1680
async def _list_offers(
    self, model_name: str, offer_name: str = None
) -> QueryApplicationOffersResults:
    """
    List offers within a model

    :param: model_name: Model name
    :param: offer_name: Offer name to filter.

    :return: Application offers of the model. When offer_name is given,
             a list with the first offer having that name (or an empty
             list if there is none)
    """

    controller = await self.get_controller()
    try:
        offers = (await controller.list_offers(model_name)).results
        if not offer_name:
            return offers
        # Only the first match is kept
        first_match = next(
            (item for item in offers if item.offer_name == offer_name), None
        )
        return [] if first_match is None else [first_match]
    finally:
        await self.disconnect_controller(controller)
1706
async def add_k8s(
    self,
    name: str,
    rbac_id: str,
    token: str,
    client_cert_data: str,
    configuration: Configuration,
    storage_class: str,
    credential_name: str = None,
):
    """
    Add a Kubernetes cloud to the controller

    Similar to the `juju add-k8s` command in the CLI

    :param: name: Name for the K8s cloud
    :param: rbac_id: Value stored in the credential attributes
                     under the "rbac-id" key
    :param: token: Token used to build the cloud credential
    :param: client_cert_data: Certificate data, used both for the
                              credential and as CA certificate of the cloud
    :param: configuration: Kubernetes configuration object
    :param: storage_class: Storage Class to use in the cloud (for both
                           operator and workload storage)
    :param: credential_name: Name for the credential, forwarded to add_cloud()

    :raises Exception: if storage_class, name or configuration is empty

    :return: Whatever add_cloud() returns
    """

    required = (
        (storage_class, "storage_class must be a non-empty string"),
        (name, "name must be a non-empty string"),
        (configuration, "configuration must be provided"),
    )
    for value, message in required:
        if not value:
            raise Exception(message)

    host = configuration.host
    credential = self.get_k8s_cloud_credential(
        configuration,
        client_cert_data,
        token,
    )
    credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id

    storage_config = {
        "operator-storage": storage_class,
        "workload-storage": storage_class,
    }
    cloud = client.Cloud(
        type_="kubernetes",
        auth_types=[credential.auth_type],
        endpoint=host,
        ca_certificates=[client_cert_data],
        config=storage_config,
    )

    return await self.add_cloud(
        name, cloud, credential, credential_name=credential_name
    )
1756
def get_k8s_cloud_credential(
    self,
    configuration: Configuration,
    client_cert_data: str,
    token: str = None,
) -> client.CloudCredential:
    """Build the cloud credential for a Kubernetes cluster

    The auth type is "userpass"/"userpasswithcert" when the configuration
    has a username, and "certificate" when both certificate data and token
    are given. The key based types ("oauth2"/"oauth2withcert") are
    currently unreachable, since the key is hard-coded to None.

    :param: configuration: Kubernetes configuration object
                           (username and password are read from it)
    :param: client_cert_data: Client certificate data
    :param: token: Token

    :raises JujuInvalidK8sConfiguration: if a token is given together with
        user/pass, or if no supported authentication method can be built

    :return: client.CloudCredential with the auth type and its attributes
    """
    # TODO: Test with AKS
    key = None  # open(configuration.key_file, "r").read()
    username, password = configuration.username, configuration.password

    attrs = {}
    if client_cert_data:
        attrs["ClientCertificateData"] = client_cert_data
    if key:
        attrs["ClientKeyData"] = key
    if token:
        if username or password:
            raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
        attrs["Token"] = token

    if key:
        auth_type = "oauth2withcert" if client_cert_data else "oauth2"
        if not token:
            raise JujuInvalidK8sConfiguration(
                "missing token for auth type {}".format(auth_type)
            )
    elif username:
        if not password:
            self.log.debug(
                "credential for user {} has empty password".format(username)
            )
        attrs.update(username=username, password=password)
        auth_type = "userpasswithcert" if client_cert_data else "userpass"
    elif client_cert_data and token:
        auth_type = "certificate"
    else:
        raise JujuInvalidK8sConfiguration("authentication method not supported")

    return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1803
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Add cloud to the controller

    :param: name: Name of the cloud to be added
    :param: cloud: Cloud object
    :param: credential: CloudCredentials object for the cloud.
                        No credential is added if it is not given.
    :param: credential_name: Credential name.
                             If not defined, the name of the cloud will be used.

    :return: The same Cloud object received as parameter
    """
    controller = await self.get_controller()
    try:
        await controller.add_cloud(name, cloud)
        if credential:
            credential_id = credential_name or name
            await controller.add_credential(
                credential_id, credential=credential, cloud=name
            )
        # The object returned by controller.add_cloud() should be returned
        # here. The original value is returned until this bug is fixed:
        #   https://github.com/juju/python-libjuju/issues/443
        return cloud
    finally:
        await self.disconnect_controller(controller)
1833
async def remove_cloud(self, name: str):
    """
    Remove cloud

    A Juju error whose only message is 'cloud "<name>" not found' is
    logged as a warning instead of being raised.

    :param: name: Name of the cloud to be removed
    """
    controller = await self.get_controller()
    try:
        await controller.remove_cloud(name)
    except juju.errors.JujuError as error:
        not_found = f'cloud "{name}" not found'
        if len(error.errors) != 1 or error.errors[0] != not_found:
            raise
        self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
    finally:
        await self.disconnect_controller(controller)
1850
@retry(
    attempts=20, delay=5, fallback=JujuLeaderUnitNotFound(), callback=retry_callback
)
async def _get_leader_unit(self, application: Application) -> Unit:
    """Get the leader unit of an application

    :param: application: Application object

    :raises Exception: if no unit reports itself as leader. NOTE(review):
        presumably this is what makes the retry decorator try again (up to
        20 attempts, 5 apart) before using its JujuLeaderUnitNotFound
        fallback; confirm against retrying_async.

    :return: First unit for which is_leader_from_status() is true
    """
    leader = None
    for candidate in application.units:
        if await candidate.is_leader_from_status():
            leader = candidate
            break
    if leader:
        return leader
    raise Exception()
1863
async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
    """
    Get cloud credentials

    The credential is looked up by the tag built from the cloud name, the
    user of the VCA connection and the credential name of the cloud.

    :param: cloud: Cloud object (its "name" and "credential_name" attributes
                   are read). The returned credentials will be from this cloud.

    :return: List of credentials object associated to the specified cloud
    """
    controller = await self.get_controller()
    try:
        cloud_facade = client.CloudFacade.from_connection(controller.connection())
        credential_tag = tag.credential(
            cloud.name, self.vca_connection.data.user, cloud.credential_name
        )
        response = await cloud_facade.Credential([client.Entity(credential_tag)])
        return response.results
    finally:
        await self.disconnect_controller(controller)
1883
async def check_application_exists(self, model_name, application_name) -> bool:
    """Check whether an application exists in a model

    :param: model_name: Model Name
    :param: application_name: Application Name

    :return: True if the application is in the model, False otherwise
    """

    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        self.log.debug(
            "Checking if application {} exists in model {}".format(
                application_name, model_name
            )
        )
        found = self._get_application(model, application_name)
        return found is not None
    finally:
        # The model is only disconnected if it was obtained
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)