RBAC_LABEL_KEY_NAME = "rbac-id"
+@asyncio.coroutine
+def retry_callback(attempt, exc, args, kwargs, delay=0.5, *, loop):
+ # Specifically overridden from upstream implementation so it can
+ # continue to work with Python 3.10
+ yield from asyncio.sleep(attempt * delay)
+ return retry
+
+
class Libjuju:
def __init__(
self,
vca_connection: Connection,
- loop: asyncio.AbstractEventLoop = None,
log: logging.Logger = None,
n2vc: N2VCConnector = None,
):
Constructor
:param: vca_connection: n2vc.vca.connection object
- :param: loop: Asyncio loop
:param: log: Logger
:param: n2vc: N2VC object
"""
self.n2vc = n2vc
self.vca_connection = vca_connection
- self.loop = loop or asyncio.get_event_loop()
- self.loop.set_exception_handler(self.handle_exception)
self.creating_model = asyncio.Lock()
if self.vca_connection.is_default:
self.health_check_task = self._create_health_check_task()
def _create_health_check_task(self):
- return self.loop.create_task(self.health_check())
+ return asyncio.get_event_loop().create_task(self.health_check())
async def get_controller(self, timeout: float = 60.0) -> Controller:
"""
if controller:
await controller.disconnect()
- @retry(attempts=3, delay=5, timeout=None)
+ @retry(attempts=3, delay=5, timeout=None, callback=retry_callback)
async def add_model(self, model_name: str, cloud: VcaCloud):
"""
Create model
await self.disconnect_controller(controller)
return application_configs
- @retry(attempts=3, delay=5)
+ @retry(attempts=3, delay=5, callback=retry_callback)
async def get_model(self, controller: Controller, model_name: str) -> Model:
"""
Get model from controller
await self.disconnect_model(model)
await self.disconnect_controller(controller)
- def handle_exception(self, loop, context):
- # All unhandled exceptions by libjuju are handled here.
- pass
-
async def health_check(self, interval: float = 300.0):
"""
Health check to make sure controller and controller_model connections are OK
finally:
await self.disconnect_controller(controller)
- @retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
+ @retry(
+ attempts=20, delay=5, fallback=JujuLeaderUnitNotFound(), callback=retry_callback
+ )
async def _get_leader_unit(self, application: Application) -> Unit:
unit = None
for u in application.units: