import theblues.charmstore
import theblues.errors
-from . import tag
+from . import tag, utils
from .client import client
-from .client import watcher
from .client import connection
from .constraints import parse as parse_constraints, normalize_key
from .delta import get_entity_delta
self.observers = weakref.WeakValueDictionary()
self.state = ModelState(self)
self.info = None
- self._watcher_task = None
- self._watch_shutdown = asyncio.Event(loop=self.loop)
+ self._watch_stopping = asyncio.Event(loop=self.loop)
+ self._watch_stopped = asyncio.Event(loop=self.loop)
self._watch_received = asyncio.Event(loop=self.loop)
self._charmstore = CharmStore(self.loop)
"""Shut down the watcher task and close websockets.
"""
- self._stop_watching()
if self.connection and self.connection.is_open:
- await self._watch_shutdown.wait()
+ log.debug('Stopping watcher task')
+ self._watch_stopping.set()
+ await self._watch_stopped.wait()
log.debug('Closing model connection')
await self.connection.close()
self.connection = None
explicit call to this method.
"""
- facade = client.ClientFacade()
- facade.connect(self.connection)
+ facade = client.ClientFacade.from_connection(self.connection)
self.info = await facade.ModelInfo()
log.debug('Got ModelInfo: %s', vars(self.info))
"""
async def _start_watch():
- self._watch_shutdown.clear()
try:
- allwatcher = watcher.AllWatcher()
- self._watch_conn = await self.connection.clone()
- allwatcher.connect(self._watch_conn)
- while True:
- results = await allwatcher.Next()
+ allwatcher = client.AllWatcherFacade.from_connection(
+ self.connection)
+ while not self._watch_stopping.is_set():
+ results = await utils.run_with_interrupt(
+ allwatcher.Next(),
+ self._watch_stopping,
+ self.loop)
+ if self._watch_stopping.is_set():
+ break
for delta in results.deltas:
delta = get_entity_delta(delta)
old_obj, new_obj = self.state.apply_delta(delta)
- # XXX: Might not want to shield at this level
- # We are shielding because when the watcher is
- # canceled (on disconnect()), we don't want all of
- # its children (every observer callback) to be
- # canceled with it. So we shield them. But this means
- # they can *never* be canceled.
- await asyncio.shield(
- self._notify_observers(delta, old_obj, new_obj),
- loop=self.loop)
+ await self._notify_observers(delta, old_obj, new_obj)
self._watch_received.set()
except CancelledError:
- log.debug('Closing watcher connection')
- await self._watch_conn.close()
- self._watch_shutdown.set()
- self._watch_conn = None
+ pass
+ except Exception:
+ log.exception('Error in watcher')
+ raise
+ finally:
+ self._watch_stopped.set()
log.debug('Starting watcher task')
- self._watcher_task = self.loop.create_task(_start_watch())
-
- def _stop_watching(self):
- """Stop the asynchronous watch against this model.
-
- """
- log.debug('Stopping watcher task')
- if self._watcher_task:
- self._watcher_task.cancel()
+ self._watch_received.clear()
+ self._watch_stopping.clear()
+ self._watch_stopped.clear()
+ self.loop.create_task(_start_watch())
async def _notify_observers(self, delta, old_obj, new_obj):
"""Call observing callbacks, notifying them of a change in model state
params.series = series
# Submit the request.
- client_facade = client.ClientFacade()
- client_facade.connect(self.connection)
+ client_facade = client.ClientFacade.from_connection(self.connection)
results = await client_facade.AddMachines([params])
error = results.machines[0].error
if error:
- raise ValueError("Error adding machine: %s", error.message)
+ raise ValueError("Error adding machine: %s" % error.message)
machine_id = results.machines[0].machine
log.debug('Added new machine %s', machine_id)
return await self._wait_for_new('machine', machine_id)
:param str relation2: '<application>[:<relation_name>]'
"""
- app_facade = client.ApplicationFacade()
- app_facade.connect(self.connection)
+ app_facade = client.ApplicationFacade.from_connection(self.connection)
log.debug(
'Adding relation %s <-> %s', relation1, relation2)
:param str key: The public ssh key
"""
- key_facade = client.KeyManagerFacade()
- key_facade.connect(self.connection)
+ key_facade = client.KeyManagerFacade.from_connection(self.connection)
return await key_facade.AddKeys([key], user)
add_ssh_keys = add_ssh_key
entity = await self.charmstore.entity(entity_url)
entity_id = entity['Id']
- client_facade = client.ClientFacade()
- client_facade.connect(self.connection)
+ client_facade = client.ClientFacade.from_connection(self.connection)
is_bundle = ((is_local and
(Path(entity_id) / 'bundle.yaml').exists()) or
if not resources:
return None
- resources_facade = client.ResourcesFacade()
- resources_facade.connect(self.connection)
+ resources_facade = client.ResourcesFacade.from_connection(
+ self.connection)
response = await resources_facade.AddPendingResources(
tag.application(application),
entity_url,
config = yaml.dump({application: config},
default_flow_style=False)
- app_facade = client.ApplicationFacade()
- app_facade.connect(self.connection)
+ app_facade = client.ApplicationFacade.from_connection(
+ self.connection)
app = client.ApplicationDeploy(
charm_url=charm_url,
"""Destroy units by name.
"""
- app_facade = client.ApplicationFacade()
- app_facade.connect(self.connection)
+ app_facade = client.ApplicationFacade.from_connection(self.connection)
log.debug(
'Destroying unit%s %s',
:param str acl: Access control ('read' or 'write')
"""
- model_facade = client.ModelManagerFacade()
controller_conn = await self.connection.controller()
- model_facade.connect(controller_conn)
+ model_facade = client.ModelManagerFacade.from_connection(
+ controller_conn)
user = tag.user(username)
model = tag.model(self.info.uuid)
changes = client.ModifyModelAccess(acl, 'grant', model, user)
async def get_ssh_key(self, raw_ssh=False):
"""Return known SSH keys for this model.
- :param bool raw_ssh: if True, returns the raw ssh key, else it's fingerprint
+ :param bool raw_ssh: if True, returns the raw ssh key,
+ else it's fingerprint
"""
- key_facade = client.KeyManagerFacade()
- key_facade.connect(self.connection)
+ key_facade = client.KeyManagerFacade.from_connection(self.connection)
entity = {'tag': tag.model(self.info.uuid)}
entities = client.Entities([entity])
return await key_facade.ListKeys(entities, raw_ssh)
:param str user: Juju user to which the key is registered
"""
- key_facade = client.KeyManagerFacade()
- key_facade.connect(self.connection)
+ key_facade = client.KeyManagerFacade.from_connection(self.connection)
key = base64.b64decode(bytes(key.strip().split()[1].encode('ascii')))
key = hashlib.md5(key).hexdigest()
key = ':'.join(a+b for a, b in zip(key[::2], key[1::2]))
:param str username: Username to revoke
"""
- model_facade = client.ModelManagerFacade()
controller_conn = await self.connection.controller()
- model_facade.connect(controller_conn)
+ model_facade = client.ModelManagerFacade.from_connection(
+ controller_conn)
user = tag.user(username)
model = tag.model(self.info.uuid)
changes = client.ModifyModelAccess('read', 'revoke', model, user)
:param bool utc: Display time as UTC in RFC3339 format
"""
- client_facade = client.ClientFacade()
- client_facade.connect(self.connection)
+ client_facade = client.ClientFacade.from_connection(self.connection)
return await client_facade.FullStatus(filters)
def sync_tools(
log.debug("Retrieving metrics for %s",
', '.join(tags) if tags else "all units")
- metrics_facade = client.MetricsDebugFacade()
- metrics_facade.connect(self.connection)
+ metrics_facade = client.MetricsDebugFacade.from_connection(
+ self.connection)
entities = [client.Entity(tag) for tag in tags]
metrics_result = await metrics_facade.GetMetrics(entities)
for unit_name, unit in model.units.items():
app_units = self._units_by_app.setdefault(unit.application, [])
app_units.append(unit_name)
- self.client_facade = client.ClientFacade()
- self.client_facade.connect(model.connection)
- self.app_facade = client.ApplicationFacade()
- self.app_facade.connect(model.connection)
- self.ann_facade = client.AnnotationsFacade()
- self.ann_facade.connect(model.connection)
+ self.client_facade = client.ClientFacade.from_connection(
+ model.connection)
+ self.app_facade = client.ApplicationFacade.from_connection(
+ model.connection)
+ self.ann_facade = client.AnnotationsFacade.from_connection(
+ model.connection)
async def _handle_local_charms(self, bundle):
"""Search for references to local charms (i.e. filesystem paths)
results = await self.client_facade.AddMachines([params])
error = results.machines[0].error
if error:
- raise ValueError("Error adding machine: %s", error.message)
+ raise ValueError("Error adding machine: %s" % error.message)
machine = results.machines[0].machine
log.debug('Added new machine %s', machine)
return machine