X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=juju%2Fmodel.py;h=f162c7e8565592b66c28d7ae4d0ea18b66b59496;hb=1a3cee44420e79fda92943edf636eaddb393145e;hp=91aba2593ebc6795a3378f3dccaadeea82deef17;hpb=f1141c79947f1ba335c697be546761a1a1fc43b3;p=osm%2FN2VC.git diff --git a/juju/model.py b/juju/model.py index 91aba25..f162c7e 100644 --- a/juju/model.py +++ b/juju/model.py @@ -1,5 +1,7 @@ import asyncio +import base64 import collections +import hashlib import json import logging import os @@ -13,10 +15,11 @@ from functools import partial from pathlib import Path import yaml -from theblues import charmstore +import theblues.charmstore +import theblues.errors +from . import tag from .client import client -from .client import watcher from .client import connection from .constraints import parse as parse_constraints, normalize_key from .delta import get_entity_delta @@ -229,7 +232,14 @@ class ModelEntity(object): model. """ - return self.safe_data[name] + try: + return self.safe_data[name] + except KeyError: + name = name.replace('_', '-') + if name in self.safe_data: + return self.safe_data[name] + else: + raise def __bool__(self): return bool(self.data) @@ -555,8 +565,7 @@ class Model(object): explicit call to this method. """ - facade = client.ClientFacade() - facade.connect(self.connection) + facade = client.ClientFacade.from_connection(self.connection) self.info = await facade.ModelInfo() log.debug('Got ModelInfo: %s', vars(self.info)) @@ -612,9 +621,9 @@ class Model(object): async def _start_watch(): self._watch_shutdown.clear() try: - allwatcher = watcher.AllWatcher() self._watch_conn = await self.connection.clone() - allwatcher.connect(self._watch_conn) + allwatcher = client.AllWatcherFacade.from_connection( + self._watch_conn) while True: results = await allwatcher.Next() for delta in results.deltas: @@ -635,6 +644,9 @@ class Model(object): await self._watch_conn.close() self._watch_shutdown.set() self._watch_conn = None + except Exception as e: + log.exception('Error in watcher') + raise log.debug('Starting watcher task') self._watcher_task = self.loop.create_task(_start_watch()) @@ -786,12 +798,11 @@ class Model(object): params.series = series # Submit the request. - client_facade = client.ClientFacade() - client_facade.connect(self.connection) + client_facade = client.ClientFacade.from_connection(self.connection) results = await client_facade.AddMachines([params]) error = results.machines[0].error if error: - raise ValueError("Error adding machine: %s", error.message) + raise ValueError("Error adding machine: %s" % error.message) machine_id = results.machines[0].machine log.debug('Added new machine %s', machine_id) return await self._wait_for_new('machine', machine_id) @@ -803,8 +814,7 @@ class Model(object): :param str relation2: '[:]' """ - app_facade = client.ApplicationFacade() - app_facade.connect(self.connection) + app_facade = client.ApplicationFacade.from_connection(self.connection) log.debug( 'Adding relation %s <-> %s', relation1, relation2) @@ -840,13 +850,15 @@ class Model(object): """ raise NotImplementedError() - def add_ssh_key(self, key): + async def add_ssh_key(self, user, key): """Add a public SSH key to this model. + :param str user: The username of the user :param str key: The public ssh key """ - raise NotImplementedError() + key_facade = client.KeyManagerFacade.from_connection(self.connection) + return await key_facade.AddKeys([key], user) add_ssh_keys = add_ssh_key def add_subnet(self, cidr_or_id, space, *zones): @@ -934,6 +946,22 @@ class Model(object): """ raise NotImplementedError() + def _get_series(self, entity_url, entity): + # try to get the series from the provided charm URL + if entity_url.startswith('cs:'): + parts = entity_url[3:].split('/') + else: + parts = entity_url.split('/') + if parts[0].startswith('~'): + parts.pop(0) + if len(parts) > 1: + # series was specified in the URL + return parts[0] + # series was not supplied at all, so use the newest + # supported series according to the charm store + ss = entity['Meta']['supported-series'] + return ss['SupportedSeries'][0] + async def deploy( self, entity_url, application_name=None, bind=None, budget=None, channel=None, config=None, constraints=None, force=False, @@ -968,16 +996,9 @@ class Model(object): TODO:: - - application_name is required; fill this in automatically if not - provided by caller - - series is required; how do we pick a default? + - support local resources """ - if to: - placement = parse_placement(to) - else: - placement = [] - if storage: storage = { k: client.Constraints(**v) @@ -988,13 +1009,13 @@ class Model(object): entity_url.startswith('local:') or os.path.isdir(entity_url) ) - entity_id = await self.charmstore.entityId(entity_url) \ - if not is_local else entity_url + if is_local: + entity_id = entity_url + else: + entity = await self.charmstore.entity(entity_url) + entity_id = entity['Id'] - app_facade = client.ApplicationFacade() - client_facade = client.ClientFacade() - app_facade.connect(self.connection) - client_facade.connect(self.connection) + client_facade = client.ClientFacade.from_connection(self.connection) is_bundle = ((is_local and (Path(entity_id) / 'bundle.yaml').exists()) or @@ -1018,24 +1039,20 @@ class Model(object): return [app for name, app in self.applications.items() if name in handler.applications] else: - log.debug( - 'Deploying %s', entity_id) - if not is_local: - parts = entity_id[3:].split('/') - if parts[0].startswith('~'): - parts.pop(0) if not application_name: - application_name = parts[-1].split('-')[0] + application_name = entity['Meta']['charm-metadata']['Name'] if not series: - if len(parts) > 1: - series = parts[0] - else: - entity = await self.charmstore.entity(entity_id) - ss = entity['Meta']['supported-series'] - series = ss['SupportedSeries'][0] + series = self._get_series(entity_url, entity) + if not channel: + channel = 'stable' await client_facade.AddCharm(channel, entity_id) - elif not entity_id.startswith('local:'): + # XXX: we're dropping local resources here, but we don't + # actually support them yet anyway + resources = await self._add_store_resources(application_name, + entity_id, + entity) + else: # We have a local charm dir that needs to be uploaded charm_dir = os.path.abspath( os.path.expanduser(entity_id)) @@ -1046,30 +1063,89 @@ class Model(object): "Pass a 'series' kwarg to Model.deploy().".format( charm_dir)) entity_id = await self.add_local_charm_dir(charm_dir, series) - - app = client.ApplicationDeploy( - application=application_name, - channel=channel, + return await self._deploy( charm_url=entity_id, - config=config, - constraints=parse_constraints(constraints), + application=application_name, + series=series, + config=config or {}, + constraints=constraints, endpoint_bindings=bind, - num_units=num_units, resources=resources, - series=series, storage=storage, + channel=channel, + num_units=num_units, + placement=parse_placement(to) ) - app.placement = placement - result = await app_facade.Deploy([app]) - errors = [r.error.message for r in result.results if r.error] - if errors: - raise JujuError('\n'.join(errors)) - return await self._wait_for_new('application', application_name) + async def _add_store_resources(self, application, entity_url, entity=None): + if not entity: + # avoid extra charm store call if one was already made + entity = await self.charmstore.entity(entity_url) + resources = [ + { + 'description': resource['Description'], + 'fingerprint': resource['Fingerprint'], + 'name': resource['Name'], + 'path': resource['Path'], + 'revision': resource['Revision'], + 'size': resource['Size'], + 'type_': resource['Type'], + 'origin': 'store', + } for resource in entity['Meta']['resources'] + ] + + if not resources: + return None - def destroy(self): - """Terminate all machines and resources for this model. + resources_facade = client.ResourcesFacade.from_connection( + self.connection) + response = await resources_facade.AddPendingResources( + tag.application(application), + entity_url, + [client.CharmResource(**resource) for resource in resources]) + resource_map = {resource['name']: pid + for resource, pid + in zip(resources, response.pending_ids)} + return resource_map + async def _deploy(self, charm_url, application, series, config, + constraints, endpoint_bindings, resources, storage, + channel=None, num_units=None, placement=None): + """Logic shared between `Model.deploy` and `BundleHandler.deploy`. + """ + log.info('Deploying %s', charm_url) + + # stringify all config values for API, and convert to YAML + config = {k: str(v) for k, v in config.items()} + config = yaml.dump({application: config}, + default_flow_style=False) + + app_facade = client.ApplicationFacade.from_connection( + self.connection) + + app = client.ApplicationDeploy( + charm_url=charm_url, + application=application, + series=series, + channel=channel, + config_yaml=config, + constraints=parse_constraints(constraints), + endpoint_bindings=endpoint_bindings, + num_units=num_units, + resources=resources, + storage=storage, + placement=placement + ) + + result = await app_facade.Deploy([app]) + errors = [r.error.message for r in result.results if r.error] + if errors: + raise JujuError('\n'.join(errors)) + return await self._wait_for_new('application', application) + + async def destroy(self): + """Terminate all machines and resources for this model. + Is already implemented in controller.py. """ raise NotImplementedError() @@ -1077,8 +1153,7 @@ class Model(object): """Destroy units by name. """ - app_facade = client.ApplicationFacade() - app_facade.connect(self.connection) + app_facade = client.ApplicationFacade.from_connection(self.connection) log.debug( 'Destroying unit%s %s', @@ -1128,14 +1203,21 @@ class Model(object): """ raise NotImplementedError() - def grant(self, username, acl='read'): + async def grant(self, username, acl='read'): """Grant a user access to this model. :param str username: Username :param str acl: Access control ('read' or 'write') """ - raise NotImplementedError() + controller_conn = await self.connection.controller() + model_facade = client.ModelManagerFacade.from_connection( + controller_conn) + user = tag.user(username) + model = tag.model(self.info.uuid) + changes = client.ModifyModelAccess(acl, 'grant', model, user) + await self.revoke(username) + return await model_facade.ModifyModelAccess([changes]) def import_ssh_key(self, identity): """Add a public SSH key from a trusted indentity source to this model. @@ -1146,14 +1228,11 @@ class Model(object): raise NotImplementedError() import_ssh_keys = import_ssh_key - def get_machines(self, machine, utc=False): + async def get_machines(self): """Return list of machines in this model. - :param str machine: Machine id, e.g. '0' - :param bool utc: Display time as UTC in RFC3339 format - """ - raise NotImplementedError() + return list(self.state.machines.keys()) def get_shares(self): """Return list of all users with access to this model. @@ -1167,11 +1246,16 @@ class Model(object): """ raise NotImplementedError() - def get_ssh_key(self): + async def get_ssh_key(self, raw_ssh=False): """Return known SSH keys for this model. + :param bool raw_ssh: if True, returns the raw ssh key, + else it's fingerprint """ - raise NotImplementedError() + key_facade = client.KeyManagerFacade.from_connection(self.connection) + entity = {'tag': tag.model(self.info.uuid)} + entities = client.Entities([entity]) + return await key_facade.ListKeys(entities, raw_ssh) get_ssh_keys = get_ssh_key def get_storage(self, filesystem=False, volume=False): @@ -1234,13 +1318,18 @@ class Model(object): raise NotImplementedError() remove_machines = remove_machine - def remove_ssh_key(self, *keys): + async def remove_ssh_key(self, user, key): """Remove a public SSH key(s) from this model. - :param str \*keys: Keys to remove + :param str key: Full ssh key + :param str user: Juju user to which the key is registered """ - raise NotImplementedError() + key_facade = client.KeyManagerFacade.from_connection(self.connection) + key = base64.b64decode(bytes(key.strip().split()[1].encode('ascii'))) + key = hashlib.md5(key).hexdigest() + key = ':'.join(a+b for a, b in zip(key[::2], key[1::2])) + await key_facade.DeleteKeys([key], user) remove_ssh_keys = remove_ssh_key def restore_backup( @@ -1264,14 +1353,19 @@ class Model(object): """ raise NotImplementedError() - def revoke(self, username, acl='read'): + async def revoke(self, username): """Revoke a user's access to this model. :param str username: Username to revoke - :param str acl: Access control ('read' or 'write') """ - raise NotImplementedError() + controller_conn = await self.connection.controller() + model_facade = client.ModelManagerFacade.from_connection( + controller_conn) + user = tag.user(username) + model = tag.model(self.info.uuid) + changes = client.ModifyModelAccess('read', 'revoke', model, user) + return await model_facade.ModifyModelAccess([changes]) def run(self, command, timeout=None): """Run command on all machines in this model. @@ -1332,8 +1426,7 @@ class Model(object): :param bool utc: Display time as UTC in RFC3339 format """ - client_facade = client.ClientFacade() - client_facade.connect(self.connection) + client_facade = client.ClientFacade.from_connection(self.connection) return await client_facade.FullStatus(filters) def sync_tools( @@ -1413,8 +1506,8 @@ class Model(object): log.debug("Retrieving metrics for %s", ', '.join(tags) if tags else "all units") - metrics_facade = client.MetricsDebugFacade() - metrics_facade.connect(self.connection) + metrics_facade = client.MetricsDebugFacade.from_connection( + self.connection) entities = [client.Entity(tag) for tag in tags] metrics_result = await metrics_facade.GetMetrics(entities) @@ -1464,12 +1557,12 @@ class BundleHandler(object): for unit_name, unit in model.units.items(): app_units = self._units_by_app.setdefault(unit.application, []) app_units.append(unit_name) - self.client_facade = client.ClientFacade() - self.client_facade.connect(model.connection) - self.app_facade = client.ApplicationFacade() - self.app_facade.connect(model.connection) - self.ann_facade = client.AnnotationsFacade() - self.ann_facade.connect(model.connection) + self.client_facade = client.ClientFacade.from_connection( + model.connection) + self.app_facade = client.ApplicationFacade.from_connection( + model.connection) + self.ann_facade = client.AnnotationsFacade.from_connection( + model.connection) async def _handle_local_charms(self, bundle): """Search for references to local charms (i.e. filesystem paths) @@ -1533,9 +1626,6 @@ class BundleHandler(object): self.plan = await self.client_facade.GetBundleChanges( yaml.dump(self.bundle)) - if self.plan.errors: - raise JujuError('\n'.join(self.plan.errors)) - async def execute_plan(self): for step in self.plan.changes: method = getattr(self, step.method) @@ -1615,7 +1705,7 @@ class BundleHandler(object): results = await self.client_facade.AddMachines([params]) error = results.machines[0].error if error: - raise ValueError("Error adding machine: %s", error.message) + raise ValueError("Error adding machine: %s" % error.message) machine = results.machines[0].machine log.debug('Added new machine %s', machine) return machine @@ -1671,28 +1761,21 @@ class BundleHandler(object): """ # resolve indirect references charm = self.resolve(charm) - # stringify all config values for API, and convert to YAML - options = {k: str(v) for k, v in options.items()} - options = yaml.dump({application: options}, default_flow_style=False) - # build param object - app = client.ApplicationDeploy( + # the bundle plan doesn't actually do anything with resources, even + # though it ostensibly gives us something (None) for that param + if not charm.startswith('local:'): + resources = await self.model._add_store_resources(application, + charm) + await self.model._deploy( charm_url=charm, - series=series, application=application, - # Pass options to config-yaml rather than config, as - # config-yaml invokes a newer codepath that better handles - # empty strings in the options values. - config_yaml=options, - constraints=parse_constraints(constraints), - storage=storage, + series=series, + config=options, + constraints=constraints, endpoint_bindings=endpoint_bindings, resources=resources, + storage=storage, ) - # do the do - log.info('Deploying %s', charm) - await self.app_facade.Deploy([app]) - # ensure the app is in the model for future operations - await self.model._wait_for_new('application', application) return application async def addUnit(self, application, to): @@ -1760,7 +1843,7 @@ class CharmStore(object): """ def __init__(self, loop): self.loop = loop - self._cs = charmstore.CharmStore() + self._cs = theblues.charmstore.CharmStore(timeout=5) def __getattr__(self, name): """ @@ -1774,7 +1857,13 @@ class CharmStore(object): else: async def coro(*args, **kwargs): method = partial(attr, *args, **kwargs) - return await self.loop.run_in_executor(None, method) + for attempt in range(1, 4): + try: + return await self.loop.run_in_executor(None, method) + except theblues.errors.ServerError: + if attempt == 3: + raise + await asyncio.sleep(1, loop=self.loop) setattr(self, name, coro) wrapper = coro return wrapper