1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
22 from juju
.model
import Model
23 from juju
.machine
import Machine
24 from juju
.application
import Application
25 from juju
.unit
import Unit
26 from juju
.client
._definitions
import (
28 QueryApplicationOffersResults
,
32 from juju
.controller
import Controller
33 from juju
.client
import client
36 from n2vc
.juju_watcher
import JujuModelWatcher
37 from n2vc
.provisioner
import AsyncSSHProvisioner
38 from n2vc
.n2vc_conn
import N2VCConnector
39 from n2vc
.exceptions
import (
41 JujuApplicationNotFound
,
42 JujuLeaderUnitNotFound
,
44 JujuControllerFailedConnecting
,
45 JujuApplicationExists
,
46 JujuInvalidK8sConfiguration
,
49 from n2vc
.vca
.cloud
import Cloud
as VcaCloud
50 from n2vc
.vca
.connection
import Connection
51 from kubernetes
.client
.configuration
import Configuration
52 from retrying_async
import retry
# Key under which the RBAC id is stored in the attributes of the
# Kubernetes cloud credential (see its use in add_k8s).
RBAC_LABEL_KEY_NAME = "rbac-id"
61 vca_connection
: Connection
,
62 loop
: asyncio
.AbstractEventLoop
= None,
63 log
: logging
.Logger
= None,
64 n2vc
: N2VCConnector
= None,
69 :param: vca_connection: n2vc.vca.connection object
70 :param: loop: Asyncio loop
72 :param: n2vc: N2VC object
75 self
.log
= log
or logging
.getLogger("Libjuju")
77 self
.vca_connection
= vca_connection
79 self
.loop
= loop
or asyncio
.get_event_loop()
80 self
.loop
.set_exception_handler(self
.handle_exception
)
81 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
83 if self
.vca_connection
.is_default
:
84 self
.health_check_task
= self
._create
_health
_check
_task
()
86 def _create_health_check_task(self
):
87 return self
.loop
.create_task(self
.health_check())
89 async def get_controller(self
, timeout
: float = 60.0) -> Controller
:
93 :param: timeout: Time in seconds to wait for controller to connect
97 controller
= Controller()
98 await asyncio
.wait_for(
100 endpoint
=self
.vca_connection
.data
.endpoints
,
101 username
=self
.vca_connection
.data
.user
,
102 password
=self
.vca_connection
.data
.secret
,
103 cacert
=self
.vca_connection
.data
.cacert
,
107 if self
.vca_connection
.is_default
:
108 endpoints
= await controller
.api_endpoints
110 endpoint
in self
.vca_connection
.endpoints
for endpoint
in endpoints
112 await self
.vca_connection
.update_endpoints(endpoints
)
114 except asyncio
.CancelledError
as e
:
116 except Exception as e
:
118 "Failed connecting to controller: {}... {}".format(
119 self
.vca_connection
.data
.endpoints
, e
123 await self
.disconnect_controller(controller
)
124 raise JujuControllerFailedConnecting(e
)
async def disconnect(self):
    """Disconnect

    Cancels the background health check task, if one was created, and logs
    the disconnection.
    """
    # The health check task is only created when the VCA connection is the
    # default one, so the attribute may be absent (or None) on other instances.
    health_check_task = getattr(self, "health_check_task", None)
    if health_check_task is not None:
        # Cancel health check task
        health_check_task.cancel()
    self.log.debug("Libjuju disconnected!")
async def disconnect_model(self, model: Model):
    """
    Close the connection to a model

    :param: model: Model that will be disconnected
    """
    pending_disconnect = model.disconnect()
    await pending_disconnect
140 async def disconnect_controller(self
, controller
: Controller
):
142 Disconnect controller
144 :param: controller: Controller that will be disconnected
147 await controller
.disconnect()
149 @retry(attempts
=3, delay
=5, timeout
=None)
150 async def add_model(self
, model_name
: str, cloud
: VcaCloud
):
154 :param: model_name: Model name
155 :param: cloud: Cloud object
159 controller
= await self
.get_controller()
162 # Block until other workers have finished model creation
163 while self
.creating_model
.locked():
164 await asyncio
.sleep(0.1)
167 async with self
.creating_model
:
168 if await self
.model_exists(model_name
, controller
=controller
):
170 self
.log
.debug("Creating model {}".format(model_name
))
171 model
= await controller
.add_model(
173 config
=self
.vca_connection
.data
.model_config
,
174 cloud_name
=cloud
.name
,
175 credential_name
=cloud
.credential_name
,
177 except juju
.errors
.JujuAPIError
as e
:
178 if "already exists" in e
.message
:
184 await self
.disconnect_model(model
)
185 await self
.disconnect_controller(controller
)
187 async def get_executed_actions(self
, model_name
: str) -> list:
189 Get executed/history of actions for a model.
191 :param: model_name: Model name, str.
192 :return: List of executed actions for a model.
195 executed_actions
= []
196 controller
= await self
.get_controller()
198 model
= await self
.get_model(controller
, model_name
)
199 # Get all unique action names
201 for application
in model
.applications
:
202 application_actions
= await self
.get_actions(application
, model_name
)
203 actions
.update(application_actions
)
204 # Get status of all actions
205 for application_action
in actions
:
206 app_action_status_list
= await model
.get_action_status(
207 name
=application_action
209 for action_id
, action_status
in app_action_status_list
.items():
212 "action": application_action
,
213 "status": action_status
,
215 # Get action output by id
216 action_status
= await model
.get_action_output(executed_action
["id"])
217 for k
, v
in action_status
.items():
218 executed_action
[k
] = v
219 executed_actions
.append(executed_action
)
220 except Exception as e
:
222 "Error in getting executed actions for model: {}. Error: {}".format(
228 await self
.disconnect_model(model
)
229 await self
.disconnect_controller(controller
)
230 return executed_actions
232 async def get_application_configs(
233 self
, model_name
: str, application_name
: str
236 Get available configs for an application.
238 :param: model_name: Model name, str.
239 :param: application_name: Application name, str.
241 :return: A dict which has key - action name, value - action description
244 application_configs
= {}
245 controller
= await self
.get_controller()
247 model
= await self
.get_model(controller
, model_name
)
248 application
= self
._get
_application
(
249 model
, application_name
=application_name
251 application_configs
= await application
.get_config()
252 except Exception as e
:
254 "Error in getting configs for application: {} in model: {}. Error: {}".format(
255 application_name
, model_name
, str(e
)
260 await self
.disconnect_model(model
)
261 await self
.disconnect_controller(controller
)
262 return application_configs
@retry(attempts=3, delay=5)
async def get_model(self, controller: Controller, model_name: str) -> Model:
    """
    Fetch a model from the controller

    The call is wrapped by ``retry`` (attempts=3, delay=5).

    :param: controller: Controller
    :param: model_name: Model name

    :return: Model: The Juju model object returned by the controller
    """
    model = await controller.get_model(model_name)
    return model
276 async def model_exists(
277 self
, model_name
: str, controller
: Controller
= None
280 Check if model exists
282 :param: controller: Controller
283 :param: model_name: Model name
287 need_to_disconnect
= False
289 # Get controller if not passed
291 controller
= await self
.get_controller()
292 need_to_disconnect
= True
294 # Check if model exists
296 return model_name
in await controller
.list_models()
298 if need_to_disconnect
:
299 await self
.disconnect_controller(controller
)
301 async def models_exist(self
, model_names
: [str]) -> (bool, list):
303 Check if models exists
305 :param: model_names: List of strings with model names
307 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
311 "model_names must be a non-empty array. Given value: {}".format(
315 non_existing_models
= []
316 models
= await self
.list_models()
317 existing_models
= list(set(models
).intersection(model_names
))
318 non_existing_models
= list(set(model_names
) - set(existing_models
))
321 len(non_existing_models
) == 0,
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    Connects to the controller and the model, reads the model status and
    always disconnects from both before returning or raising.

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        try:
            return await model.get_status()
        finally:
            await self.disconnect_model(model)
    finally:
        # Also runs when get_model() fails, so the controller connection
        # is not leaked in that case.
        await self.disconnect_controller(controller)
341 async def create_machine(
344 machine_id
: str = None,
345 db_dict
: dict = None,
346 progress_timeout
: float = None,
347 total_timeout
: float = None,
348 series
: str = "bionic",
350 ) -> (Machine
, bool):
354 :param: model_name: Model name
355 :param: machine_id: Machine id
356 :param: db_dict: Dictionary with data of the DB to write the updates
357 :param: progress_timeout: Maximum time between two updates in the model
358 :param: total_timeout: Timeout for the entity to be active
359 :param: series: Series of the machine (xenial, bionic, focal, ...)
360 :param: wait: Wait until machine is ready
362 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
363 if the machine is new or it already existed
369 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
373 controller
= await self
.get_controller()
376 model
= await self
.get_model(controller
, model_name
)
378 if machine_id
is not None:
380 "Searching machine (id={}) in model {}".format(
381 machine_id
, model_name
385 # Get machines from model and get the machine with machine_id if exists
386 machines
= await model
.get_machines()
387 if machine_id
in machines
:
389 "Machine (id={}) found in model {}".format(
390 machine_id
, model_name
393 machine
= machines
[machine_id
]
395 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
398 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
401 machine
= await model
.add_machine(
402 spec
=None, constraints
=None, disks
=None, series
=series
406 # Wait until the machine is ready
408 "Wait until machine {} is ready in model {}".format(
409 machine
.entity_id
, model_name
413 await JujuModelWatcher
.wait_for(
416 progress_timeout
=progress_timeout
,
417 total_timeout
=total_timeout
,
420 vca_id
=self
.vca_connection
._vca
_id
,
423 await self
.disconnect_model(model
)
424 await self
.disconnect_controller(controller
)
427 "Machine {} ready at {} in model {}".format(
428 machine
.entity_id
, machine
.dns_name
, model_name
433 async def provision_machine(
438 private_key_path
: str,
439 db_dict
: dict = None,
440 progress_timeout
: float = None,
441 total_timeout
: float = None,
444 Manually provisioning of a machine
446 :param: model_name: Model name
447 :param: hostname: IP to access the machine
448 :param: username: Username to login to the machine
449 :param: private_key_path: Local path for the private key
450 :param: db_dict: Dictionary with data of the DB to write the updates
451 :param: progress_timeout: Maximum time between two updates in the model
452 :param: total_timeout: Timeout for the entity to be active
454 :return: (Entity): Machine id
457 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
458 model_name
, hostname
, username
463 controller
= await self
.get_controller()
466 model
= await self
.get_model(controller
, model_name
)
470 provisioner
= AsyncSSHProvisioner(
473 private_key_path
=private_key_path
,
478 params
= await provisioner
.provision_machine()
480 params
.jobs
= ["JobHostUnits"]
482 self
.log
.debug("Adding machine to model")
483 connection
= model
.connection()
484 client_facade
= client
.ClientFacade
.from_connection(connection
)
486 results
= await client_facade
.AddMachines(params
=[params
])
487 error
= results
.machines
[0].error
490 msg
= "Error adding machine: {}".format(error
.message
)
491 self
.log
.error(msg
=msg
)
492 raise ValueError(msg
)
494 machine_id
= results
.machines
[0].machine
496 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
497 asyncio
.ensure_future(
498 provisioner
.install_agent(
499 connection
=connection
,
501 machine_id
=machine_id
,
502 proxy
=self
.vca_connection
.data
.api_proxy
,
503 series
=params
.series
,
509 machine_list
= await model
.get_machines()
510 if machine_id
in machine_list
:
511 self
.log
.debug("Machine {} found in model!".format(machine_id
))
512 machine
= model
.machines
.get(machine_id
)
514 await asyncio
.sleep(2)
517 msg
= "Machine {} not found in model".format(machine_id
)
518 self
.log
.error(msg
=msg
)
519 raise JujuMachineNotFound(msg
)
522 "Wait until machine {} is ready in model {}".format(
523 machine
.entity_id
, model_name
526 await JujuModelWatcher
.wait_for(
529 progress_timeout
=progress_timeout
,
530 total_timeout
=total_timeout
,
533 vca_id
=self
.vca_connection
._vca
_id
,
535 except Exception as e
:
538 await self
.disconnect_model(model
)
539 await self
.disconnect_controller(controller
)
542 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
548 self
, uri
: str, model_name
: str, wait
: bool = True, timeout
: float = 3600
551 Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
553 :param: uri: Path or Charm Store uri in which the charm or bundle can be found
554 :param: model_name: Model name
555 :param: wait: Indicates whether to wait or not until all applications are active
556 :param: timeout: Time in seconds to wait until all applications are active
558 controller
= await self
.get_controller()
559 model
= await self
.get_model(controller
, model_name
)
561 await model
.deploy(uri
, trust
=True)
563 await JujuModelWatcher
.wait_for_model(model
, timeout
=timeout
)
564 self
.log
.debug("All units active in model {}".format(model_name
))
566 await self
.disconnect_model(model
)
567 await self
.disconnect_controller(controller
)
571 application_name
: str,
574 db_dict
: dict = None,
575 progress_timeout
: float = None,
576 total_timeout
: float = None,
580 :param: application_name: Application name
581 :param: model_name: Model name
582 :param: machine_id Machine id
583 :param: db_dict: Dictionary with data of the DB to write the updates
584 :param: progress_timeout: Maximum time between two updates in the model
585 :param: total_timeout: Timeout for the entity to be active
591 controller
= await self
.get_controller()
593 model
= await self
.get_model(controller
, model_name
)
594 application
= self
._get
_application
(model
, application_name
)
596 if application
is not None:
598 # Checks if the given machine id in the model,
599 # otherwise function raises an error
600 _machine
, _series
= self
._get
_machine
_info
(model
, machine_id
)
603 "Adding unit (machine {}) to application {} in model ~{}".format(
604 machine_id
, application_name
, model_name
608 await application
.add_unit(to
=machine_id
)
610 await JujuModelWatcher
.wait_for(
613 progress_timeout
=progress_timeout
,
614 total_timeout
=total_timeout
,
617 vca_id
=self
.vca_connection
._vca
_id
,
620 "Unit is added to application {} in model {}".format(
621 application_name
, model_name
625 raise JujuApplicationNotFound(
626 "Application {} not exists".format(application_name
)
630 await self
.disconnect_model(model
)
631 await self
.disconnect_controller(controller
)
633 async def destroy_unit(
635 application_name
: str,
638 total_timeout
: float = None,
642 :param: application_name: Application name
643 :param: model_name: Model name
644 :param: machine_id Machine id
645 :param: total_timeout: Timeout for the entity to be active
651 controller
= await self
.get_controller()
653 model
= await self
.get_model(controller
, model_name
)
654 application
= self
._get
_application
(model
, application_name
)
656 if application
is None:
657 raise JujuApplicationNotFound(
658 "Application not found: {} (model={})".format(
659 application_name
, model_name
663 unit
= self
._get
_unit
(application
, machine_id
)
666 "A unit with machine id {} not in available units".format(
671 unit_name
= unit
.name
674 "Destroying unit {} from application {} in model {}".format(
675 unit_name
, application_name
, model_name
678 await application
.destroy_unit(unit_name
)
681 "Waiting for unit {} to be destroyed in application {} (model={})...".format(
682 unit_name
, application_name
, model_name
686 # TODO: Add functionality in the Juju watcher to replace this kind of blocks
687 if total_timeout
is None:
689 end
= time
.time() + total_timeout
690 while time
.time() < end
:
691 if not self
._get
_unit
(application
, machine_id
):
693 "The unit {} was destroyed in application {} (model={}) ".format(
694 unit_name
, application_name
, model_name
698 await asyncio
.sleep(5)
700 "Unit {} is destroyed from application {} in model {}".format(
701 unit_name
, application_name
, model_name
706 await self
.disconnect_model(model
)
707 await self
.disconnect_controller(controller
)
709 async def deploy_charm(
711 application_name
: str,
715 db_dict
: dict = None,
716 progress_timeout
: float = None,
717 total_timeout
: float = None,
724 :param: application_name: Application name
725 :param: path: Local path to the charm
726 :param: model_name: Model name
727 :param: machine_id ID of the machine
728 :param: db_dict: Dictionary with data of the DB to write the updates
729 :param: progress_timeout: Maximum time between two updates in the model
730 :param: total_timeout: Timeout for the entity to be active
731 :param: config: Config for the charm
732 :param: series: Series of the charm
733 :param: num_units: Number of units
735 :return: (juju.application.Application): Juju application
738 "Deploying charm {} to machine {} in model ~{}".format(
739 application_name
, machine_id
, model_name
742 self
.log
.debug("charm: {}".format(path
))
745 controller
= await self
.get_controller()
748 model
= await self
.get_model(controller
, model_name
)
751 if application_name
not in model
.applications
:
753 if machine_id
is not None:
754 machine
, series
= self
._get
_machine
_info
(model
, machine_id
)
756 application
= await model
.deploy(
758 application_name
=application_name
,
767 "Wait until application {} is ready in model {}".format(
768 application_name
, model_name
772 for _
in range(num_units
- 1):
773 m
, _
= await self
.create_machine(model_name
, wait
=False)
774 await application
.add_unit(to
=m
.entity_id
)
776 await JujuModelWatcher
.wait_for(
779 progress_timeout
=progress_timeout
,
780 total_timeout
=total_timeout
,
783 vca_id
=self
.vca_connection
._vca
_id
,
786 "Application {} is ready in model {}".format(
787 application_name
, model_name
791 raise JujuApplicationExists(
792 "Application {} exists".format(application_name
)
794 except juju
.errors
.JujuError
as e
:
795 if "already exists" in e
.message
:
796 raise JujuApplicationExists(
797 "Application {} exists".format(application_name
)
802 await self
.disconnect_model(model
)
803 await self
.disconnect_controller(controller
)
807 async def scale_application(
810 application_name
: str,
812 total_timeout
: float = None,
815 Scale application (K8s)
817 :param: model_name: Model name
818 :param: application_name: Application name
819 :param: scale: Scale to which to set this application
820 :param: total_timeout: Timeout for the entity to be active
824 controller
= await self
.get_controller()
826 model
= await self
.get_model(controller
, model_name
)
829 "Scaling application {} in model {}".format(
830 application_name
, model_name
833 application
= self
._get
_application
(model
, application_name
)
834 if application
is None:
835 raise JujuApplicationNotFound("Cannot scale application")
836 await application
.scale(scale
=scale
)
837 # Wait until application is scaled in model
839 "Waiting for application {} to be scaled in model {}...".format(
840 application_name
, model_name
843 if total_timeout
is None:
845 end
= time
.time() + total_timeout
846 while time
.time() < end
:
847 application_scale
= self
._get
_application
_count
(model
, application_name
)
848 # Before calling wait_for_model function,
849 # wait until application unit count and scale count are equal.
850 # Because there is a delay before scaling triggers in Juju model.
851 if application_scale
== scale
:
852 await JujuModelWatcher
.wait_for_model(
853 model
=model
, timeout
=total_timeout
856 "Application {} is scaled in model {}".format(
857 application_name
, model_name
861 await asyncio
.sleep(5)
863 "Timeout waiting for application {} in model {} to be scaled".format(
864 application_name
, model_name
869 await self
.disconnect_model(model
)
870 await self
.disconnect_controller(controller
)
872 def _get_application_count(self
, model
: Model
, application_name
: str) -> int:
873 """Get number of units of the application
875 :param: model: Model object
876 :param: application_name: Application name
878 :return: int (or None if application doesn't exist)
880 application
= self
._get
_application
(model
, application_name
)
881 if application
is not None:
882 return len(application
.units
)
884 def _get_application(self
, model
: Model
, application_name
: str) -> Application
:
887 :param: model: Model object
888 :param: application_name: Application name
890 :return: juju.application.Application (or None if it doesn't exist)
892 if model
.applications
and application_name
in model
.applications
:
893 return model
.applications
[application_name
]
895 def _get_unit(self
, application
: Application
, machine_id
: str) -> Unit
:
898 :param: application: Application object
899 :param: machine_id: Machine id
904 for u
in application
.units
:
905 if u
.machine_id
== machine_id
:
910 def _get_machine_info(
917 :param: model: Model object
918 :param: machine_id: Machine id
920 :return: (str, str): (machine, series)
922 if machine_id
not in model
.machines
:
923 msg
= "Machine {} not found in model".format(machine_id
)
924 self
.log
.error(msg
=msg
)
925 raise JujuMachineNotFound(msg
)
926 machine
= model
.machines
[machine_id
]
927 return machine
, machine
.series
929 async def execute_action(
931 application_name
: str,
934 db_dict
: dict = None,
935 machine_id
: str = None,
936 progress_timeout
: float = None,
937 total_timeout
: float = None,
942 :param: application_name: Application name
943 :param: model_name: Model name
944 :param: action_name: Name of the action
945 :param: db_dict: Dictionary with data of the DB to write the updates
946 :param: machine_id Machine id
947 :param: progress_timeout: Maximum time between two updates in the model
948 :param: total_timeout: Timeout for the entity to be active
950 :return: (str, str): (output and status)
953 "Executing action {} using params {}".format(action_name
, kwargs
)
956 controller
= await self
.get_controller()
959 model
= await self
.get_model(controller
, model_name
)
963 application
= self
._get
_application
(
965 application_name
=application_name
,
967 if application
is None:
968 raise JujuApplicationNotFound("Cannot execute action")
970 # Occasionally, self._get_leader_unit() will return None
971 # because the leader elected hook has not been triggered yet.
972 # Therefore, we are doing some retries. If it happens again,
974 if machine_id
is None:
975 unit
= await self
._get
_leader
_unit
(application
)
977 "Action {} is being executed on the leader unit {}".format(
978 action_name
, unit
.name
982 unit
= self
._get
_unit
(application
, machine_id
)
985 "A unit with machine id {} not in available units".format(
990 "Action {} is being executed on {} unit".format(
991 action_name
, unit
.name
995 actions
= await application
.get_actions()
997 if action_name
not in actions
:
998 raise JujuActionNotFound(
999 "Action {} not in available actions".format(action_name
)
1002 action
= await unit
.run_action(action_name
, **kwargs
)
1005 "Wait until action {} is completed in application {} (model={})".format(
1006 action_name
, application_name
, model_name
1009 await JujuModelWatcher
.wait_for(
1012 progress_timeout
=progress_timeout
,
1013 total_timeout
=total_timeout
,
1016 vca_id
=self
.vca_connection
._vca
_id
,
1019 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
1020 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
1022 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
1026 "Action {} completed with status {} in application {} (model={})".format(
1027 action_name
, action
.status
, application_name
, model_name
1031 await self
.disconnect_model(model
)
1032 await self
.disconnect_controller(controller
)
1034 return output
, status
1036 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
1037 """Get list of actions
1039 :param: application_name: Application name
1040 :param: model_name: Model name
1042 :return: Dict with this format
1044 "action_name": "Description of the action",
1049 "Getting list of actions for application {}".format(application_name
)
1053 controller
= await self
.get_controller()
1056 model
= await self
.get_model(controller
, model_name
)
1060 application
= self
._get
_application
(
1062 application_name
=application_name
,
1065 # Return list of actions
1066 return await application
.get_actions()
1069 # Disconnect from model and controller
1070 await self
.disconnect_model(model
)
1071 await self
.disconnect_controller(controller
)
async def get_metrics(self, model_name: str, application_name: str) -> dict:
    """Get the metrics collected by the VCA.

    :param model_name The name or unique id of the network service
    :param application_name The name of the application

    :return: Metrics returned by the application's ``get_metrics()``;
             an empty dict when the application is not found in the model
    :raises Exception: if model_name or application_name is empty
    """
    if not model_name or not application_name:
        raise Exception("model_name and application_name must be non-empty strings")
    metrics = {}
    controller = await self.get_controller()
    model = await self.get_model(controller, model_name)
    try:
        application = self._get_application(model, application_name)
        if application is not None:
            metrics = await application.get_metrics()
    finally:
        # Both disconnect helpers are coroutines: they must be awaited,
        # otherwise they never run and the connections stay open.
        await self.disconnect_model(model)
        await self.disconnect_controller(controller)
    return metrics
1093 async def add_relation(
1101 :param: model_name: Model name
1102 :param: endpoint_1 First endpoint name
1103 ("app:endpoint" format or directly the saas name)
1104 :param: endpoint_2: Second endpoint name (^ same format)
1107 self
.log
.debug("Adding relation: {} -> {}".format(endpoint_1
, endpoint_2
))
1110 controller
= await self
.get_controller()
1113 model
= await self
.get_model(controller
, model_name
)
1117 await model
.add_relation(endpoint_1
, endpoint_2
)
1118 except juju
.errors
.JujuAPIError
as e
:
1119 if "not found" in e
.message
:
1120 self
.log
.warning("Relation not found: {}".format(e
.message
))
1122 if "already exists" in e
.message
:
1123 self
.log
.warning("Relation already exists: {}".format(e
.message
))
1125 # another exception, raise it
1128 await self
.disconnect_model(model
)
1129 await self
.disconnect_controller(controller
)
1137 Adds a remote offer to the model. Relations can be created later using "juju relate".
1139 :param: offer_url: Offer Url
1140 :param: model_name: Model name
1142 :raises ParseError if there's a problem parsing the offer_url
1143 :raises JujuError if remote offer includes and endpoint
1144 :raises JujuAPIError if the operation is not successful
1146 controller
= await self
.get_controller()
1147 model
= await controller
.get_model(model_name
)
1150 await model
.consume(offer_url
)
1152 await self
.disconnect_model(model
)
1153 await self
.disconnect_controller(controller
)
1155 async def destroy_model(self
, model_name
: str, total_timeout
: float = 1800):
1159 :param: model_name: Model name
1160 :param: total_timeout: Timeout
1163 controller
= await self
.get_controller()
1166 if not await self
.model_exists(model_name
, controller
=controller
):
1169 self
.log
.debug("Destroying model {}".format(model_name
))
1171 model
= await self
.get_model(controller
, model_name
)
1172 # Destroy machines that are manually provisioned
1173 # and still are in pending state
1174 await self
._destroy
_pending
_machines
(model
, only_manual
=True)
1175 await self
.disconnect_model(model
)
1177 await self
._destroy
_model
(
1180 timeout
=total_timeout
,
1182 except Exception as e
:
1183 if not await self
.model_exists(model_name
, controller
=controller
):
1188 await self
.disconnect_model(model
)
1189 await self
.disconnect_controller(controller
)
1191 async def _destroy_model(
1192 self
, model_name
: str, controller
: Controller
, timeout
: float = 1800
1195 Destroy model from controller
1197 :param: model: Model name to be removed
1198 :param: controller: Controller object
1199 :param: timeout: Timeout in seconds
1202 async def _destroy_model_loop(model_name
: str, controller
: Controller
):
1203 while await self
.model_exists(model_name
, controller
=controller
):
1204 await controller
.destroy_model(
1205 model_name
, destroy_storage
=True, force
=True, max_wait
=0
1207 await asyncio
.sleep(5)
1210 await asyncio
.wait_for(
1211 _destroy_model_loop(model_name
, controller
), timeout
=timeout
1213 except asyncio
.TimeoutError
:
1215 "Timeout waiting for model {} to be destroyed".format(model_name
)
1217 except juju
.errors
.JujuError
as e
:
1218 if any("has been removed" in error
for error
in e
.errors
):
1222 async def destroy_application(
1223 self
, model_name
: str, application_name
: str, total_timeout
: float
1228 :param: model_name: Model name
1229 :param: application_name: Application name
1230 :param: total_timeout: Timeout
1233 controller
= await self
.get_controller()
1237 model
= await self
.get_model(controller
, model_name
)
1239 "Destroying application {} in model {}".format(
1240 application_name
, model_name
1243 application
= self
._get
_application
(model
, application_name
)
1245 await application
.destroy()
1247 self
.log
.warning("Application not found: {}".format(application_name
))
1250 "Waiting for application {} to be destroyed in model {}...".format(
1251 application_name
, model_name
1254 if total_timeout
is None:
1255 total_timeout
= 3600
1256 end
= time
.time() + total_timeout
1257 while time
.time() < end
:
1258 if not self
._get
_application
(model
, application_name
):
1260 "The application {} was destroyed in model {} ".format(
1261 application_name
, model_name
1265 await asyncio
.sleep(5)
1267 "Timeout waiting for application {} to be destroyed in model {}".format(
1268 application_name
, model_name
1272 if model
is not None:
1273 await self
.disconnect_model(model
)
1274 await self
.disconnect_controller(controller
)
1276 async def _destroy_pending_machines(self
, model
: Model
, only_manual
: bool = False):
1278 Destroy pending machines in a given model
1280 :param: only_manual: Bool that indicates only manually provisioned
1281 machines should be destroyed (if True), or that
1282 all pending machines should be destroyed
1284 status
= await model
.get_status()
1285 for machine_id
in status
.machines
:
1286 machine_status
= status
.machines
[machine_id
]
1287 if machine_status
.agent_status
.status
== "pending":
1288 if only_manual
and not machine_status
.instance_id
.startswith("manual:"):
1290 machine
= model
.machines
[machine_id
]
1291 await machine
.destroy(force
=True)
1293 async def configure_application(
1294 self
, model_name
: str, application_name
: str, config
: dict = None
1296 """Configure application
1298 :param: model_name: Model name
1299 :param: application_name: Application name
1300 :param: config: Config to apply to the charm
1302 self
.log
.debug("Configuring application {}".format(application_name
))
1305 controller
= await self
.get_controller()
1308 model
= await self
.get_model(controller
, model_name
)
1309 application
= self
._get
_application
(
1311 application_name
=application_name
,
1313 await application
.set_config(config
)
1316 await self
.disconnect_model(model
)
1317 await self
.disconnect_controller(controller
)
1319 def handle_exception(self
, loop
, context
):
1320 # All unhandled exceptions by libjuju are handled here.
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Loops forever: each round connects to the controller, logs an error if
    that fails, disconnects again and then sleeps for the given interval.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Start every round without a handle, so that a failed connection
        # attempt never passes the previous round's (already disconnected)
        # controller to disconnect_controller() a second time.
        controller = None
        try:
            controller = await self.get_controller()
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            # controller is None here when the connection attempt failed
            await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
async def list_models(self, contains: str = None) -> [str]:
    """List the models of the controller, optionally filtered by name

    :param: contains:   String that is contained in model name. When empty
                        or None, every model name is returned.

    :return: [models] Returns list of model names
    """
    controller = await self.get_controller()
    try:
        names = await controller.list_models()
        if not contains:
            return names
        return [name for name in names if contains in name]
    finally:
        await self.disconnect_controller(controller)
async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
    """List the application offers of a model

    :param: model_name: Model name

    :return:            Returns list of offers
    """
    controller = await self.get_controller()
    try:
        offers = await controller.list_offers(model_name)
    finally:
        # The controller connection is closed whether or not the query failed
        await self.disconnect_controller(controller)
    return offers
async def add_k8s(
    self,
    name: str,
    rbac_id: str,
    token: str,
    client_cert_data: str,
    configuration: Configuration,
    storage_class: str,
    credential_name: str = None,
):
    """
    Add a Kubernetes cloud to the controller

    Similar to the `juju add-k8s` command in the CLI

    :param: name:               Name for the K8s cloud
    :param: rbac_id:            Value stored in the credential attributes
                                under the RBAC label key
    :param: token:              Token handed to get_k8s_cloud_credential()
    :param: client_cert_data:   Certificate data used for the credential and
                                as the CA certificate of the cloud
    :param: configuration:      Kubernetes configuration object
    :param: storage_class:      Storage Class to use in the cloud
    :param: credential_name:    Name for the credential, forwarded to add_cloud()
    """
    # Validate the mandatory arguments, in the same order as they are reported
    mandatory = (
        (storage_class, "storage_class must be a non-empty string"),
        (name, "name must be a non-empty string"),
        (configuration, "configuration must be provided"),
    )
    for value, message in mandatory:
        if not value:
            raise Exception(message)

    endpoint = configuration.host
    credential = self.get_k8s_cloud_credential(
        configuration,
        client_cert_data,
        token,
    )
    credential.attrs[RBAC_LABEL_KEY_NAME] = rbac_id

    # Operator and workload storage both use the same storage class
    storage_config = {
        "operator-storage": storage_class,
        "workload-storage": storage_class,
    }
    cloud = client.Cloud(
        type_="kubernetes",
        auth_types=[credential.auth_type],
        endpoint=endpoint,
        ca_certificates=[client_cert_data],
        config=storage_config,
    )

    return await self.add_cloud(
        name, cloud, credential, credential_name=credential_name
    )
def get_k8s_cloud_credential(
    self,
    configuration: Configuration,
    client_cert_data: str,
    token: str = None,
) -> client.CloudCredential:
    """Build the cloud credential for a Kubernetes cluster

    The authentication type is derived from which of user/password,
    client certificate and token are available.

    :param: configuration:      Kubernetes configuration object; its username
                                and password are read
    :param: client_cert_data:   Client certificate data
    :param: token:              Token. Cannot be combined with user/password.

    :return: CloudCredential with the selected auth type and its attributes

    :raises: JujuInvalidK8sConfiguration when both a token and user/pass are
             given, or when no supported authentication method applies
    """
    # TODO: Test with AKS
    # The client key is never loaded for now, so the key-based branches
    # below are placeholders that are not taken.
    key = None  # open(configuration.key_file, "r").read()
    username, password = configuration.username, configuration.password

    attrs = {}
    if client_cert_data:
        attrs["ClientCertificateData"] = client_cert_data
    if key:
        attrs["ClientKeyData"] = key
    if token:
        if username or password:
            raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
        attrs["Token"] = token

    if key:
        auth_type = "oauth2withcert" if client_cert_data else "oauth2"
        if not token:
            raise JujuInvalidK8sConfiguration(
                "missing token for auth type {}".format(auth_type)
            )
    elif username:
        if not password:
            self.log.debug(
                "credential for user {} has empty password".format(username)
            )
        attrs.update(username=username, password=password)
        auth_type = "userpasswithcert" if client_cert_data else "userpass"
    elif client_cert_data and token:
        auth_type = "certificate"
    else:
        raise JujuInvalidK8sConfiguration("authentication method not supported")

    return client.CloudCredential(auth_type=auth_type, attrs=attrs)
async def add_cloud(
    self,
    name: str,
    cloud: Cloud,
    credential: CloudCredential = None,
    credential_name: str = None,
) -> Cloud:
    """
    Add cloud to the controller

    :param: name:               Name of the cloud to be added
    :param: cloud:              Cloud object
    :param: credential:         CloudCredentials object for the cloud. When
                                not given, no credential is added.
    :param: credential_name:    Credential name.
                                If not defined, the name of the cloud will be used.

    :return: The cloud object that was passed in
    """
    controller = await self.get_controller()
    try:
        await controller.add_cloud(name, cloud)
        if credential:
            await controller.add_credential(
                credential_name or name, credential=credential, cloud=name
            )
    finally:
        await self.disconnect_controller(controller)

    # Need to return the object returned by the controller.add_cloud() function
    # I'm returning the original value now until this bug is fixed:
    #   https://github.com/juju/python-libjuju/issues/443
    return cloud
async def remove_cloud(self, name: str):
    """
    Remove cloud

    A cloud that the controller reports as not found is only logged as a
    warning; any other error from the controller is propagated.

    :param: name:   Name of the cloud to be removed
    """
    controller = await self.get_controller()
    try:
        await controller.remove_cloud(name)
    except juju.errors.JujuError as e:
        cloud_is_missing = (
            len(e.errors) == 1 and e.errors[0] == f'cloud "{name}" not found'
        )
        if not cloud_is_missing:
            raise
        self.log.warning(f"Cloud {name} not found, so it could not be deleted.")
    finally:
        await self.disconnect_controller(controller)
@retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
async def _get_leader_unit(self, application: Application) -> Unit:
    """Return the unit of the application that reports itself as leader

    The lookup is wrapped by the retry decorator (20 attempts, delay of 5,
    JujuLeaderUnitNotFound as fallback).
    NOTE(review): the fallback is presumably what the caller gets once all
    attempts failed - confirm against the retrying_async documentation.

    :param: application:    Application whose units are inspected

    :return: The first unit for which is_leader_from_status() is true
    """
    for candidate in application.units:
        if await candidate.is_leader_from_status():
            return candidate
    # No unit reports leadership: fail this attempt so the decorator
    # can try again.
    raise Exception()
async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
    """
    Get cloud credentials

    The credential is looked up by a tag built from the cloud name, the user
    of the VCA connection and the credential name of the cloud.
    NOTE(review): cloud.credential_name is read here, so the argument is
    presumably the n2vc.vca.cloud Cloud (imported as VcaCloud) - confirm.

    :param: cloud: Cloud object. The returned credentials will be from this cloud.

    :return: List of credentials object associated to the specified cloud

    """
    controller = await self.get_controller()
    try:
        facade = client.CloudFacade.from_connection(controller.connection())
        owner = self.vca_connection.data.user
        credential_tag = tag.credential(cloud.name, owner, cloud.credential_name)
        reply = await facade.Credential([client.Entity(credential_tag)])
        return reply.results
    finally:
        await self.disconnect_controller(controller)
async def check_application_exists(self, model_name, application_name) -> bool:
    """Check application exists

    :param: model_name: Model Name
    :param: application_name: Application Name

    :return: True when the application is found in the model, False otherwise
    """
    controller = await self.get_controller()
    model = None
    try:
        model = await self.get_model(controller, model_name)
        message = "Checking if application {} exists in model {}".format(
            application_name, model_name
        )
        self.log.info(message)
        found = self._get_application(model, application_name)
        return found is not None
    finally:
        # model is still None when get_model() itself failed
        if model:
            await self.disconnect_model(model)
        await self.disconnect_controller(controller)