X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=juju%2Fmodel.py;h=55ad086dcbb7acb8b08be49646a3d3079ac2af81;hb=95c05eaf1dcffffe16a86c80d98dcd19f4aeed45;hp=63c306ba14b823e07a4c3531dad4a6f5f0f3bee4;hpb=2a8a35b260385c6a0c0014f0f3e62975aff8b113;p=osm%2FN2VC.git diff --git a/juju/model.py b/juju/model.py index 63c306b..55ad086 100644 --- a/juju/model.py +++ b/juju/model.py @@ -13,7 +13,8 @@ from functools import partial from pathlib import Path import yaml -from theblues import charmstore +import theblues.charmstore +import theblues.errors from .client import client from .client import watcher @@ -376,8 +377,8 @@ class Model(object): self.state = ModelState(self) self.info = None self._watcher_task = None - self._watch_shutdown = asyncio.Event(loop=loop) - self._watch_received = asyncio.Event(loop=loop) + self._watch_shutdown = asyncio.Event(loop=self.loop) + self._watch_received = asyncio.Event(loop=self.loop) self._charmstore = CharmStore(self.loop) async def connect(self, *args, **kw): @@ -386,6 +387,8 @@ class Model(object): args and kw are passed through to Connection.connect() """ + if 'loop' not in kw: + kw['loop'] = self.loop self.connection = await connection.Connection.connect(*args, **kw) await self._after_connect() @@ -393,7 +396,8 @@ class Model(object): """Connect to the current Juju model. """ - self.connection = await connection.Connection.connect_current() + self.connection = await connection.Connection.connect_current( + self.loop) await self._after_connect() async def connect_model(self, model_name): @@ -402,7 +406,8 @@ class Model(object): :param model_name: Format [controller:][user/]model """ - self.connection = await connection.Connection.connect_model(model_name) + self.connection = await connection.Connection.connect_model(model_name, + self.loop) await self._after_connect() async def _after_connect(self): @@ -511,8 +516,8 @@ class Model(object): """ async def _block(): while not all(c() for c in conditions): - await asyncio.sleep(wait_period) - await asyncio.wait_for(_block(), timeout) + await asyncio.sleep(wait_period, loop=self.loop) + await asyncio.wait_for(_block(), timeout, loop=self.loop) @property def applications(self): @@ -623,7 +628,8 @@ class Model(object): # canceled with it. So we shield them. But this means # they can *never* be canceled. await asyncio.shield( - self._notify_observers(delta, old_obj, new_obj)) + self._notify_observers(delta, old_obj, new_obj), + loop=self.loop) self._watch_received.set() except CancelledError: log.debug('Closing watcher connection') @@ -662,7 +668,8 @@ class Model(object): for o in self.observers: if o.cares_about(delta): - asyncio.ensure_future(o(delta, old_obj, new_obj, self)) + asyncio.ensure_future(o(delta, old_obj, new_obj, self), + loop=self.loop) async def _wait(self, entity_type, entity_id, action, predicate=None): """ @@ -928,6 +935,22 @@ class Model(object): """ raise NotImplementedError() + def _get_series(self, entity_url, entity): + # try to get the series from the provided charm URL + if entity_url.startswith('cs:'): + parts = entity_url[3:].split('/') + else: + parts = entity_url.split('/') + if parts[0].startswith('~'): + parts.pop(0) + if len(parts) > 1: + # series was specified in the URL + return parts[0] + # series was not supplied at all, so use the newest + # supported series according to the charm store + ss = entity['Meta']['supported-series'] + return ss['SupportedSeries'][0] + async def deploy( self, entity_url, application_name=None, bind=None, budget=None, channel=None, config=None, constraints=None, force=False, @@ -967,11 +990,6 @@ class Model(object): - series is required; how do we pick a default? """ - if to: - placement = parse_placement(to) - else: - placement = [] - if storage: storage = { k: client.Constraints(**v) @@ -982,12 +1000,13 @@ class Model(object): entity_url.startswith('local:') or os.path.isdir(entity_url) ) - entity_id = await self.charmstore.entityId(entity_url) \ - if not is_local else entity_url + if is_local: + entity_id = entity_url + else: + entity = await self.charmstore.entity(entity_url) + entity_id = entity['Id'] - app_facade = client.ApplicationFacade() client_facade = client.ClientFacade() - app_facade.connect(self.connection) client_facade.connect(self.connection) is_bundle = ((is_local and @@ -1005,18 +1024,22 @@ class Model(object): # haven't made it yet we'll need to wait on them to be added await asyncio.gather(*[ asyncio.ensure_future( - self._wait_for_new('application', app_name)) + self._wait_for_new('application', app_name), + loop=self.loop) for app_name in pending_apps - ]) + ], loop=self.loop) return [app for name, app in self.applications.items() if name in handler.applications] else: - log.debug( - 'Deploying %s', entity_id) - if not is_local: + if not application_name: + application_name = entity['Meta']['charm-metadata']['Name'] + if not series: + series = self._get_series(entity_url, entity) + if not channel: + channel = 'stable' await client_facade.AddCharm(channel, entity_id) - elif not entity_id.startswith('local:'): + else: # We have a local charm dir that needs to be uploaded charm_dir = os.path.abspath( os.path.expanduser(entity_id)) @@ -1027,26 +1050,54 @@ class Model(object): "Pass a 'series' kwarg to Model.deploy().".format( charm_dir)) entity_id = await self.add_local_charm_dir(charm_dir, series) - - app = client.ApplicationDeploy( - application=application_name, - channel=channel, + return await self._deploy( charm_url=entity_id, - config=config, - constraints=parse_constraints(constraints), + application=application_name, + series=series, + config=config or {}, + constraints=constraints, endpoint_bindings=bind, - num_units=num_units, resources=resources, - series=series, storage=storage, + channel=channel, + num_units=num_units, + placement=parse_placement(to), ) - app.placement = placement - result = await app_facade.Deploy([app]) - errors = [r.error.message for r in result.results if r.error] - if errors: - raise JujuError('\n'.join(errors)) - return await self._wait_for_new('application', application_name) + async def _deploy(self, charm_url, application, series, config, + constraints, endpoint_bindings, resources, storage, + channel=None, num_units=None, placement=None): + """Logic shared between `Model.deploy` and `BundleHandler.deploy`. + """ + log.info('Deploying %s', charm_url) + + # stringify all config values for API, and convert to YAML + config = {k: str(v) for k, v in config.items()} + config = yaml.dump({application: config}, + default_flow_style=False) + + app_facade = client.ApplicationFacade() + app_facade.connect(self.connection) + + app = client.ApplicationDeploy( + charm_url=charm_url, + application=application, + series=series, + channel=channel, + config_yaml=config, + constraints=parse_constraints(constraints), + endpoint_bindings=endpoint_bindings, + num_units=num_units, + resources=resources, + storage=storage, + placement=placement, + ) + + result = await app_facade.Deploy([app]) + errors = [r.error.message for r in result.results if r.error] + if errors: + raise JujuError('\n'.join(errors)) + return await self._wait_for_new('application', application) def destroy(self): """Terminate all machines and resources for this model. @@ -1493,7 +1544,7 @@ class BundleHandler(object): charm_urls = await asyncio.gather(*[ self.model.add_local_charm_dir(*params) for params in args - ]) + ], loop=self.model.loop) # Update the 'charm:' entry for each app with the new 'local:' url. for app_name, charm_url in zip(apps, charm_urls): bundle['services'][app_name]['charm'] = charm_url @@ -1514,9 +1565,6 @@ class BundleHandler(object): self.plan = await self.client_facade.GetBundleChanges( yaml.dump(self.bundle)) - if self.plan.errors: - raise JujuError('\n'.join(self.plan.errors)) - async def execute_plan(self): for step in self.plan.changes: method = getattr(self, step.method) @@ -1652,28 +1700,16 @@ class BundleHandler(object): """ # resolve indirect references charm = self.resolve(charm) - # stringify all config values for API, and convert to YAML - options = {k: str(v) for k, v in options.items()} - options = yaml.dump({application: options}, default_flow_style=False) - # build param object - app = client.ApplicationDeploy( + await self.model._deploy( charm_url=charm, - series=series, application=application, - # Pass options to config-yaml rather than config, as - # config-yaml invokes a newer codepath that better handles - # empty strings in the options values. - config_yaml=options, - constraints=parse_constraints(constraints), - storage=storage, + series=series, + config=options, + constraints=constraints, endpoint_bindings=endpoint_bindings, resources=resources, + storage=storage, ) - # do the do - log.info('Deploying %s', charm) - await self.app_facade.Deploy([app]) - # ensure the app is in the model for future operations - await self.model._wait_for_new('application', application) return application async def addUnit(self, application, to): @@ -1741,7 +1777,7 @@ class CharmStore(object): """ def __init__(self, loop): self.loop = loop - self._cs = charmstore.CharmStore() + self._cs = theblues.charmstore.CharmStore(timeout=5) def __getattr__(self, name): """ @@ -1755,7 +1791,13 @@ class CharmStore(object): else: async def coro(*args, **kwargs): method = partial(attr, *args, **kwargs) - return await self.loop.run_in_executor(None, method) + for attempt in range(1, 4): + try: + return await self.loop.run_in_executor(None, method) + except theblues.errors.ServerError: + if attempt == 3: + raise + await asyncio.sleep(1, loop=self.loop) setattr(self, name, coro) wrapper = coro return wrapper