There were several places where the default event loop was used instead
of the given event loop.
import websockets
from http.client import HTTPSConnection
import websockets
from http.client import HTTPSConnection
import yaml
from juju import tag
import yaml
from juju import tag
"""
def __init__(
self, endpoint, uuid, username, password, cacert=None,
"""
def __init__(
self, endpoint, uuid, username, password, cacert=None,
+ macaroons=None, loop=None):
self.endpoint = endpoint
self.uuid = uuid
self.username = username
self.password = password
self.macaroons = macaroons
self.cacert = cacert
self.endpoint = endpoint
self.uuid = uuid
self.username = username
self.password = password
self.macaroons = macaroons
self.cacert = cacert
+ self.loop = loop or asyncio.get_event_loop()
self.__request_id__ = 0
self.addr = None
self.__request_id__ = 0
self.addr = None
kw = dict()
kw['ssl'] = self._get_ssl(self.cacert)
kw = dict()
kw['ssl'] = self._get_ssl(self.cacert)
self.addr = url
self.ws = await websockets.connect(url, **kw)
log.info("Driver connected to juju %s", url)
self.addr = url
self.ws = await websockets.connect(url, **kw)
log.info("Driver connected to juju %s", url)
self.password,
self.cacert,
self.macaroons,
self.password,
self.cacert,
self.macaroons,
)
async def controller(self):
)
async def controller(self):
self.password,
self.cacert,
self.macaroons,
self.password,
self.cacert,
self.macaroons,
)
@classmethod
async def connect(
cls, endpoint, uuid, username, password, cacert=None,
)
@classmethod
async def connect(
cls, endpoint, uuid, username, password, cacert=None,
+ macaroons=None, loop=None):
"""Connect to the websocket.
If uuid is None, the connection will be to the controller. Otherwise it
will be to the model.
"""
"""Connect to the websocket.
If uuid is None, the connection will be to the controller. Otherwise it
will be to the model.
"""
- client = cls(endpoint, uuid, username, password, cacert, macaroons)
+ client = cls(endpoint, uuid, username, password, cacert, macaroons,
+ loop)
await client.open()
redirect_info = await client.redirect_info()
await client.open()
redirect_info = await client.redirect_info()
"Couldn't authenticate to %s", endpoint)
@classmethod
"Couldn't authenticate to %s", endpoint)
@classmethod
- async def connect_current(cls):
+ async def connect_current(cls, loop=None):
"""Connect to the currently active model.
"""
"""Connect to the currently active model.
"""
model_name = models['current-model']
return await cls.connect_model(
model_name = models['current-model']
return await cls.connect_model(
- '{}:{}'.format(controller_name, model_name))
+ '{}:{}'.format(controller_name, model_name), loop)
- async def connect_current_controller(cls):
+ async def connect_current_controller(cls, loop=None):
"""Connect to the currently active controller.
"""
"""Connect to the currently active controller.
"""
if not controller_name:
raise JujuConnectionError('No current controller')
if not controller_name:
raise JujuConnectionError('No current controller')
- return await cls.connect_controller(controller_name)
+ return await cls.connect_controller(controller_name, loop)
- async def connect_controller(cls, controller_name):
+ async def connect_controller(cls, controller_name, loop=None):
"""Connect to a controller by name.
"""
"""Connect to a controller by name.
"""
macaroons = get_macaroons() if not password else None
return await cls.connect(
macaroons = get_macaroons() if not password else None
return await cls.connect(
- endpoint, None, username, password, cacert, macaroons)
+ endpoint, None, username, password, cacert, macaroons, loop)
- async def connect_model(cls, model):
+ async def connect_model(cls, model, loop=None):
"""Connect to a model by name.
:param str model: <controller>:<model>
"""Connect to a model by name.
:param str model: <controller>:<model>
macaroons = get_macaroons() if not password else None
return await cls.connect(
macaroons = get_macaroons() if not password else None
return await cls.connect(
- endpoint, model_uuid, username, password, cacert, macaroons)
+ endpoint, model_uuid, username, password, cacert, macaroons, loop)
def build_facades(self, info):
self.facades.clear()
def build_facades(self, info):
self.facades.clear()
try:
ssh_key = await utils.read_ssh_key(loop=self.loop)
await utils.execute_process(
try:
ssh_key = await utils.read_ssh_key(loop=self.loop)
await utils.execute_process(
- 'juju', 'add-ssh-key', '-m', model_name, ssh_key, log=log)
- except Exception as e:
+ 'juju', 'add-ssh-key', '-m', model_name, ssh_key, log=log,
+ loop=self.loop)
+ except Exception:
log.exception(
"Could not add ssh key to model. You will not be able "
"to ssh into machines in this model. "
log.exception(
"Could not add ssh key to model. You will not be able "
"to ssh into machines in this model. "
self.connection.password,
self.connection.cacert,
self.connection.macaroons,
self.connection.password,
self.connection.cacert,
self.connection.macaroons,
self.state = ModelState(self)
self.info = None
self._watcher_task = None
self.state = ModelState(self)
self.info = None
self._watcher_task = None
- self._watch_shutdown = asyncio.Event(loop=loop)
- self._watch_received = asyncio.Event(loop=loop)
+ self._watch_shutdown = asyncio.Event(loop=self.loop)
+ self._watch_received = asyncio.Event(loop=self.loop)
self._charmstore = CharmStore(self.loop)
async def connect(self, *args, **kw):
self._charmstore = CharmStore(self.loop)
async def connect(self, *args, **kw):
args and kw are passed through to Connection.connect()
"""
args and kw are passed through to Connection.connect()
"""
+ if 'loop' not in kw:
+ kw['loop'] = self.loop
self.connection = await connection.Connection.connect(*args, **kw)
await self._after_connect()
self.connection = await connection.Connection.connect(*args, **kw)
await self._after_connect()
"""Connect to the current Juju model.
"""
"""Connect to the current Juju model.
"""
- self.connection = await connection.Connection.connect_current()
+ self.connection = await connection.Connection.connect_current(
+ self.loop)
await self._after_connect()
async def connect_model(self, model_name):
await self._after_connect()
async def connect_model(self, model_name):
:param model_name: Format [controller:][user/]model
"""
:param model_name: Format [controller:][user/]model
"""
- self.connection = await connection.Connection.connect_model(model_name)
+ self.connection = await connection.Connection.connect_model(model_name,
+ self.loop)
await self._after_connect()
async def _after_connect(self):
await self._after_connect()
async def _after_connect(self):
"""
async def _block():
while not all(c() for c in conditions):
"""
async def _block():
while not all(c() for c in conditions):
- await asyncio.sleep(wait_period)
- await asyncio.wait_for(_block(), timeout)
+ await asyncio.sleep(wait_period, loop=self.loop)
+ await asyncio.wait_for(_block(), timeout, loop=self.loop)
@property
def applications(self):
@property
def applications(self):
# canceled with it. So we shield them. But this means
# they can *never* be canceled.
await asyncio.shield(
# canceled with it. So we shield them. But this means
# they can *never* be canceled.
await asyncio.shield(
- self._notify_observers(delta, old_obj, new_obj))
+ self._notify_observers(delta, old_obj, new_obj),
+ loop=self.loop)
self._watch_received.set()
except CancelledError:
log.debug('Closing watcher connection')
self._watch_received.set()
except CancelledError:
log.debug('Closing watcher connection')
for o in self.observers:
if o.cares_about(delta):
for o in self.observers:
if o.cares_about(delta):
- asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+ asyncio.ensure_future(o(delta, old_obj, new_obj, self),
+ loop=self.loop)
async def _wait(self, entity_type, entity_id, action, predicate=None):
"""
async def _wait(self, entity_type, entity_id, action, predicate=None):
"""
# haven't made it yet we'll need to wait on them to be added
await asyncio.gather(*[
asyncio.ensure_future(
# haven't made it yet we'll need to wait on them to be added
await asyncio.gather(*[
asyncio.ensure_future(
- self._wait_for_new('application', app_name))
+ self._wait_for_new('application', app_name),
+ loop=self.loop)
for app_name in pending_apps
for app_name in pending_apps
return [app for name, app in self.applications.items()
if name in handler.applications]
else:
return [app for name, app in self.applications.items()
if name in handler.applications]
else:
charm_urls = await asyncio.gather(*[
self.model.add_local_charm_dir(*params)
for params in args
charm_urls = await asyncio.gather(*[
self.model.add_local_charm_dir(*params)
for params in args
# Update the 'charm:' entry for each app with the new 'local:' url.
for app_name, charm_url in zip(apps, charm_urls):
bundle['services'][app_name]['charm'] = charm_url
# Update the 'charm:' entry for each app with the new 'local:' url.
for app_name, charm_url in zip(apps, charm_urls):
bundle['services'][app_name]['charm'] = charm_url
-async def execute_process(*cmd, log=None):
+async def execute_process(*cmd, log=None, loop=None):
'''
Wrapper around asyncio.create_subprocess_exec.
'''
Wrapper around asyncio.create_subprocess_exec.
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdout, stderr = await p.communicate()
if log:
log.debug("Exec %s -> %d", cmd, p.returncode)
stdout, stderr = await p.communicate()
if log:
log.debug("Exec %s -> %d", cmd, p.returncode)