import websockets
from http.client import HTTPSConnection
+import asyncio
import yaml
from juju import tag
"""
def __init__(
self, endpoint, uuid, username, password, cacert=None,
- macaroons=None):
+ macaroons=None, loop=None):
self.endpoint = endpoint
self.uuid = uuid
self.username = username
self.password = password
self.macaroons = macaroons
self.cacert = cacert
+ self.loop = loop or asyncio.get_event_loop()
self.__request_id__ = 0
self.addr = None
kw = dict()
kw['ssl'] = self._get_ssl(self.cacert)
+ kw['loop'] = self.loop
self.addr = url
self.ws = await websockets.connect(url, **kw)
log.info("Driver connected to juju %s", url)
self.password,
self.cacert,
self.macaroons,
+ self.loop,
)
async def controller(self):
self.password,
self.cacert,
self.macaroons,
+ self.loop,
)
@classmethod
async def connect(
cls, endpoint, uuid, username, password, cacert=None,
- macaroons=None):
+ macaroons=None, loop=None):
"""Connect to the websocket.
If uuid is None, the connection will be to the controller. Otherwise it
will be to the model.
"""
- client = cls(endpoint, uuid, username, password, cacert, macaroons)
+ client = cls(endpoint, uuid, username, password, cacert, macaroons,
+ loop)
await client.open()
redirect_info = await client.redirect_info()
"Couldn't authenticate to %s", endpoint)
@classmethod
- async def connect_current(cls):
+ async def connect_current(cls, loop=None):
"""Connect to the currently active model.
"""
model_name = models['current-model']
return await cls.connect_model(
- '{}:{}'.format(controller_name, model_name))
+ '{}:{}'.format(controller_name, model_name), loop)
@classmethod
- async def connect_current_controller(cls):
+ async def connect_current_controller(cls, loop=None):
"""Connect to the currently active controller.
"""
if not controller_name:
raise JujuConnectionError('No current controller')
- return await cls.connect_controller(controller_name)
+ return await cls.connect_controller(controller_name, loop)
@classmethod
- async def connect_controller(cls, controller_name):
+ async def connect_controller(cls, controller_name, loop=None):
"""Connect to a controller by name.
"""
macaroons = get_macaroons() if not password else None
return await cls.connect(
- endpoint, None, username, password, cacert, macaroons)
+ endpoint, None, username, password, cacert, macaroons, loop)
@classmethod
- async def connect_model(cls, model):
+ async def connect_model(cls, model, loop=None):
"""Connect to a model by name.
:param str model: <controller>:<model>
macaroons = get_macaroons() if not password else None
return await cls.connect(
- endpoint, model_uuid, username, password, cacert, macaroons)
+ endpoint, model_uuid, username, password, cacert, macaroons, loop)
def build_facades(self, info):
self.facades.clear()
try:
ssh_key = await utils.read_ssh_key(loop=self.loop)
await utils.execute_process(
- 'juju', 'add-ssh-key', '-m', model_name, ssh_key, log=log)
- except Exception as e:
+ 'juju', 'add-ssh-key', '-m', model_name, ssh_key, log=log,
+ loop=self.loop)
+ except Exception:
log.exception(
"Could not add ssh key to model. You will not be able "
"to ssh into machines in this model. "
self.connection.password,
self.connection.cacert,
self.connection.macaroons,
+ loop=self.loop,
)
return model
self.state = ModelState(self)
self.info = None
self._watcher_task = None
- self._watch_shutdown = asyncio.Event(loop=loop)
- self._watch_received = asyncio.Event(loop=loop)
+ self._watch_shutdown = asyncio.Event(loop=self.loop)
+ self._watch_received = asyncio.Event(loop=self.loop)
self._charmstore = CharmStore(self.loop)
async def connect(self, *args, **kw):
args and kw are passed through to Connection.connect()
"""
+ if 'loop' not in kw:
+ kw['loop'] = self.loop
self.connection = await connection.Connection.connect(*args, **kw)
await self._after_connect()
"""Connect to the current Juju model.
"""
- self.connection = await connection.Connection.connect_current()
+ self.connection = await connection.Connection.connect_current(
+ self.loop)
await self._after_connect()
async def connect_model(self, model_name):
:param model_name: Format [controller:][user/]model
"""
- self.connection = await connection.Connection.connect_model(model_name)
+ self.connection = await connection.Connection.connect_model(model_name,
+ self.loop)
await self._after_connect()
async def _after_connect(self):
"""
async def _block():
while not all(c() for c in conditions):
- await asyncio.sleep(wait_period)
- await asyncio.wait_for(_block(), timeout)
+ await asyncio.sleep(wait_period, loop=self.loop)
+ await asyncio.wait_for(_block(), timeout, loop=self.loop)
@property
def applications(self):
# canceled with it. So we shield them. But this means
# they can *never* be canceled.
await asyncio.shield(
- self._notify_observers(delta, old_obj, new_obj))
+ self._notify_observers(delta, old_obj, new_obj),
+ loop=self.loop)
self._watch_received.set()
except CancelledError:
log.debug('Closing watcher connection')
for o in self.observers:
if o.cares_about(delta):
- asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+ asyncio.ensure_future(o(delta, old_obj, new_obj, self),
+ loop=self.loop)
async def _wait(self, entity_type, entity_id, action, predicate=None):
"""
# haven't made it yet we'll need to wait on them to be added
await asyncio.gather(*[
asyncio.ensure_future(
- self._wait_for_new('application', app_name))
+ self._wait_for_new('application', app_name),
+ loop=self.loop)
for app_name in pending_apps
- ])
+ ], loop=self.loop)
return [app for name, app in self.applications.items()
if name in handler.applications]
else:
charm_urls = await asyncio.gather(*[
self.model.add_local_charm_dir(*params)
for params in args
- ])
+ ], loop=self.loop)
# Update the 'charm:' entry for each app with the new 'local:' url.
for app_name, charm_url in zip(apps, charm_urls):
bundle['services'][app_name]['charm'] = charm_url
from pathlib import Path
-async def execute_process(*cmd, log=None):
+async def execute_process(*cmd, log=None, loop=None):
'''
Wrapper around asyncio.create_subprocess_exec.
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
- )
+ loop=loop)
stdout, stderr = await p.communicate()
if log:
log.debug("Exec %s -> %d", cmd, p.returncode)