from juju.model import Model
from juju.machine import Machine
from juju.application import Application
+from juju.unit import Unit
from juju.client._definitions import (
FullStatus,
QueryApplicationOffersResults,
if application is None:
raise JujuApplicationNotFound("Cannot execute action")
- # Get unit
+ # Get leader unit
+ # Racing condition:
+ # Ocassionally, self._get_leader_unit() will return None
+ # because the leader elected hook has not been triggered yet.
+ # Therefore, we are doing some retries. If it happens again,
+ # re-open bug 1236
+ attempts = 3
+ time_between_retries = 10
unit = None
- for u in application.units:
- if await u.is_leader_from_status():
- unit = u
+ for _ in range(attempts):
+ unit = await self._get_leader_unit(application)
+ if unit is None:
+ await asyncio.sleep(time_between_retries)
+ else:
+ break
if unit is None:
raise JujuLeaderUnitNotFound(
"Cannot execute action: leader unit not found"
await self.disconnect_controller(controller)
async def add_k8s(
- self, name: str, configuration: Configuration, storage_class: str
+ self,
+ name: str,
+ configuration: Configuration,
+ storage_class: str,
+ credential_name: str = None,
):
"""
Add a Kubernetes cloud to the controller
Similar to the `juju add-k8s` command in the CLI
- :param: name: Name for the K8s cloud
- :param: configuration: Kubernetes configuration object
- :param: storage_class: Storage Class to use in the cloud
+ :param: name: Name for the K8s cloud
+ :param: configuration: Kubernetes configuration object
+ :param: storage_class: Storage Class to use in the cloud
+ :param: credential_name: Storage Class to use in the cloud
"""
if not storage_class:
},
)
- return await self.add_cloud(name, cloud, credential)
+ return await self.add_cloud(
+ name, cloud, credential, credential_name=credential_name
+ )
def get_k8s_cloud_credential(
self, configuration: Configuration,
return client.CloudCredential(auth_type=auth_type, attrs=attrs,)
async def add_cloud(
- self, name: str, cloud: Cloud, credential: CloudCredential = None
+ self,
+ name: str,
+ cloud: Cloud,
+ credential: CloudCredential = None,
+ credential_name: str = None,
) -> Cloud:
"""
Add cloud to the controller
- :param: name: Name of the cloud to be added
- :param: cloud: Cloud object
- :param: credential: CloudCredentials object for the cloud
+ :param: name: Name of the cloud to be added
+ :param: cloud: Cloud object
+ :param: credential: CloudCredentials object for the cloud
+ :param: credential_name: Credential name.
+ If not defined, cloud of the name will be used.
"""
controller = await self.get_controller()
try:
_ = await controller.add_cloud(name, cloud)
if credential:
- await controller.add_credential(name, credential=credential, cloud=name)
+ await controller.add_credential(
+ credential_name or name, credential=credential, cloud=name
+ )
# Need to return the object returned by the controller.add_cloud() function
# I'm returning the original value now until this bug is fixed:
# https://github.com/juju/python-libjuju/issues/443
await controller.remove_cloud(name)
finally:
await self.disconnect_controller(controller)
+
+ async def _get_leader_unit(self, application: Application) -> Unit:
+ unit = None
+ for u in application.units:
+ if await u.is_leader_from_status():
+ unit = u
+ break
+ return unit