self.state = ModelState(self)
self.info = None
self._watcher_task = None
- self._watch_shutdown = asyncio.Event(loop=loop)
- self._watch_received = asyncio.Event(loop=loop)
+ self._watch_shutdown = asyncio.Event(loop=self.loop)
+ self._watch_received = asyncio.Event(loop=self.loop)
self._charmstore = CharmStore(self.loop)
async def connect(self, *args, **kw):
args and kw are passed through to Connection.connect()
"""
+ if 'loop' not in kw:
+ kw['loop'] = self.loop
self.connection = await connection.Connection.connect(*args, **kw)
await self._after_connect()
"""Connect to the current Juju model.
"""
- self.connection = await connection.Connection.connect_current()
+ self.connection = await connection.Connection.connect_current(
+ self.loop)
await self._after_connect()
async def connect_model(self, model_name):
:param model_name: Format [controller:][user/]model
"""
- self.connection = await connection.Connection.connect_model(model_name)
+ self.connection = await connection.Connection.connect_model(model_name,
+ self.loop)
await self._after_connect()
async def _after_connect(self):
"""
async def _block():
while not all(c() for c in conditions):
- await asyncio.sleep(wait_period)
- await asyncio.wait_for(_block(), timeout)
+ await asyncio.sleep(wait_period, loop=self.loop)
+ await asyncio.wait_for(_block(), timeout, loop=self.loop)
@property
def applications(self):
# canceled with it. So we shield them. But this means
# they can *never* be canceled.
await asyncio.shield(
- self._notify_observers(delta, old_obj, new_obj))
+ self._notify_observers(delta, old_obj, new_obj),
+ loop=self.loop)
self._watch_received.set()
except CancelledError:
log.debug('Closing watcher connection')
for o in self.observers:
if o.cares_about(delta):
- asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+ asyncio.ensure_future(o(delta, old_obj, new_obj, self),
+ loop=self.loop)
async def _wait(self, entity_type, entity_id, action, predicate=None):
"""
# haven't made it yet we'll need to wait on them to be added
await asyncio.gather(*[
asyncio.ensure_future(
- self._wait_for_new('application', app_name))
+ self._wait_for_new('application', app_name),
+ loop=self.loop)
for app_name in pending_apps
- ])
+ ], loop=self.loop)
return [app for name, app in self.applications.items()
if name in handler.applications]
else:
'Deploying %s', entity_id)
if not is_local:
+ parts = entity_id[3:].split('/')
+ if parts[0].startswith('~'):
+ parts.pop(0)
+ if not application_name:
+ application_name = parts[-1].split('-')[0]
+ if not series:
+ if len(parts) > 1:
+ series = parts[0]
+ else:
+ entity = await self.charmstore.entity(entity_id)
+ ss = entity['Meta']['supported-series']
+ series = ss['SupportedSeries'][0]
await client_facade.AddCharm(channel, entity_id)
elif not entity_id.startswith('local:'):
# We have a local charm dir that needs to be uploaded
"""
raise NotImplementedError()
- def get_status(self, filter_=None, utc=False):
+ async def get_status(self, filters=None, utc=False):
"""Return the status of the model.
- :param str filter_: Service or unit name or wildcard ('*')
+ :param str filters: Optional list of applications, units, or machines
+ to include, which can use wildcards ('*').
:param bool utc: Display time as UTC in RFC3339 format
"""
- raise NotImplementedError()
- status = get_status
+ client_facade = client.ClientFacade()
+ client_facade.connect(self.connection)
+ return await client_facade.FullStatus(filters)
def sync_tools(
self, all_=False, destination=None, dry_run=False, public=False,
charm_urls = await asyncio.gather(*[
self.model.add_local_charm_dir(*params)
for params in args
- ])
+ ], loop=self.model.loop)
# Update the 'charm:' entry for each app with the new 'local:' url.
for app_name, charm_url in zip(apps, charm_urls):
bundle['services'][app_name]['charm'] = charm_url