Pin black version in tox.ini to 23.12.1
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 import os
18 import typing
19 import yaml
20
21 import time
22
23 import juju.errors
24 from juju.bundle import BundleHandler
25 from juju.model import Model
26 from juju.machine import Machine
27 from juju.application import Application
28 from juju.unit import Unit
29 from juju.url import URL
30 from juju.version import DEFAULT_ARCHITECTURE
31 from juju.client._definitions import (
32 FullStatus,
33 QueryApplicationOffersResults,
34 Cloud,
35 CloudCredential,
36 )
37 from juju.controller import Controller
38 from juju.client import client
39 from juju import tag
40
41 from n2vc.definitions import Offer, RelationEndpoint
42 from n2vc.juju_watcher import JujuModelWatcher
43 from n2vc.provisioner import AsyncSSHProvisioner
44 from n2vc.n2vc_conn import N2VCConnector
45 from n2vc.exceptions import (
46 JujuMachineNotFound,
47 JujuApplicationNotFound,
48 JujuLeaderUnitNotFound,
49 JujuActionNotFound,
50 JujuControllerFailedConnecting,
51 JujuApplicationExists,
52 JujuInvalidK8sConfiguration,
53 JujuError,
54 )
55 from n2vc.vca.cloud import Cloud as VcaCloud
56 from n2vc.vca.connection import Connection
57 from kubernetes.client.configuration import Configuration
58 from retrying_async import retry
59
60
61 RBAC_LABEL_KEY_NAME = "rbac-id"
62
63
64 class Libjuju:
65 def __init__(
66 self,
67 vca_connection: Connection,
68 loop: asyncio.AbstractEventLoop = None,
69 log: logging.Logger = None,
70 n2vc: N2VCConnector = None,
71 ):
72 """
73 Constructor
74
75 :param: vca_connection: n2vc.vca.connection object
76 :param: loop: Asyncio loop
77 :param: log: Logger
78 :param: n2vc: N2VC object
79 """
80
81 self.log = log or logging.getLogger("Libjuju")
82 self.n2vc = n2vc
83 self.vca_connection = vca_connection
84
85 self.loop = loop or asyncio.get_event_loop()
86 self.loop.set_exception_handler(self.handle_exception)
87 self.creating_model = asyncio.Lock(loop=self.loop)
88
89 if self.vca_connection.is_default:
90 self.health_check_task = self._create_health_check_task()
91
92 def _create_health_check_task(self):
93 return self.loop.create_task(self.health_check())
94
95 async def get_controller(self, timeout: float = 60.0) -> Controller:
96 """
97 Get controller
98
99 :param: timeout: Time in seconds to wait for controller to connect
100 """
101 controller = None
102 try:
103 controller = Controller()
104 await asyncio.wait_for(
105 controller.connect(
106 endpoint=self.vca_connection.data.endpoints,
107 username=self.vca_connection.data.user,
108 password=self.vca_connection.data.secret,
109 cacert=self.vca_connection.data.cacert,
110 ),
111 timeout=timeout,
112 )
113 if self.vca_connection.is_default:
114 endpoints = await controller.api_endpoints
115 if not all(
116 endpoint in self.vca_connection.endpoints for endpoint in endpoints
117 ):
118 await self.vca_connection.update_endpoints(endpoints)
119 return controller
120 except asyncio.CancelledError as e:
121 raise e
122 except Exception as e:
123 self.log.error(
124 "Failed connecting to controller: {}... {}".format(
125 self.vca_connection.data.endpoints, e
126 )
127 )
128 if controller:
129 await self.disconnect_controller(controller)
130
131 raise JujuControllerFailedConnecting(
132 f"Error connecting to Juju controller: {e}"
133 )
134
135 async def disconnect(self):
136 """Disconnect"""
137 # Cancel health check task
138 self.health_check_task.cancel()
139 self.log.debug("Libjuju disconnected!")
140
141 async def disconnect_model(self, model: Model):
142 """
143 Disconnect model
144
145 :param: model: Model that will be disconnected
146 """
147 await model.disconnect()
148
149 async def disconnect_controller(self, controller: Controller):
150 """
151 Disconnect controller
152
153 :param: controller: Controller that will be disconnected
154 """
155 if controller:
156 await controller.disconnect()
157
158 @retry(attempts=3, delay=5, timeout=None)
159 async def add_model(self, model_name: str, cloud: VcaCloud):
160 """
161 Create model
162
163 :param: model_name: Model name
164 :param: cloud: Cloud object
165 """
166
167 # Get controller
168 controller = await self.get_controller()
169 model = None
170 try:
171 # Block until other workers have finished model creation
172 while self.creating_model.locked():
173 await asyncio.sleep(0.1)
174
175 # Create the model
176 async with self.creating_model:
177 if await self.model_exists(model_name, controller=controller):
178 return
179 self.log.debug("Creating model {}".format(model_name))
180 model = await controller.add_model(
181 model_name,
182 config=self.vca_connection.data.model_config,
183 cloud_name=cloud.name,
184 credential_name=cloud.credential_name,
185 )
186 except juju.errors.JujuAPIError as e:
187 if "already exists" in e.message:
188 pass
189 else:
190 raise e
191 finally:
192 if model:
193 await self.disconnect_model(model)
194 await self.disconnect_controller(controller)
195
196 async def get_executed_actions(self, model_name: str) -> list:
197 """
198 Get executed/history of actions for a model.
199
200 :param: model_name: Model name, str.
201 :return: List of executed actions for a model.
202 """
203 model = None
204 executed_actions = []
205 controller = await self.get_controller()
206 try:
207 model = await self.get_model(controller, model_name)
208 # Get all unique action names
209 actions = {}
210 for application in model.applications:
211 application_actions = await self.get_actions(application, model_name)
212 actions.update(application_actions)
213 # Get status of all actions
214 for application_action in actions:
215 app_action_status_list = await model.get_action_status(
216 name=application_action
217 )
218 for action_id, action_status in app_action_status_list.items():
219 executed_action = {
220 "id": action_id,
221 "action": application_action,
222 "status": action_status,
223 }
224 # Get action output by id
225 action_status = await model.get_action_output(executed_action["id"])
226 for k, v in action_status.items():
227 executed_action[k] = v
228 executed_actions.append(executed_action)
229 except Exception as e:
230 raise JujuError(
231 "Error in getting executed actions for model: {}. Error: {}".format(
232 model_name, str(e)
233 )
234 )
235 finally:
236 if model:
237 await self.disconnect_model(model)
238 await self.disconnect_controller(controller)
239 return executed_actions
240
241 async def get_application_configs(
242 self, model_name: str, application_name: str
243 ) -> dict:
244 """
245 Get available configs for an application.
246
247 :param: model_name: Model name, str.
248 :param: application_name: Application name, str.
249
250 :return: A dict which has key - action name, value - action description
251 """
252 model = None
253 application_configs = {}
254 controller = await self.get_controller()
255 try:
256 model = await self.get_model(controller, model_name)
257 application = self._get_application(
258 model, application_name=application_name
259 )
260 application_configs = await application.get_config()
261 except Exception as e:
262 raise JujuError(
263 "Error in getting configs for application: {} in model: {}. Error: {}".format(
264 application_name, model_name, str(e)
265 )
266 )
267 finally:
268 if model:
269 await self.disconnect_model(model)
270 await self.disconnect_controller(controller)
271 return application_configs
272
273 @retry(attempts=3, delay=5)
274 async def get_model(self, controller: Controller, model_name: str) -> Model:
275 """
276 Get model from controller
277
278 :param: controller: Controller
279 :param: model_name: Model name
280
281 :return: Model: The created Juju model object
282 """
283 return await controller.get_model(model_name)
284
285 async def model_exists(
286 self, model_name: str, controller: Controller = None
287 ) -> bool:
288 """
289 Check if model exists
290
291 :param: controller: Controller
292 :param: model_name: Model name
293
294 :return bool
295 """
296 need_to_disconnect = False
297
298 # Get controller if not passed
299 if not controller:
300 controller = await self.get_controller()
301 need_to_disconnect = True
302
303 # Check if model exists
304 try:
305 return model_name in await controller.list_models()
306 finally:
307 if need_to_disconnect:
308 await self.disconnect_controller(controller)
309
310 async def models_exist(self, model_names: [str]) -> (bool, list):
311 """
312 Check if models exists
313
314 :param: model_names: List of strings with model names
315
316 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
317 """
318 if not model_names:
319 raise Exception(
320 "model_names must be a non-empty array. Given value: {}".format(
321 model_names
322 )
323 )
324 non_existing_models = []
325 models = await self.list_models()
326 existing_models = list(set(models).intersection(model_names))
327 non_existing_models = list(set(model_names) - set(existing_models))
328
329 return (
330 len(non_existing_models) == 0,
331 non_existing_models,
332 )
333
334 async def get_model_status(self, model_name: str) -> FullStatus:
335 """
336 Get model status
337
338 :param: model_name: Model name
339
340 :return: Full status object
341 """
342 controller = await self.get_controller()
343 model = await self.get_model(controller, model_name)
344 try:
345 return await model.get_status()
346 finally:
347 await self.disconnect_model(model)
348 await self.disconnect_controller(controller)
349
350 async def create_machine(
351 self,
352 model_name: str,
353 machine_id: str = None,
354 db_dict: dict = None,
355 progress_timeout: float = None,
356 total_timeout: float = None,
357 series: str = "bionic",
358 wait: bool = True,
359 ) -> (Machine, bool):
360 """
361 Create machine
362
363 :param: model_name: Model name
364 :param: machine_id: Machine id
365 :param: db_dict: Dictionary with data of the DB to write the updates
366 :param: progress_timeout: Maximum time between two updates in the model
367 :param: total_timeout: Timeout for the entity to be active
368 :param: series: Series of the machine (xenial, bionic, focal, ...)
369 :param: wait: Wait until machine is ready
370
371 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
372 if the machine is new or it already existed
373 """
374 new = False
375 machine = None
376
377 self.log.debug(
378 "Creating machine (id={}) in model: {}".format(machine_id, model_name)
379 )
380
381 # Get controller
382 controller = await self.get_controller()
383
384 # Get model
385 model = await self.get_model(controller, model_name)
386 try:
387 if machine_id is not None:
388 self.log.debug(
389 "Searching machine (id={}) in model {}".format(
390 machine_id, model_name
391 )
392 )
393
394 # Get machines from model and get the machine with machine_id if exists
395 machines = await model.get_machines()
396 if machine_id in machines:
397 self.log.debug(
398 "Machine (id={}) found in model {}".format(
399 machine_id, model_name
400 )
401 )
402 machine = machines[machine_id]
403 else:
404 raise JujuMachineNotFound("Machine {} not found".format(machine_id))
405
406 if machine is None:
407 self.log.debug("Creating a new machine in model {}".format(model_name))
408
409 # Create machine
410 machine = await model.add_machine(
411 spec=None, constraints=None, disks=None, series=series
412 )
413 new = True
414
415 # Wait until the machine is ready
416 self.log.debug(
417 "Wait until machine {} is ready in model {}".format(
418 machine.entity_id, model_name
419 )
420 )
421 if wait:
422 await JujuModelWatcher.wait_for(
423 model=model,
424 entity=machine,
425 progress_timeout=progress_timeout,
426 total_timeout=total_timeout,
427 db_dict=db_dict,
428 n2vc=self.n2vc,
429 vca_id=self.vca_connection._vca_id,
430 )
431 finally:
432 await self.disconnect_model(model)
433 await self.disconnect_controller(controller)
434
435 self.log.debug(
436 "Machine {} ready at {} in model {}".format(
437 machine.entity_id, machine.dns_name, model_name
438 )
439 )
440 return machine, new
441
442 async def provision_machine(
443 self,
444 model_name: str,
445 hostname: str,
446 username: str,
447 private_key_path: str,
448 db_dict: dict = None,
449 progress_timeout: float = None,
450 total_timeout: float = None,
451 ) -> str:
452 """
453 Manually provisioning of a machine
454
455 :param: model_name: Model name
456 :param: hostname: IP to access the machine
457 :param: username: Username to login to the machine
458 :param: private_key_path: Local path for the private key
459 :param: db_dict: Dictionary with data of the DB to write the updates
460 :param: progress_timeout: Maximum time between two updates in the model
461 :param: total_timeout: Timeout for the entity to be active
462
463 :return: (Entity): Machine id
464 """
465 self.log.debug(
466 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
467 model_name, hostname, username
468 )
469 )
470
471 # Get controller
472 controller = await self.get_controller()
473
474 # Get model
475 model = await self.get_model(controller, model_name)
476
477 try:
478 # Get provisioner
479 provisioner = AsyncSSHProvisioner(
480 host=hostname,
481 user=username,
482 private_key_path=private_key_path,
483 log=self.log,
484 )
485
486 # Provision machine
487 params = await provisioner.provision_machine()
488
489 params.jobs = ["JobHostUnits"]
490
491 self.log.debug("Adding machine to model")
492 connection = model.connection()
493 client_facade = client.ClientFacade.from_connection(connection)
494
495 results = await client_facade.AddMachines(params=[params])
496 error = results.machines[0].error
497
498 if error:
499 msg = "Error adding machine: {}".format(error.message)
500 self.log.error(msg=msg)
501 raise ValueError(msg)
502
503 machine_id = results.machines[0].machine
504
505 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
506 asyncio.ensure_future(
507 provisioner.install_agent(
508 connection=connection,
509 nonce=params.nonce,
510 machine_id=machine_id,
511 proxy=self.vca_connection.data.api_proxy,
512 series=params.series,
513 )
514 )
515
516 machine = None
517 for _ in range(10):
518 machine_list = await model.get_machines()
519 if machine_id in machine_list:
520 self.log.debug("Machine {} found in model!".format(machine_id))
521 machine = model.machines.get(machine_id)
522 break
523 await asyncio.sleep(2)
524
525 if machine is None:
526 msg = "Machine {} not found in model".format(machine_id)
527 self.log.error(msg=msg)
528 raise JujuMachineNotFound(msg)
529
530 self.log.debug(
531 "Wait until machine {} is ready in model {}".format(
532 machine.entity_id, model_name
533 )
534 )
535 await JujuModelWatcher.wait_for(
536 model=model,
537 entity=machine,
538 progress_timeout=progress_timeout,
539 total_timeout=total_timeout,
540 db_dict=db_dict,
541 n2vc=self.n2vc,
542 vca_id=self.vca_connection._vca_id,
543 )
544 except Exception as e:
545 raise e
546 finally:
547 await self.disconnect_model(model)
548 await self.disconnect_controller(controller)
549
550 self.log.debug(
551 "Machine provisioned {} in model {}".format(machine_id, model_name)
552 )
553
554 return machine_id
555
556 async def deploy(
557 self,
558 uri: str,
559 model_name: str,
560 wait: bool = True,
561 timeout: float = 3600,
562 instantiation_params: dict = None,
563 ):
564 """
565 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
566
567 :param uri: Path or Charm Store uri in which the charm or bundle can be found
568 :param model_name: Model name
569 :param wait: Indicates whether to wait or not until all applications are active
570 :param timeout: Time in seconds to wait until all applications are active
571 :param instantiation_params: To be applied as overlay bundle over primary bundle.
572 """
573 controller = await self.get_controller()
574 model = await self.get_model(controller, model_name)
575 overlays = []
576 try:
577 await self._validate_instantiation_params(uri, model, instantiation_params)
578 overlays = self._get_overlays(model_name, instantiation_params)
579 await model.deploy(uri, trust=True, overlays=overlays)
580 if wait:
581 await JujuModelWatcher.wait_for_model(model, timeout=timeout)
582 self.log.debug("All units active in model {}".format(model_name))
583 finally:
584 self._remove_overlay_file(overlays)
585 await self.disconnect_model(model)
586 await self.disconnect_controller(controller)
587
588 async def _validate_instantiation_params(
589 self, uri: str, model, instantiation_params: dict
590 ) -> None:
591 """Checks if all the applications in instantiation_params
592 exist ins the original bundle.
593
594 Raises:
595 JujuApplicationNotFound if there is an invalid app in
596 the instantiation params.
597 """
598 overlay_apps = self._get_apps_in_instantiation_params(instantiation_params)
599 if not overlay_apps:
600 return
601 original_apps = await self._get_apps_in_original_bundle(uri, model)
602 if not all(app in original_apps for app in overlay_apps):
603 raise JujuApplicationNotFound(
604 "Cannot find application {} in original bundle {}".format(
605 overlay_apps, original_apps
606 )
607 )
608
609 async def _get_apps_in_original_bundle(self, uri: str, model) -> set:
610 """Bundle is downloaded in BundleHandler.fetch_plan.
611 That method takes care of opening and exception handling.
612
613 Resolve method gets all the information regarding the channel,
614 track, revision, type, source.
615
616 Returns:
617 Set with the names of the applications in original bundle.
618 """
619 url = URL.parse(uri)
620 architecture = DEFAULT_ARCHITECTURE # only AMD64 is allowed
621 res = await model.deploy_types[str(url.schema)].resolve(
622 url, architecture, entity_url=uri
623 )
624 handler = BundleHandler(model, trusted=True, forced=False)
625 await handler.fetch_plan(url, res.origin)
626 return handler.applications
627
628 def _get_apps_in_instantiation_params(self, instantiation_params: dict) -> list:
629 """Extract applications key in instantiation params.
630
631 Returns:
632 List with the names of the applications in instantiation params.
633
634 Raises:
635 JujuError if applications key is not found.
636 """
637 if not instantiation_params:
638 return []
639 try:
640 return [key for key in instantiation_params.get("applications")]
641 except Exception as e:
642 raise JujuError("Invalid overlay format. {}".format(str(e)))
643
644 def _get_overlays(self, model_name: str, instantiation_params: dict) -> list:
645 """Creates a temporary overlay file which includes the instantiation params.
646 Only one overlay file is created.
647
648 Returns:
649 List with one overlay filename. Empty list if there are no instantiation params.
650 """
651 if not instantiation_params:
652 return []
653 file_name = model_name + "-overlay.yaml"
654 self._write_overlay_file(file_name, instantiation_params)
655 return [file_name]
656
657 def _write_overlay_file(self, file_name: str, instantiation_params: dict) -> None:
658 with open(file_name, "w") as file:
659 yaml.dump(instantiation_params, file)
660
661 def _remove_overlay_file(self, overlay: list) -> None:
662 """Overlay contains either one or zero file names."""
663 if not overlay:
664 return
665 try:
666 filename = overlay[0]
667 os.remove(filename)
668 except OSError as e:
669 self.log.warning(
670 "Overlay file {} could not be removed: {}".format(filename, e)
671 )
672
673 async def add_unit(
674 self,
675 application_name: str,
676 model_name: str,
677 machine_id: str,
678 db_dict: dict = None,
679 progress_timeout: float = None,
680 total_timeout: float = None,
681 ):
682 """Add unit
683
684 :param: application_name: Application name
685 :param: model_name: Model name
686 :param: machine_id Machine id
687 :param: db_dict: Dictionary with data of the DB to write the updates
688 :param: progress_timeout: Maximum time between two updates in the model
689 :param: total_timeout: Timeout for the entity to be active
690
691 :return: None
692 """
693
694 model = None
695 controller = await self.get_controller()
696 try:
697 model = await self.get_model(controller, model_name)
698 application = self._get_application(model, application_name)
699
700 if application is not None:
701 # Checks if the given machine id in the model,
702 # otherwise function raises an error
703 _machine, _series = self._get_machine_info(model, machine_id)
704
705 self.log.debug(
706 "Adding unit (machine {}) to application {} in model ~{}".format(
707 machine_id, application_name, model_name
708 )
709 )
710
711 await application.add_unit(to=machine_id)
712
713 await JujuModelWatcher.wait_for(
714 model=model,
715 entity=application,
716 progress_timeout=progress_timeout,
717 total_timeout=total_timeout,
718 db_dict=db_dict,
719 n2vc=self.n2vc,
720 vca_id=self.vca_connection._vca_id,
721 )
722 self.log.debug(
723 "Unit is added to application {} in model {}".format(
724 application_name, model_name
725 )
726 )
727 else:
728 raise JujuApplicationNotFound(
729 "Application {} not exists".format(application_name)
730 )
731 finally:
732 if model:
733 await self.disconnect_model(model)
734 await self.disconnect_controller(controller)
735
736 async def destroy_unit(
737 self,
738 application_name: str,
739 model_name: str,
740 machine_id: str,
741 total_timeout: float = None,
742 ):
743 """Destroy unit
744
745 :param: application_name: Application name
746 :param: model_name: Model name
747 :param: machine_id Machine id
748 :param: total_timeout: Timeout for the entity to be active
749
750 :return: None
751 """
752
753 model = None
754 controller = await self.get_controller()
755 try:
756 model = await self.get_model(controller, model_name)
757 application = self._get_application(model, application_name)
758
759 if application is None:
760 raise JujuApplicationNotFound(
761 "Application not found: {} (model={})".format(
762 application_name, model_name
763 )
764 )
765
766 unit = self._get_unit(application, machine_id)
767 if not unit:
768 raise JujuError(
769 "A unit with machine id {} not in available units".format(
770 machine_id
771 )
772 )
773
774 unit_name = unit.name
775
776 self.log.debug(
777 "Destroying unit {} from application {} in model {}".format(
778 unit_name, application_name, model_name
779 )
780 )
781 await application.destroy_unit(unit_name)
782
783 self.log.debug(
784 "Waiting for unit {} to be destroyed in application {} (model={})...".format(
785 unit_name, application_name, model_name
786 )
787 )
788
789 # TODO: Add functionality in the Juju watcher to replace this kind of blocks
790 if total_timeout is None:
791 total_timeout = 3600
792 end = time.time() + total_timeout
793 while time.time() < end:
794 if not self._get_unit(application, machine_id):
795 self.log.debug(
796 "The unit {} was destroyed in application {} (model={}) ".format(
797 unit_name, application_name, model_name
798 )
799 )
800 return
801 await asyncio.sleep(5)
802 self.log.debug(
803 "Unit {} is destroyed from application {} in model {}".format(
804 unit_name, application_name, model_name
805 )
806 )
807 finally:
808 if model:
809 await self.disconnect_model(model)
810 await self.disconnect_controller(controller)
811
812 async def deploy_charm(
813 self,
814 application_name: str,
815 path: str,
816 model_name: str,
817 machine_id: str,
818 db_dict: dict = None,
819 progress_timeout: float = None,
820 total_timeout: float = None,
821 config: dict = None,
822 series: str = None,
823 num_units: int = 1,
824 ):
825 """Deploy charm
826
827 :param: application_name: Application name
828 :param: path: Local path to the charm
829 :param: model_name: Model name
830 :param: machine_id ID of the machine
831 :param: db_dict: Dictionary with data of the DB to write the updates
832 :param: progress_timeout: Maximum time between two updates in the model
833 :param: total_timeout: Timeout for the entity to be active
834 :param: config: Config for the charm
835 :param: series: Series of the charm
836 :param: num_units: Number of units
837
838 :return: (juju.application.Application): Juju application
839 """
840 self.log.debug(
841 "Deploying charm {} to machine {} in model ~{}".format(
842 application_name, machine_id, model_name
843 )
844 )
845 self.log.debug("charm: {}".format(path))
846
847 # Get controller
848 controller = await self.get_controller()
849
850 # Get model
851 model = await self.get_model(controller, model_name)
852
853 try:
854 if application_name not in model.applications:
855 if machine_id is not None:
856 machine, series = self._get_machine_info(model, machine_id)
857
858 application = await model.deploy(
859 entity_url=path,
860 application_name=application_name,
861 channel="stable",
862 num_units=1,
863 series=series,
864 to=machine_id,
865 config=config,
866 )
867
868 self.log.debug(
869 "Wait until application {} is ready in model {}".format(
870 application_name, model_name
871 )
872 )
873 if num_units > 1:
874 for _ in range(num_units - 1):
875 m, _ = await self.create_machine(model_name, wait=False)
876 await application.add_unit(to=m.entity_id)
877
878 await JujuModelWatcher.wait_for(
879 model=model,
880 entity=application,
881 progress_timeout=progress_timeout,
882 total_timeout=total_timeout,
883 db_dict=db_dict,
884 n2vc=self.n2vc,
885 vca_id=self.vca_connection._vca_id,
886 )
887 self.log.debug(
888 "Application {} is ready in model {}".format(
889 application_name, model_name
890 )
891 )
892 else:
893 raise JujuApplicationExists(
894 "Application {} exists".format(application_name)
895 )
896 except juju.errors.JujuError as e:
897 if "already exists" in e.message:
898 raise JujuApplicationExists(
899 "Application {} exists".format(application_name)
900 )
901 else:
902 raise e
903 finally:
904 await self.disconnect_model(model)
905 await self.disconnect_controller(controller)
906
907 return application
908
909 async def upgrade_charm(
910 self,
911 application_name: str,
912 path: str,
913 model_name: str,
914 total_timeout: float = None,
915 **kwargs,
916 ):
917 """Upgrade Charm
918
919 :param: application_name: Application name
920 :param: model_name: Model name
921 :param: path: Local path to the charm
922 :param: total_timeout: Timeout for the entity to be active
923
924 :return: (str, str): (output and status)
925 """
926
927 self.log.debug(
928 "Upgrading charm {} in model {} from path {}".format(
929 application_name, model_name, path
930 )
931 )
932
933 await self.resolve_application(
934 model_name=model_name, application_name=application_name
935 )
936
937 # Get controller
938 controller = await self.get_controller()
939
940 # Get model
941 model = await self.get_model(controller, model_name)
942
943 try:
944 # Get application
945 application = self._get_application(
946 model,
947 application_name=application_name,
948 )
949 if application is None:
950 raise JujuApplicationNotFound(
951 "Cannot find application {} to upgrade".format(application_name)
952 )
953
954 await application.refresh(path=path)
955
956 self.log.debug(
957 "Wait until charm upgrade is completed for application {} (model={})".format(
958 application_name, model_name
959 )
960 )
961
962 await JujuModelWatcher.ensure_units_idle(
963 model=model, application=application
964 )
965
966 if application.status == "error":
967 error_message = "Unknown"
968 for unit in application.units:
969 if (
970 unit.workload_status == "error"
971 and unit.workload_status_message != ""
972 ):
973 error_message = unit.workload_status_message
974
975 message = "Application {} failed update in {}: {}".format(
976 application_name, model_name, error_message
977 )
978 self.log.error(message)
979 raise JujuError(message=message)
980
981 self.log.debug(
982 "Application {} is ready in model {}".format(
983 application_name, model_name
984 )
985 )
986
987 finally:
988 await self.disconnect_model(model)
989 await self.disconnect_controller(controller)
990
991 return application
992
993 async def resolve_application(self, model_name: str, application_name: str):
994 controller = await self.get_controller()
995 model = await self.get_model(controller, model_name)
996
997 try:
998 application = self._get_application(
999 model,
1000 application_name=application_name,
1001 )
1002 if application is None:
1003 raise JujuApplicationNotFound(
1004 "Cannot find application {} to resolve".format(application_name)
1005 )
1006
1007 while application.status == "error":
1008 for unit in application.units:
1009 if unit.workload_status == "error":
1010 self.log.debug(
1011 "Model {}, Application {}, Unit {} in error state, resolving".format(
1012 model_name, application_name, unit.entity_id
1013 )
1014 )
1015 try:
1016 await unit.resolved(retry=False)
1017 except Exception:
1018 pass
1019
1020 await asyncio.sleep(1)
1021
1022 finally:
1023 await self.disconnect_model(model)
1024 await self.disconnect_controller(controller)
1025
1026 async def resolve(self, model_name: str):
1027 controller = await self.get_controller()
1028 model = await self.get_model(controller, model_name)
1029 all_units_active = False
1030 try:
1031 while not all_units_active:
1032 all_units_active = True
1033 for application_name, application in model.applications.items():
1034 if application.status == "error":
1035 for unit in application.units:
1036 if unit.workload_status == "error":
1037 self.log.debug(
1038 "Model {}, Application {}, Unit {} in error state, resolving".format(
1039 model_name, application_name, unit.entity_id
1040 )
1041 )
1042 try:
1043 await unit.resolved(retry=False)
1044 all_units_active = False
1045 except Exception:
1046 pass
1047
1048 if not all_units_active:
1049 await asyncio.sleep(5)
1050 finally:
1051 await self.disconnect_model(model)
1052 await self.disconnect_controller(controller)
1053
1054 async def scale_application(
1055 self,
1056 model_name: str,
1057 application_name: str,
1058 scale: int = 1,
1059 total_timeout: float = None,
1060 ):
1061 """
1062 Scale application (K8s)
1063
1064 :param: model_name: Model name
1065 :param: application_name: Application name
1066 :param: scale: Scale to which to set this application
1067 :param: total_timeout: Timeout for the entity to be active
1068 """
1069
1070 model = None
1071 controller = await self.get_controller()
1072 try:
1073 model = await self.get_model(controller, model_name)
1074
1075 self.log.debug(
1076 "Scaling application {} in model {}".format(
1077 application_name, model_name
1078 )
1079 )
1080 application = self._get_application(model, application_name)
1081 if application is None:
1082 raise JujuApplicationNotFound("Cannot scale application")
1083 await application.scale(scale=scale)
1084 # Wait until application is scaled in model
1085 self.log.debug(
1086 "Waiting for application {} to be scaled in model {}...".format(
1087 application_name, model_name
1088 )
1089 )
1090 if total_timeout is None:
1091 total_timeout = 1800
1092 end = time.time() + total_timeout
1093 while time.time() < end:
1094 application_scale = self._get_application_count(model, application_name)
1095 # Before calling wait_for_model function,
1096 # wait until application unit count and scale count are equal.
1097 # Because there is a delay before scaling triggers in Juju model.
1098 if application_scale == scale:
1099 await JujuModelWatcher.wait_for_model(
1100 model=model, timeout=total_timeout
1101 )
1102 self.log.debug(
1103 "Application {} is scaled in model {}".format(
1104 application_name, model_name
1105 )
1106 )
1107 return
1108 await asyncio.sleep(5)
1109 raise Exception(
1110 "Timeout waiting for application {} in model {} to be scaled".format(
1111 application_name, model_name
1112 )
1113 )
1114 finally:
1115 if model:
1116 await self.disconnect_model(model)
1117 await self.disconnect_controller(controller)
1118
1119 def _get_application_count(self, model: Model, application_name: str) -> int:
1120 """Get number of units of the application
1121
1122 :param: model: Model object
1123 :param: application_name: Application name
1124
1125 :return: int (or None if application doesn't exist)
1126 """
1127 application = self._get_application(model, application_name)
1128 if application is not None:
1129 return len(application.units)
1130
1131 def _get_application(self, model: Model, application_name: str) -> Application:
1132 """Get application
1133
1134 :param: model: Model object
1135 :param: application_name: Application name
1136
1137 :return: juju.application.Application (or None if it doesn't exist)
1138 """
1139 if model.applications and application_name in model.applications:
1140 return model.applications[application_name]
1141
1142 def _get_unit(self, application: Application, machine_id: str) -> Unit:
1143 """Get unit
1144
1145 :param: application: Application object
1146 :param: machine_id: Machine id
1147
1148 :return: Unit
1149 """
1150 unit = None
1151 for u in application.units:
1152 if u.machine_id == machine_id:
1153 unit = u
1154 break
1155 return unit
1156
1157 def _get_machine_info(
1158 self,
1159 model,
1160 machine_id: str,
1161 ) -> (str, str):
1162 """Get machine info
1163
1164 :param: model: Model object
1165 :param: machine_id: Machine id
1166
1167 :return: (str, str): (machine, series)
1168 """
1169 if machine_id not in model.machines:
1170 msg = "Machine {} not found in model".format(machine_id)
1171 self.log.error(msg=msg)
1172 raise JujuMachineNotFound(msg)
1173 machine = model.machines[machine_id]
1174 return machine, machine.series
1175
1176 async def execute_action(
1177 self,
1178 application_name: str,
1179 model_name: str,
1180 action_name: str,
1181 db_dict: dict = None,
1182 machine_id: str = None,
1183 progress_timeout: float = None,
1184 total_timeout: float = None,
1185 **kwargs,
1186 ):
1187 """Execute action
1188
1189 :param: application_name: Application name
1190 :param: model_name: Model name
1191 :param: action_name: Name of the action
1192 :param: db_dict: Dictionary with data of the DB to write the updates
1193 :param: machine_id Machine id
1194 :param: progress_timeout: Maximum time between two updates in the model
1195 :param: total_timeout: Timeout for the entity to be active
1196
1197 :return: (str, str): (output and status)
1198 """
1199 self.log.debug(
1200 "Executing action {} using params {}".format(action_name, kwargs)
1201 )
1202 # Get controller
1203 controller = await self.get_controller()
1204
1205 # Get model
1206 model = await self.get_model(controller, model_name)
1207
1208 try:
1209 # Get application
1210 application = self._get_application(
1211 model,
1212 application_name=application_name,
1213 )
1214 if application is None:
1215 raise JujuApplicationNotFound("Cannot execute action")
1216 # Racing condition:
1217 # Ocassionally, self._get_leader_unit() will return None
1218 # because the leader elected hook has not been triggered yet.
1219 # Therefore, we are doing some retries. If it happens again,
1220 # re-open bug 1236
1221 if machine_id is None:
1222 unit = await self._get_leader_unit(application)
1223 self.log.debug(
1224 "Action {} is being executed on the leader unit {}".format(
1225 action_name, unit.name
1226 )
1227 )
1228 else:
1229 unit = self._get_unit(application, machine_id)
1230 if not unit:
1231 raise JujuError(
1232 "A unit with machine id {} not in available units".format(
1233 machine_id
1234 )
1235 )
1236 self.log.debug(
1237 "Action {} is being executed on {} unit".format(
1238 action_name, unit.name
1239 )
1240 )
1241
1242 actions = await application.get_actions()
1243
1244 if action_name not in actions:
1245 raise JujuActionNotFound(
1246 "Action {} not in available actions".format(action_name)
1247 )
1248
1249 action = await unit.run_action(action_name, **kwargs)
1250
1251 self.log.debug(
1252 "Wait until action {} is completed in application {} (model={})".format(
1253 action_name, application_name, model_name
1254 )
1255 )
1256 await JujuModelWatcher.wait_for(
1257 model=model,
1258 entity=action,
1259 progress_timeout=progress_timeout,
1260 total_timeout=total_timeout,
1261 db_dict=db_dict,
1262 n2vc=self.n2vc,
1263 vca_id=self.vca_connection._vca_id,
1264 )
1265
1266 output = await model.get_action_output(action_uuid=action.entity_id)
1267 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
1268 status = (
1269 status[action.entity_id] if action.entity_id in status else "failed"
1270 )
1271
1272 self.log.debug(
1273 "Action {} completed with status {} in application {} (model={})".format(
1274 action_name, action.status, application_name, model_name
1275 )
1276 )
1277 finally:
1278 await self.disconnect_model(model)
1279 await self.disconnect_controller(controller)
1280
1281 return output, status
1282
1283 async def get_actions(self, application_name: str, model_name: str) -> dict:
1284 """Get list of actions
1285
1286 :param: application_name: Application name
1287 :param: model_name: Model name
1288
1289 :return: Dict with this format
1290 {
1291 "action_name": "Description of the action",
1292 ...
1293 }
1294 """
1295 self.log.debug(
1296 "Getting list of actions for application {}".format(application_name)
1297 )
1298
1299 # Get controller
1300 controller = await self.get_controller()
1301
1302 # Get model
1303 model = await self.get_model(controller, model_name)
1304
1305 try:
1306 # Get application
1307 application = self._get_application(
1308 model,
1309 application_name=application_name,
1310 )
1311
1312 # Return list of actions
1313 return await application.get_actions()
1314
1315 finally:
1316 # Disconnect from model and controller
1317 await self.disconnect_model(model)
1318 await self.disconnect_controller(controller)
1319
1320 async def get_metrics(self, model_name: str, application_name: str) -> dict:
1321 """Get the metrics collected by the VCA.
1322
1323 :param model_name The name or unique id of the network service
1324 :param application_name The name of the application
1325 """
1326 if not model_name or not application_name:
1327 raise Exception("model_name and application_name must be non-empty strings")
1328 metrics = {}
1329 controller = await self.get_controller()
1330 model = await self.get_model(controller, model_name)
1331 try:
1332 application = self._get_application(model, application_name)
1333 if application is not None:
1334 metrics = await application.get_metrics()
1335 finally:
1336 self.disconnect_model(model)
1337 self.disconnect_controller(controller)
1338 return metrics
1339
1340 async def add_relation(
1341 self,
1342 model_name: str,
1343 endpoint_1: str,
1344 endpoint_2: str,
1345 ):
1346 """Add relation
1347
1348 :param: model_name: Model name
1349 :param: endpoint_1 First endpoint name
1350 ("app:endpoint" format or directly the saas name)
1351 :param: endpoint_2: Second endpoint name (^ same format)
1352 """
1353
1354 self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
1355
1356 # Get controller
1357 controller = await self.get_controller()
1358
1359 # Get model
1360 model = await self.get_model(controller, model_name)
1361
1362 # Add relation
1363 try:
1364 await model.add_relation(endpoint_1, endpoint_2)
1365 except juju.errors.JujuAPIError as e:
1366 if self._relation_is_not_found(e):
1367 self.log.warning("Relation not found: {}".format(e.message))
1368 return
1369 if self._relation_already_exist(e):
1370 self.log.warning("Relation already exists: {}".format(e.message))
1371 return
1372 # another exception, raise it
1373 raise e
1374 finally:
1375 await self.disconnect_model(model)
1376 await self.disconnect_controller(controller)
1377
1378 def _relation_is_not_found(self, juju_error):
1379 text = "not found"
1380 return (text in juju_error.message) or (
1381 juju_error.error_code and text in juju_error.error_code
1382 )
1383
1384 def _relation_already_exist(self, juju_error):
1385 text = "already exists"
1386 return (text in juju_error.message) or (
1387 juju_error.error_code and text in juju_error.error_code
1388 )
1389
1390 async def offer(self, endpoint: RelationEndpoint) -> Offer:
1391 """
1392 Create an offer from a RelationEndpoint
1393
1394 :param: endpoint: Relation endpoint
1395
1396 :return: Offer object
1397 """
1398 model_name = endpoint.model_name
1399 offer_name = f"{endpoint.application_name}-{endpoint.endpoint_name}"
1400 controller = await self.get_controller()
1401 model = None
1402 try:
1403 model = await self.get_model(controller, model_name)
1404 await model.create_offer(endpoint.endpoint, offer_name=offer_name)
1405 offer_list = await self._list_offers(model_name, offer_name=offer_name)
1406 if offer_list:
1407 return Offer(offer_list[0].offer_url)
1408 else:
1409 raise Exception("offer was not created")
1410 except juju.errors.JujuError as e:
1411 if "application offer already exists" not in e.message:
1412 raise e
1413 finally:
1414 if model:
1415 self.disconnect_model(model)
1416 self.disconnect_controller(controller)
1417
1418 async def consume(
1419 self,
1420 model_name: str,
1421 offer: Offer,
1422 provider_libjuju: "Libjuju",
1423 ) -> str:
1424 """
1425 Consumes a remote offer in the model. Relations can be created later using "juju relate".
1426
1427 :param: model_name: Model name
1428 :param: offer: Offer object to consume
1429 :param: provider_libjuju: Libjuju object of the provider endpoint
1430
1431 :raises ParseError if there's a problem parsing the offer_url
1432 :raises JujuError if remote offer includes and endpoint
1433 :raises JujuAPIError if the operation is not successful
1434
1435 :returns: Saas name. It is the application name in the model that reference the remote application.
1436 """
1437 saas_name = f'{offer.name}-{offer.model_name.replace("-", "")}'
1438 if offer.vca_id:
1439 saas_name = f"{saas_name}-{offer.vca_id}"
1440 controller = await self.get_controller()
1441 model = None
1442 provider_controller = None
1443 try:
1444 model = await controller.get_model(model_name)
1445 provider_controller = await provider_libjuju.get_controller()
1446 await model.consume(
1447 offer.url, application_alias=saas_name, controller=provider_controller
1448 )
1449 return saas_name
1450 finally:
1451 if model:
1452 await self.disconnect_model(model)
1453 if provider_controller:
1454 await provider_libjuju.disconnect_controller(provider_controller)
1455 await self.disconnect_controller(controller)
1456
1457 async def destroy_model(self, model_name: str, total_timeout: float = 1800):
1458 """
1459 Destroy model
1460
1461 :param: model_name: Model name
1462 :param: total_timeout: Timeout
1463 """
1464
1465 controller = await self.get_controller()
1466 model = None
1467 try:
1468 if not await self.model_exists(model_name, controller=controller):
1469 self.log.warn(f"Model {model_name} doesn't exist")
1470 return
1471
1472 self.log.debug(f"Getting model {model_name} to be destroyed")
1473 model = await self.get_model(controller, model_name)
1474 self.log.debug(f"Destroying manual machines in model {model_name}")
1475 # Destroy machines that are manually provisioned
1476 # and still are in pending state
1477 await self._destroy_pending_machines(model, only_manual=True)
1478 await self.disconnect_model(model)
1479
1480 await asyncio.wait_for(
1481 self._destroy_model(model_name, controller),
1482 timeout=total_timeout,
1483 )
1484 except Exception as e:
1485 if not await self.model_exists(model_name, controller=controller):
1486 self.log.warn(
1487 f"Failed deleting model {model_name}: model doesn't exist"
1488 )
1489 return
1490 self.log.warn(f"Failed deleting model {model_name}: {e}")
1491 raise e
1492 finally:
1493 if model:
1494 await self.disconnect_model(model)
1495 await self.disconnect_controller(controller)
1496
1497 async def _destroy_model(
1498 self,
1499 model_name: str,
1500 controller: Controller,
1501 ):
1502 """
1503 Destroy model from controller
1504
1505 :param: model: Model name to be removed
1506 :param: controller: Controller object
1507 :param: timeout: Timeout in seconds
1508 """
1509 self.log.debug(f"Destroying model {model_name}")
1510
1511 async def _destroy_model_gracefully(model_name: str, controller: Controller):
1512 self.log.info(f"Gracefully deleting model {model_name}")
1513 resolved = False
1514 while model_name in await controller.list_models():
1515 if not resolved:
1516 await self.resolve(model_name)
1517 resolved = True
1518 await controller.destroy_model(model_name, destroy_storage=True)
1519
1520 await asyncio.sleep(5)
1521 self.log.info(f"Model {model_name} deleted gracefully")
1522
1523 async def _destroy_model_forcefully(model_name: str, controller: Controller):
1524 self.log.info(f"Forcefully deleting model {model_name}")
1525 while model_name in await controller.list_models():
1526 await controller.destroy_model(
1527 model_name, destroy_storage=True, force=True, max_wait=60
1528 )
1529 await asyncio.sleep(5)
1530 self.log.info(f"Model {model_name} deleted forcefully")
1531
1532 try:
1533 try:
1534 await asyncio.wait_for(
1535 _destroy_model_gracefully(model_name, controller), timeout=120
1536 )
1537 except asyncio.TimeoutError:
1538 await _destroy_model_forcefully(model_name, controller)
1539 except juju.errors.JujuError as e:
1540 if any("has been removed" in error for error in e.errors):
1541 return
1542 if any("model not found" in error for error in e.errors):
1543 return
1544 raise e
1545
1546 async def destroy_application(
1547 self, model_name: str, application_name: str, total_timeout: float
1548 ):
1549 """
1550 Destroy application
1551
1552 :param: model_name: Model name
1553 :param: application_name: Application name
1554 :param: total_timeout: Timeout
1555 """
1556
1557 controller = await self.get_controller()
1558 model = None
1559
1560 try:
1561 model = await self.get_model(controller, model_name)
1562 self.log.debug(
1563 "Destroying application {} in model {}".format(
1564 application_name, model_name
1565 )
1566 )
1567 application = self._get_application(model, application_name)
1568 if application:
1569 await application.destroy()
1570 else:
1571 self.log.warning("Application not found: {}".format(application_name))
1572
1573 self.log.debug(
1574 "Waiting for application {} to be destroyed in model {}...".format(
1575 application_name, model_name
1576 )
1577 )
1578 if total_timeout is None:
1579 total_timeout = 3600
1580 end = time.time() + total_timeout
1581 while time.time() < end:
1582 if not self._get_application(model, application_name):
1583 self.log.debug(
1584 "The application {} was destroyed in model {} ".format(
1585 application_name, model_name
1586 )
1587 )
1588 return
1589 await asyncio.sleep(5)
1590 raise Exception(
1591 "Timeout waiting for application {} to be destroyed in model {}".format(
1592 application_name, model_name
1593 )
1594 )
1595 finally:
1596 if model is not None:
1597 await self.disconnect_model(model)
1598 await self.disconnect_controller(controller)
1599
1600 async def _destroy_pending_machines(self, model: Model, only_manual: bool = False):
1601 """
1602 Destroy pending machines in a given model
1603
1604 :param: only_manual: Bool that indicates only manually provisioned
1605 machines should be destroyed (if True), or that
1606 all pending machines should be destroyed
1607 """
1608 status = await model.get_status()
1609 for machine_id in status.machines:
1610 machine_status = status.machines[machine_id]
1611 if machine_status.agent_status.status == "pending":
1612 if only_manual and not machine_status.instance_id.startswith("manual:"):
1613 break
1614 machine = model.machines[machine_id]
1615 await machine.destroy(force=True)
1616
1617 async def configure_application(
1618 self, model_name: str, application_name: str, config: dict = None
1619 ):
1620 """Configure application
1621
1622 :param: model_name: Model name
1623 :param: application_name: Application name
1624 :param: config: Config to apply to the charm
1625 """
1626 self.log.debug("Configuring application {}".format(application_name))
1627
1628 if config:
1629 controller = await self.get_controller()
1630 model = None
1631 try:
1632 model = await self.get_model(controller, model_name)
1633 application = self._get_application(
1634 model,
1635 application_name=application_name,
1636 )
1637 await application.set_config(config)
1638 finally:
1639 if model:
1640 await self.disconnect_model(model)
1641 await self.disconnect_controller(controller)
1642
1643 def handle_exception(self, loop, context):
1644 # All unhandled exceptions by libjuju are handled here.
1645 pass
1646
1647 async def health_check(self, interval: float = 300.0):
1648 """
1649 Health check to make sure controller and controller_model connections are OK
1650
1651 :param: interval: Time in seconds between checks
1652 """
1653 controller = None
1654 while True:
1655 try:
1656 controller = await self.get_controller()
1657 # self.log.debug("VCA is alive")
1658 except Exception as e:
1659 self.log.error("Health check to VCA failed: {}".format(e))
1660 finally:
1661 await self.disconnect_controller(controller)
1662 await asyncio.sleep(interval)
1663
1664 async def list_models(self, contains: str = None) -> [str]:
1665 """List models with certain names
1666
1667 :param: contains: String that is contained in model name
1668
1669 :retur: [models] Returns list of model names
1670 """
1671
1672 controller = await self.get_controller()
1673 try:
1674 models = await controller.list_models()
1675 if contains:
1676 models = [model for model in models if contains in model]
1677 return models
1678 finally:
1679 await self.disconnect_controller(controller)
1680
1681 async def _list_offers(
1682 self, model_name: str, offer_name: str = None
1683 ) -> QueryApplicationOffersResults:
1684 """
1685 List offers within a model
1686
1687 :param: model_name: Model name
1688 :param: offer_name: Offer name to filter.
1689
1690 :return: Returns application offers results in the model
1691 """
1692
1693 controller = await self.get_controller()
1694 try:
1695 offers = (await controller.list_offers(model_name)).results
1696 if offer_name:
1697 matching_offer = []
1698 for offer in offers:
1699 if offer.offer_name == offer_name:
1700 matching_offer.append(offer)
1701 break
1702 offers = matching_offer
1703 return offers
1704 finally:
1705 await self.disconnect_controller(controller)
1706
1707 async def add_k8s(
1708 self,
1709 name: str,
1710 rbac_id: str,
1711 token: str,
1712 client_cert_data: str,
1713 configuration: Configuration,
1714 storage_class: str,
1715 credential_name: str = None,
1716 ):
1717 """
1718 Add a Kubernetes cloud to the controller
1719
1720 Similar to the `juju add-k8s` command in the CLI
1721
1722 :param: name: Name for the K8s cloud
1723 :param: configuration: Kubernetes configuration object
1724 :param: storage_class: Storage Class to use in the cloud
1725 :param: credential_name: Storage Class to use in the cloud
1726 """
1727
1728 if not storage_class:
1729 raise Exception("storage_class must be a non-empty string")
1730 if not name:
1731 raise Exception("name must be a non-empty string")
1732 if not configuration:
1733 raise Exception("configuration must be provided")
1734
1735 endpoint = configuration.host
1736 credential = self.get_k8s_cloud_credential(
1737 configuration,
1738 client_cert_data,
1739 token,
1740 )
1741 credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id
1742 cloud = client.Cloud(
1743 type_="kubernetes",
1744 auth_types=[credential.auth_type],
1745 endpoint=endpoint,
1746 ca_certificates=[client_cert_data],
1747 config={
1748 "operator-storage": storage_class,
1749 "workload-storage": storage_class,
1750 },
1751 )
1752
1753 return await self.add_cloud(
1754 name, cloud, credential, credential_name=credential_name
1755 )
1756
1757 def get_k8s_cloud_credential(
1758 self,
1759 configuration: Configuration,
1760 client_cert_data: str,
1761 token: str = None,
1762 ) -> client.CloudCredential:
1763 attrs = {}
1764 # TODO: Test with AKS
1765 key = None # open(configuration.key_file, "r").read()
1766 username = configuration.username
1767 password = configuration.password
1768
1769 if client_cert_data:
1770 attrs["ClientCertificateData"] = client_cert_data
1771 if key:
1772 attrs["ClientKeyData"] = key
1773 if token:
1774 if username or password:
1775 raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
1776 attrs["Token"] = token
1777
1778 auth_type = None
1779 if key:
1780 auth_type = "oauth2"
1781 if client_cert_data:
1782 auth_type = "oauth2withcert"
1783 if not token:
1784 raise JujuInvalidK8sConfiguration(
1785 "missing token for auth type {}".format(auth_type)
1786 )
1787 elif username:
1788 if not password:
1789 self.log.debug(
1790 "credential for user {} has empty password".format(username)
1791 )
1792 attrs["username"] = username
1793 attrs["password"] = password
1794 if client_cert_data:
1795 auth_type = "userpasswithcert"
1796 else:
1797 auth_type = "userpass"
1798 elif client_cert_data and token:
1799 auth_type = "certificate"
1800 else:
1801 raise JujuInvalidK8sConfiguration("authentication method not supported")
1802 return client.CloudCredential(auth_type=auth_type, attrs=attrs)
1803
1804 async def add_cloud(
1805 self,
1806 name: str,
1807 cloud: Cloud,
1808 credential: CloudCredential = None,
1809 credential_name: str = None,
1810 ) -> Cloud:
1811 """
1812 Add cloud to the controller
1813
1814 :param: name: Name of the cloud to be added
1815 :param: cloud: Cloud object
1816 :param: credential: CloudCredentials object for the cloud
1817 :param: credential_name: Credential name.
1818 If not defined, cloud of the name will be used.
1819 """
1820 controller = await self.get_controller()
1821 try:
1822 _ = await controller.add_cloud(name, cloud)
1823 if credential:
1824 await controller.add_credential(
1825 credential_name or name, credential=credential, cloud=name
1826 )
1827 # Need to return the object returned by the controller.add_cloud() function
1828 # I'm returning the original value now until this bug is fixed:
1829 # https://github.com/juju/python-libjuju/issues/443
1830 return cloud
1831 finally:
1832 await self.disconnect_controller(controller)
1833
1834 async def remove_cloud(self, name: str):
1835 """
1836 Remove cloud
1837
1838 :param: name: Name of the cloud to be removed
1839 """
1840 controller = await self.get_controller()
1841 try:
1842 await controller.remove_cloud(name)
1843 except juju.errors.JujuError as e:
1844 if len(e.errors) == 1 and f'cloud "{name}" not found' == e.errors[0]:
1845 self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
1846 else:
1847 raise e
1848 finally:
1849 await self.disconnect_controller(controller)
1850
1851 @retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
1852 async def _get_leader_unit(self, application: Application) -> Unit:
1853 unit = None
1854 for u in application.units:
1855 if await u.is_leader_from_status():
1856 unit = u
1857 break
1858 if not unit:
1859 raise Exception()
1860 return unit
1861
1862 async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
1863 """
1864 Get cloud credentials
1865
1866 :param: cloud: Cloud object. The returned credentials will be from this cloud.
1867
1868 :return: List of credentials object associated to the specified cloud
1869
1870 """
1871 controller = await self.get_controller()
1872 try:
1873 facade = client.CloudFacade.from_connection(controller.connection())
1874 cloud_cred_tag = tag.credential(
1875 cloud.name, self.vca_connection.data.user, cloud.credential_name
1876 )
1877 params = [client.Entity(cloud_cred_tag)]
1878 return (await facade.Credential(params)).results
1879 finally:
1880 await self.disconnect_controller(controller)
1881
1882 async def check_application_exists(self, model_name, application_name) -> bool:
1883 """Check application exists
1884
1885 :param: model_name: Model Name
1886 :param: application_name: Application Name
1887
1888 :return: Boolean
1889 """
1890
1891 model = None
1892 controller = await self.get_controller()
1893 try:
1894 model = await self.get_model(controller, model_name)
1895 self.log.debug(
1896 "Checking if application {} exists in model {}".format(
1897 application_name, model_name
1898 )
1899 )
1900 return self._get_application(model, application_name) is not None
1901 finally:
1902 if model:
1903 await self.disconnect_model(model)
1904 await self.disconnect_controller(controller)