from pathlib import Path
import yaml
-from theblues import charmstore
+import theblues.charmstore
+import theblues.errors
from .client import client
from .client import watcher
self.state = ModelState(self)
self.info = None
self._watcher_task = None
- self._watch_shutdown = asyncio.Event(loop=loop)
- self._watch_received = asyncio.Event(loop=loop)
+ self._watch_shutdown = asyncio.Event(loop=self.loop)
+ self._watch_received = asyncio.Event(loop=self.loop)
self._charmstore = CharmStore(self.loop)
async def connect(self, *args, **kw):
args and kw are passed through to Connection.connect()
"""
+ if 'loop' not in kw:
+ kw['loop'] = self.loop
self.connection = await connection.Connection.connect(*args, **kw)
await self._after_connect()
"""Connect to the current Juju model.
"""
- self.connection = await connection.Connection.connect_current()
+ self.connection = await connection.Connection.connect_current(
+ self.loop)
await self._after_connect()
async def connect_model(self, model_name):
:param model_name: Format [controller:][user/]model
"""
- self.connection = await connection.Connection.connect_model(model_name)
+ self.connection = await connection.Connection.connect_model(model_name,
+ self.loop)
await self._after_connect()
async def _after_connect(self):
"""
async def _block():
while not all(c() for c in conditions):
- await asyncio.sleep(wait_period)
- await asyncio.wait_for(_block(), timeout)
+ await asyncio.sleep(wait_period, loop=self.loop)
+ await asyncio.wait_for(_block(), timeout, loop=self.loop)
@property
def applications(self):
# canceled with it. So we shield them. But this means
# they can *never* be canceled.
await asyncio.shield(
- self._notify_observers(delta, old_obj, new_obj))
+ self._notify_observers(delta, old_obj, new_obj),
+ loop=self.loop)
self._watch_received.set()
except CancelledError:
log.debug('Closing watcher connection')
for o in self.observers:
if o.cares_about(delta):
- asyncio.ensure_future(o(delta, old_obj, new_obj, self))
+ asyncio.ensure_future(o(delta, old_obj, new_obj, self),
+ loop=self.loop)
async def _wait(self, entity_type, entity_id, action, predicate=None):
"""
"""
raise NotImplementedError()
+ def _get_series(self, entity_url, entity):
+ # try to get the series from the provided charm URL
+ if entity_url.startswith('cs:'):
+ parts = entity_url[3:].split('/')
+ else:
+ parts = entity_url.split('/')
+ if parts[0].startswith('~'):
+ parts.pop(0)
+ if len(parts) > 1:
+ # series was specified in the URL
+ return parts[0]
+ # series was not supplied at all, so use the newest
+ # supported series according to the charm store
+ ss = entity['Meta']['supported-series']
+ return ss['SupportedSeries'][0]
+
async def deploy(
self, entity_url, application_name=None, bind=None, budget=None,
channel=None, config=None, constraints=None, force=False,
- series is required; how do we pick a default?
"""
- if to:
- placement = parse_placement(to)
- else:
- placement = []
-
if storage:
storage = {
k: client.Constraints(**v)
entity_url.startswith('local:') or
os.path.isdir(entity_url)
)
- entity_id = await self.charmstore.entityId(entity_url) \
- if not is_local else entity_url
+ if is_local:
+ entity_id = entity_url
+ else:
+ entity = await self.charmstore.entity(entity_url)
+ entity_id = entity['Id']
- app_facade = client.ApplicationFacade()
client_facade = client.ClientFacade()
- app_facade.connect(self.connection)
client_facade.connect(self.connection)
is_bundle = ((is_local and
# haven't made it yet we'll need to wait on them to be added
await asyncio.gather(*[
asyncio.ensure_future(
- self._wait_for_new('application', app_name))
+ self._wait_for_new('application', app_name),
+ loop=self.loop)
for app_name in pending_apps
- ])
+ ], loop=self.loop)
return [app for name, app in self.applications.items()
if name in handler.applications]
else:
- log.debug(
- 'Deploying %s', entity_id)
-
if not is_local:
+ if not application_name:
+ application_name = entity['Meta']['charm-metadata']['Name']
+ if not series:
+ series = self._get_series(entity_url, entity)
+ if not channel:
+ channel = 'stable'
await client_facade.AddCharm(channel, entity_id)
- elif not entity_id.startswith('local:'):
+ else:
# We have a local charm dir that needs to be uploaded
charm_dir = os.path.abspath(
os.path.expanduser(entity_id))
"Pass a 'series' kwarg to Model.deploy().".format(
charm_dir))
entity_id = await self.add_local_charm_dir(charm_dir, series)
-
- app = client.ApplicationDeploy(
- application=application_name,
- channel=channel,
+ return await self._deploy(
charm_url=entity_id,
- config=config,
- constraints=parse_constraints(constraints),
+ application=application_name,
+ series=series,
+ config=config or {},
+ constraints=constraints,
endpoint_bindings=bind,
- num_units=num_units,
resources=resources,
- series=series,
storage=storage,
+ channel=channel,
+ num_units=num_units,
+ placement=parse_placement(to),
)
- app.placement = placement
- result = await app_facade.Deploy([app])
- errors = [r.error.message for r in result.results if r.error]
- if errors:
- raise JujuError('\n'.join(errors))
- return await self._wait_for_new('application', application_name)
+ async def _deploy(self, charm_url, application, series, config,
+ constraints, endpoint_bindings, resources, storage,
+ channel=None, num_units=None, placement=None):
+ """Logic shared between `Model.deploy` and `BundleHandler.deploy`.
+ """
+ log.info('Deploying %s', charm_url)
+
+ # stringify all config values for API, and convert to YAML
+ config = {k: str(v) for k, v in config.items()}
+ config = yaml.dump({application: config},
+ default_flow_style=False)
+
+ app_facade = client.ApplicationFacade()
+ app_facade.connect(self.connection)
+
+ app = client.ApplicationDeploy(
+ charm_url=charm_url,
+ application=application,
+ series=series,
+ channel=channel,
+ config_yaml=config,
+ constraints=parse_constraints(constraints),
+ endpoint_bindings=endpoint_bindings,
+ num_units=num_units,
+ resources=resources,
+ storage=storage,
+ placement=placement,
+ )
+
+ result = await app_facade.Deploy([app])
+ errors = [r.error.message for r in result.results if r.error]
+ if errors:
+ raise JujuError('\n'.join(errors))
+ return await self._wait_for_new('application', application)
def destroy(self):
"""Terminate all machines and resources for this model.
"""
raise NotImplementedError()
- def get_status(self, filter_=None, utc=False):
+ async def get_status(self, filters=None, utc=False):
"""Return the status of the model.
- :param str filter_: Service or unit name or wildcard ('*')
+ :param str filters: Optional list of applications, units, or machines
+ to include, which can use wildcards ('*').
:param bool utc: Display time as UTC in RFC3339 format
"""
- raise NotImplementedError()
- status = get_status
+ client_facade = client.ClientFacade()
+ client_facade.connect(self.connection)
+ return await client_facade.FullStatus(filters)
def sync_tools(
self, all_=False, destination=None, dry_run=False, public=False,
charm_urls = await asyncio.gather(*[
self.model.add_local_charm_dir(*params)
for params in args
- ])
+ ], loop=self.model.loop)
# Update the 'charm:' entry for each app with the new 'local:' url.
for app_name, charm_url in zip(apps, charm_urls):
bundle['services'][app_name]['charm'] = charm_url
self.plan = await self.client_facade.GetBundleChanges(
yaml.dump(self.bundle))
- if self.plan.errors:
- raise JujuError('\n'.join(self.plan.errors))
-
async def execute_plan(self):
for step in self.plan.changes:
method = getattr(self, step.method)
"""
# resolve indirect references
charm = self.resolve(charm)
- # stringify all config values for API
- options = {k: str(v) for k, v in options.items()}
- # build param object
- app = client.ApplicationDeploy(
+ await self.model._deploy(
charm_url=charm,
- series=series,
application=application,
+ series=series,
config=options,
- constraints=parse_constraints(constraints),
- storage=storage,
+ constraints=constraints,
endpoint_bindings=endpoint_bindings,
resources=resources,
+ storage=storage,
)
- # do the do
- log.info('Deploying %s', charm)
- await self.app_facade.Deploy([app])
- # ensure the app is in the model for future operations
- await self.model._wait_for_new('application', application)
return application
async def addUnit(self, application, to):
"""
def __init__(self, loop):
self.loop = loop
- self._cs = charmstore.CharmStore()
+ self._cs = theblues.charmstore.CharmStore(timeout=5)
def __getattr__(self, name):
"""
else:
async def coro(*args, **kwargs):
method = partial(attr, *args, **kwargs)
- return await self.loop.run_in_executor(None, method)
+ for attempt in range(1, 4):
+ try:
+ return await self.loop.run_in_executor(None, method)
+ except theblues.errors.ServerError:
+ if attempt == 3:
+ raise
+ await asyncio.sleep(1, loop=self.loop)
setattr(self, name, coro)
wrapper = coro
return wrapper