X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=common%2Fpython%2Frift%2Fmano%2Futils%2Fjuju_api.py;h=3f25c2a1d839566f9a51fb9bb501c95dbe72cd85;hb=95bd37e7dc1ccc3a18be8ab21d703c6e405fb824;hp=7ba53047a49d5f849ed8e6816b724dccc060b41a;hpb=9e86badd6a6b11888cd247d8f5ccf1d428682ea9;p=osm%2FSO.git diff --git a/common/python/rift/mano/utils/juju_api.py b/common/python/rift/mano/utils/juju_api.py old mode 100644 new mode 100755 index 7ba53047..3f25c2a1 --- a/common/python/rift/mano/utils/juju_api.py +++ b/common/python/rift/mano/utils/juju_api.py @@ -16,19 +16,13 @@ import argparse import asyncio -from functools import partial import logging import os import ssl -import sys -import time -try: - from jujuclient.juju1.environment import Environment as Env1 - from jujuclient.juju2.environment import Environment as Env2 -except ImportError as e: - # Try importing older jujuclient - from jujuclient import Environment as Env1 +import juju.loop +from juju.controller import Controller +from juju.model import Model, ModelObserver try: ssl._create_default_https_context = ssl._create_unverified_context @@ -102,27 +96,65 @@ class JujuActionExecError(JujuActionError): pass +class JujuAuthenticationError(Exception): + pass + + +class JujuMonitor(ModelObserver): + """Monitor state changes within the Juju Model.""" + # async def on_change(self, delta, old, new, model): + # """React to changes in the Juju model.""" + # + # # TODO: Setup the hook to update the UI if the status of a unit changes + # # to be used when deploying a charm and waiting for it to be "ready" + # if delta.entity in ['application', 'unit'] and delta.type == "change": + # pass + # + # # TODO: Add a hook when an action is complete + + pass + + class JujuApi(object): - ''' - JujuApi wrapper on jujuclient library + """JujuApi wrapper on jujuclient library. There should be one instance of JujuApi for each VNF manged by Juju. Assumption: Currently we use one unit per service/VNF. So once a service is deployed, we store the unit name and reuse it -''' + """ + log = None + controller = None + models = {} + model = None + model_name = None + model_uuid = None + authenticated = False + + def __init__(self, + log=None, + loop=None, + server='127.0.0.1', + port=17070, + user='admin', + secret=None, + version=None, + model_name='default', + ): + """Initialize with the Juju credentials.""" + + if log: + self.log = log + else: + self.log = logging.getLogger(__name__) + + # Quiet websocket traffic + logging.getLogger('websockets.protocol').setLevel(logging.INFO) + + self.log.debug('JujuApi: instantiated') - def __init__ (self, - log=None, - loop=None, - server='127.0.0.1', - port=17070, - user='admin', - secret=None, - version=None): - '''Initialize with the Juju credentials''' self.server = server self.port = port @@ -132,939 +164,690 @@ class JujuApi(object): else: self.user = 'user-{}'.format(user) - self.loop = loop + self.endpoint = '%s:%d' % (server, int(port)) - if log is not None: - self.log = log - else: - self.log = JujuApi._get_logger() + self.model_name = model_name - if self.log is None: - raise JujuApiError("Logger not defined") + if loop: + self.loop = loop - self.version = None - if version: - self.version = version - else: - try: - if Env2: - pass - except NameError: - self.log.warn("Using older version of Juju client, which " \ - "supports only Juju 1.x") - self.version = 1 - - endpoint = 'wss://%s:%d' % (server, int(port)) - self.endpoint = endpoint - - self.charm = None # Charm used - self.service = None # Service deployed - self.units = [] # Storing as list to support more units in future - - self.destroy_retries = 25 # Number retires to destroy service - self.retry_delay = 5 # seconds - - def __str__(self): - return ("JujuApi-{}".format(self.endpoint)) - - @classmethod - def _get_logger(cls): - if cls.log is not None: - return cls.log - - fmt = logging.Formatter( - '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:' \ - '%(filename)s:%(lineno)d) - %(message)s') - stderr_handler = logging.StreamHandler(stream=sys.stderr) - stderr_handler.setFormatter(fmt) - logging.basicConfig(level=logging.DEBUG) - cls.log = logging.getLogger('juju-api') - cls.log.addHandler(stderr_handler) - - return cls.log - - @staticmethod - def format_charm_name(name): - '''Format the name to valid charm name - - Charm service name accepts only a to z and -. - ''' - - new_name = '' - for c in name: - if c.isdigit(): - c = chr(97 + int(c)) - elif not c.isalpha(): - c = "-" - new_name += c - return new_name.lower() - - def _get_version_tag(self, tag): - version_tag_map = { - 'applications': { - 1: 'Services', - 2: 'applications', - }, - 'units': { - 1: 'Units', - 2: 'units', - }, - 'status': { - 1: 'Status', - 2: 'status', - }, - 'workload-status': { - 1: 'Workload', - 2: 'workload-status', - }, - 'charm-url': { - 1: 'CharmURL', - 2: 'charm-url', - }, - } + def __del__(self): + """Close any open connections.""" + yield self.logout() - return version_tag_map[tag][self.version] + async def add_relation(self, a, b, via=None): + """ + Add a relation between two application endpoints. - def _get_env1(self): - try: - env = Env1(self.endpoint) - l = env.login(self.secret, user=self.user) - return env - - except ConnectionRefusedError as e: - msg = "{}: Failed Juju 1.x connect: {}".format(self, e) - self.log.error(msg) - self.log.exception(e) - raise e - - except Exception as e: - msg = "{}: Failed Juju 1.x connect: {}".format(self, e) - self.log.error(msg) - self.log.exception(e) - raise JujuEnvError(msg) - - def _get_env2(self): - try: - env = Env2(self.endpoint) - l = env.login(self.secret, user=self.user) - except KeyError as e: - msg = "{}: Failed Juju 2.x connect: {}".format(self, e) - self.log.debug(msg) - raise JujuVersionError(msg) + :param a An application endpoint + :param b An application endpoint + :param via The egress subnet(s) for outbound traffic, e.g., (192.168.0.0/16,10.0.0.0/8) + """ + if not self.authenticated: + await self.login() + m = await self.get_model() try: - models = env.models.list() - for m in models['user-models']: - if m['model']['name'] == 'default': - mep = '{}/model/{}/api'.format(self.endpoint, - m['model']['uuid']) - model = Env2(mep, env_uuid=m['model']['uuid']) - l = model.login(self.secret, user=self.user) - break - - if model is None: - raise - - return model - - except Exception as e: - msg = "{}: Failed logging to model: {}".format(self, e) - self.log.error(msg) - self.log.exception(e) - env.close() - raise JujuModelError(msg) - - def _get_env(self): - self.log.debug("{}: Connect to endpoint {}". - format(self, self.endpoint)) - - if self.version is None: - # Try version 2 first - try: - env = self._get_env2() - self.version = 2 - - except JujuVersionError as e: - self.log.info("Unable to login as Juju 2.x, trying 1.x") - env = self._get_env1() - self.version = 1 - - return env - - elif self.version == 2: - return self._get_env2() - - elif self.version == 1: - return self._get_env1() - - else: - msg = "{}: Unknown version set: {}".format(self, self.version) - self.log.error(msg) - raise JujuVersionError(msg) - - @asyncio.coroutine - def get_env(self): - ''' Connect to the Juju controller''' - env = yield from self.loop.run_in_executor( - None, - self._get_env, - ) - return env - - def _get_status(self, env=None): - if env is None: - env = self._get_env() - - try: - status = env.status() - return status - - except Exception as e: - msg = "{}: exception in getting status: {}". \ - format(self, e) - self.log.error(msg) - self.log.exception(e) - raise JujuStatusError(msg) - - @asyncio.coroutine - def get_status(self, env=None): - '''Get Juju controller status''' - pf = partial(self._get_status, env=env) - status = yield from self.loop.run_in_executor( - None, - pf, - ) + m.add_relation(a, b, via) + finally: + await m.disconnect() + + async def apply_config(self, config, application): + """Apply a configuration to the application.""" + self.log.debug("JujuApi: Applying configuration to {}.".format( + application + )) + return await self.set_config(application=application, config=config) + + async def deploy_application(self, charm, name="", path="", specs={}): + """ + Deploy an application. + + Deploy an application to a container or a machine already provisioned + by the OSM Resource Orchestrator (requires the Juju public ssh key + installed on the new machine via cloud-init). + + :param str charm: The name of the charm + :param str name: The name of the application, if different than the charm + :param str path: The path to the charm + :param dict machine: A dictionary identifying the machine to manage via Juju + Examples:: + + deploy_application(..., specs={'host': '10.0.0.4', 'user': 'ubuntu'}) + """ + if not self.authenticated: + await self.login() + + # Check that the charm is valid and exists. + if charm is None: + return None + + app = await self.get_application(name) + if app is None: + + # Check for specific machine placement + to = None + if all(k in specs for k in ['hostname', 'username']): + machine = await self.model.add_machine(spec='ssh:%@%'.format( + specs['host'], + specs['user'], + )) + to = machine.id + + # TODO: Handle the error if the charm isn't found. + self.log.debug("JujuApi: Deploying charm {} ({}) from {}".format( + charm, + name, + path, + to=to, + )) + app = await self.model.deploy( + path, + application_name=name, + series='xenial', + ) + return app + deploy_service = deploy_application + + async def get_action_status(self, uuid): + """Get the status of an action.""" + if not self.authenticated: + await self.login() + + self.log.debug("JujuApi: Waiting for status of action uuid {}".format(uuid)) + action = await self.model.wait_for_action(uuid) + return action.status + + async def get_application(self, application): + """Get the deployed application.""" + if not self.authenticated: + await self.login() + + self.log.debug("JujuApi: Getting application {}".format(application)) + app = None + if application and self.model: + if self.model.applications: + if application in self.model.applications: + app = self.model.applications[application] + return app + + async def get_application_status(self, application): + """Get the status of an application.""" + if not self.authenticated: + await self.login() + + status = None + app = await self.get_application(application) + if app: + status = app.status + self.log.debug("JujuApi: Status of application {} is {}".format( + application, + str(status), + )) return status + get_service_status = get_application_status - def get_all_units(self, status, service=None): - '''Parse the status and get the units''' - results = {} - services = status.get(self._get_version_tag('applications'), {}) - - for svc_name, svc_data in services.items(): - if service and service != svc_name: - continue - units = svc_data[self._get_version_tag('units')] or {} + async def get_config(self, application): + """Get the configuration of an application.""" + if not self.authenticated: + await self.login() - results[svc_name] = {} - for unit in units: - results[svc_name][unit] = \ - units[unit][self._get_version_tag('workload-status')] \ - [self._get_version_tag('status')] or None - return results + config = None + app = await self.get_application(application) + if app: + config = await app.get_config() + self.log.debug("JujuApi: Config of application {} is {}".format( + application, + str(config), + )) - def _get_service_units(self, service=None, status=None, env=None): - if service is None: - service = self.service + return config - # Optimizing calls to Juju, as currently we deploy only 1 unit per - # service. - if self.service == service and len(self.units): - return self.units + async def get_model(self, name='default'): + """Get a model from the Juju Controller. - if env is None: - env = self._get_env() - - if status is None: - status = self._get_status(env=env) - - try: - resp = self.get_all_units(status, service=service) - self.log.debug("Get all units: {}".format(resp)) - units = set(resp[service].keys()) - - if self.service == service: - self.units = units - - return units - - except Exception as e: - msg = "{}: exception in get units {}".format(self, e) - self.log.error(msg) - self.log.exception(e) - raise JujuUnitsError(msg) - - @asyncio.coroutine - def get_service_units(self, service=None, status=None, env=None): - '''Get the unit names for a service''' - pf = partial(self._get_service_units, - service=service, - status=status, - env=env) - units = yield from self.loop.run_in_executor( - None, - pf, - ) - return units + Note: Model objects returned must call disconnected() before it goes + out of scope.""" + if not self.authenticated: + await self.login() - def _get_service_status(self, service=None, status=None, env=None): - if env is None: - env = self._get_env() + model = Model() - if status is None: - status = self._get_status(env=env) + uuid = await self.get_model_uuid(name) - if service is None: - service = self.service + self.log.debug("JujuApi: Connecting to model {} ({})".format( + model, + uuid, + )) - try: - srv_status = status[self._get_version_tag('applications')] \ - [service][self._get_version_tag('status')] \ - [self._get_version_tag('status')] - self.log.debug("{}: Service {} status is {}". - format(self, service, srv_status)) - return srv_status - - except KeyError as e: - self.log.info("self: Did not find service {}, e={}".format(self, service, e)) - return 'NA' - - except Exception as e: - msg = "{}: exception checking service status for {}, e {}". \ - format(self, service, e) - self.log.error(msg) - self.log.exception(e) - raise JujuStatusError(msg) - - - @asyncio.coroutine - def get_service_status(self, service=None, status=None, env=None): - ''' Get service status - - maintenance : The unit is not yet providing services, but is actively doing stuff. - unknown : Service has finished an event but the charm has not called status-set yet. - waiting : Service is unable to progress to an active state because of dependency. - blocked : Service needs manual intervention to get back to the Running state. - active : Service correctly offering all the services. - NA : Service is not deployed - ''' - pf = partial(self._get_service_status, - service=service, - status=status, - env=env) - srv_status = yield from self.loop.run_in_executor( - None, - pf, - ) - return srv_status - - def _is_service_deployed(self, service=None, status=None, env=None): - resp = self._get_service_status(service=service, - status=status, - env=env) - - if resp not in ['terminated', 'NA']: - return True - - return False - - @asyncio.coroutine - def is_service_deployed(self, service=None, status=None, env=None): - '''Check if the service is deployed''' - pf = partial(self._is_service_deployed, - service=service, - status=status, - env=env) - rc = yield from self.loop.run_in_executor( - None, - pf, - ) - return rc - - def _is_service_error(self, service=None, status=None, env=None): - resp = self._get_service_status(service=service, - status=status, - env=env) - - if resp in ['error']: - return True - - return False - - @asyncio.coroutine - def is_service_error(self, service=None, status=None, env=None): - '''Check if the service is in error state''' - pf = partial(self._is_service_error, - service=service, - status=status, - env=env) - rc = yield from self.loop.run_in_executor( - None, - pf, - ) - return rc - - def _is_service_maint(self, service=None, status=None, env=None): - resp = self._get_service_status(service=service, - status=status, - env=env) - - if resp in ['maintenance']: - return True - - return False - - @asyncio.coroutine - def is_service_maint(self, service=None, status=None, env=None): - '''Check if the service is in error state''' - pf = partial(self._is_service_maint, - service=service, - status=status, - env=env) - rc = yield from self.loop.run_in_executor( - None, - pf, - ) - return rc - - def _is_service_active(self, service=None, status=None, env=None): - resp = self._get_service_status(service=service, - status=status, - env=env) - - if resp in ['active']: - return True - - return False - - @asyncio.coroutine - def is_service_active(self, service=None, status=None, env=None): - '''Check if the service is active''' - pf = partial(self._is_service_active, - service=service, - status=status, - env=env) - rc = yield from self.loop.run_in_executor( - None, - pf, - ) - return rc - - def _is_service_blocked(self, service=None, status=None, env=None): - resp = self._get_service_status(service=service, - status=status, - env=env) - - if resp in ['blocked']: - return True - - return False - - @asyncio.coroutine - def is_service_blocked(self, service=None, status=None, env=None): - '''Check if the service is blocked''' - pf = partial(self._is_service_blocked, - service=service, - status=status, - env=env) - rc = yield from self.loop.run_in_executor( + await model.connect( + self.endpoint, + uuid, + self.user, + self.secret, None, - pf, ) - return rc - def _is_service_up(self, service=None, status=None, env=None): - resp = self._get_service_status(service=service, - status=status, - env=env) + return model - if resp in ['active', 'blocked']: - return True + async def get_model_uuid(self, name='default'): + """Get the UUID of a model. - return False + Iterate through all models in a controller and find the matching model. + """ + if not self.authenticated: + await self.login() - @asyncio.coroutine - def is_service_up(self, service=None, status=None, env=None): - '''Check if the service is installed and up''' - pf = partial(self._is_service_up, - service=service, - status=status, - env=env) + uuid = None - rc = yield from self.loop.run_in_executor( - None, - pf, - ) - return rc - - def _apply_config(self, config, service=None, env=None): - if service is None: - service = self.service - - if config is None or len(config) == 0: - self.log.warn("{}: Empty config passed for service {}". - format(self, service)) - return - - if env is None: - env = self._get_env() + models = await self.controller.get_models() - status = self._get_status(env=env) + self.log.debug("JujuApi: Looking through {} models for model {}".format( + len(models.user_models), + name, + )) + for model in models.user_models: + if model.model.name == name: + uuid = model.model.uuid + break - if not self._is_service_deployed(service=service, - status=status, - env=env): - raise JujuSrvNotDeployedError("{}: service {} is not deployed". - format(self, service)) + return uuid - self.log.debug("{}: Config for service {} update to: {}". - format(self, service, config)) - try: - # Try to fix error on service, most probably due to config issue - if self._is_service_error(service=service, status=status, env=env): - self._resolve_error(service=service, env=env) - - if self.version == 2: - env.service.set(service, config) - else: - env.set_config(service, config) - - except Exception as e: - self.log.error("{}: exception setting config for {} with {}, e {}". - format(self, service, config, e)) - self.log.exception(e) - raise e - - @asyncio.coroutine - def apply_config(self, config, service=None, env=None, wait=True): - '''Apply a config on the service''' - pf = partial(self._apply_config, - config, - service=service, - env=env) - yield from self.loop.run_in_executor( - None, - pf, - ) + async def get_status(self): + """Get the model status.""" + if not self.authenticated: + await self.login() - if wait: - # Wait till config finished applying - self.log.debug("{}: Wait for config apply to finish". - format(self)) - delay = 3 # secs - maint = True - while maint: - # Sleep first to give time for config_changed hook to be invoked - yield from asyncio.sleep(delay, loop=self.loop) - maint = yield from self.is_service_maint(service=service, - env=env) - - err = yield from self.is_service_error(service=service, env=env) - if err: - self.log.error("{}: Service is in error state". - format(self)) - return False - - self.log.debug("{}: Finished applying config".format(self)) - return True - - def _set_parameter(self, parameter, value, service=None): - return self._apply_config({parameter : value}, service=service) - - @asyncio.coroutine - def set_parameter(self, parameter, value, service=None): - '''Set a config parameter for a service''' - return self.apply_config({parameter : value}, service=service) - - def _resolve_error(self, service=None, status=None, env=None): - if env is None: - env = self._get_env() - - if status is None: - status = self._get_status(env=env) - - if service is None: - service = self.service - - if env is None: - env = self._get_env() - if self._is_service_deployed(service=service, status=status): - units = self.get_all_units(status, service=service) - - for unit, ustatus in units[service].items(): - if ustatus == 'error': - self.log.info("{}: Found unit {} with status {}". - format(self, unit, ustatus)) - try: - # Takes the unit name as service_name/idx unlike action - env.resolved(unit) - - except Exception as e: - msg = "{}: Resolve on unit {}: {}". \ - format(self, unit, e) - self.log.warn(msg) - - @asyncio.coroutine - def resolve_error(self, service=None, status=None, env=None): - '''Resolve units in error state''' - pf = partial(self._resolve_error, - service=service, - status=status, - env=env) - yield from self.loop.run_in_executor( - None, - pf, - ) + if not self.model: + self.model = self.get_model(self.model_name) - def _deploy_service(self, charm, service, - path=None, config=None, env=None): - self.log.debug("{}: Deploy service for charm {}({}) with service {}". - format(self, charm, path, service)) + class model_state: + applications = {} + machines = {} + relations = {} - if env is None: - env = self._get_env() + self.log.debug("JujuApi: Getting model status") + status = model_state() + status.applications = self.model.applications + status.machines = self.model.machines - self.service = service - self.charm = charm + return status - if self._is_service_deployed(service=service, env=env): - self.log.info("{}: Charm service {} already deployed". - format (self, service)) - if config: - self._apply_config(config, service=service, env=env) + async def is_application_active(self, application): + """Check if the application is in an active state.""" + if not self.authenticated: + await self.login() + + state = False + status = await self.get_application_status(application) + if status and status in ['active']: + state = True + + self.log.debug("JujuApi: Application {} is {} active".format( + application, + "" if status else "not", + )) + + return state + is_service_active = is_application_active + + async def is_application_blocked(self, application): + """Check if the application is in a blocked state.""" + if not self.authenticated: + await self.login() + + state = False + status = await self.get_application_status(application) + if status and status in ['blocked']: + state = True + + self.log.debug("JujuApi: Application {} is {} blocked".format( + application, + "" if status else "not", + )) + + return state + is_service_blocked = is_application_blocked + + async def is_application_deployed(self, application): + """Check if the application is in a deployed state.""" + if not self.authenticated: + await self.login() + + state = False + status = await self.get_application_status(application) + if status and status in ['active']: + state = True + self.log.debug("JujuApi: Application {} is {} deployed".format( + application, + "" if status else "not", + )) + + return state + is_service_deployed = is_application_deployed + + async def is_application_error(self, application): + """Check if the application is in an error state.""" + if not self.authenticated: + await self.login() + + state = False + status = await self.get_application_status(application) + if status and status in ['error']: + state = True + self.log.debug("JujuApi: Application {} is {} errored".format( + application, + "" if status else "not", + )) + + return state + is_service_error = is_application_error + + async def is_application_maint(self, application): + """Check if the application is in a maintenance state.""" + if not self.authenticated: + await self.login() + + state = False + status = await self.get_application_status(application) + if status and status in ['maintenance']: + state = True + self.log.debug("JujuApi: Application {} is {} in maintenence".format( + application, + "" if status else "not", + )) + + return state + is_service_maint = is_application_maint + + async def is_application_up(self, application=None): + """Check if the application is up.""" + if not self.authenticated: + await self.login() + state = False + + status = await self.get_application_status(application) + if status and status in ['active', 'blocked']: + state = True + self.log.debug("JujuApi: Application {} is {} up".format( + application, + "" if status else "not", + )) + return state + is_service_up = is_application_up + + async def login(self): + """Login to the Juju controller.""" + if self.authenticated: return + cacert = None + self.controller = Controller() + + self.log.debug("JujuApi: Logging into controller") + + if self.secret: + await self.controller.connect( + self.endpoint, + self.user, + self.secret, + cacert, + ) + else: + await self.controller.connect_current() - series = "trusty" + self.authenticated = True + self.model = await self.get_model(self.model_name) - deploy_to = None - if self.version == 1: - deploy_to = "lxc:0" + async def logout(self): + """Logout of the Juju controller.""" + if not self.authenticated: + return - if path is None: - prefix=os.getenv('RIFT_INSTALL', '/') - path = os.path.join(prefix, 'usr/rift/charms', series, charm) + if self.model: + await self.model.disconnect() + self.model = None + if self.controller: + await self.controller.disconnect() + self.controller = None - try: - self.log.debug("{}: Local charm settings: dir={}, series={}". - format(self, path, series)) - result = env.add_local_charm_dir(path, series) - url = result[self._get_version_tag('charm-url')] - - except Exception as e: - msg = '{}: Error setting local charm directory {} for {}: {}'. \ - format(self, path, service, e) - self.log.error(msg) - self.log.exception(e) - raise JujuAddCharmError(msg) + self.authenticated = False - try: - self.log.debug("{}: Deploying using: service={}, url={}, to={}, config={}". - format(self, service, url, deploy_to, config)) - env.deploy(service, url, config=config, machine_spec=deploy_to) - - except Exception as e: - msg = '{}: Error deploying {}: {}'.format(self, service, e) - self.log.error(msg) - self.log.exception(e) - raise JujuDeployError(msg) - - @asyncio.coroutine - def deploy_service(self, charm, service, - wait=False, timeout=300, - path=None, config=None): - '''Deploy a service using the charm name provided''' - env = yield from self.get_env() - - pf = partial(self._deploy_service, - charm, - service, - path=path, - config=config, - env=env) - yield from self.loop.run_in_executor( - None, - pf, - ) + async def remove_application(self, name): + """Remove the application.""" + if not self.authenticated: + await self.login() - rc = True - if wait is True: - # Wait for the deployed units to start - try: - self.log.debug("{}: Waiting for service {} to come up". - format(self, service)) - rc = yield from self.wait_for_service(timeout=timeout, env=env) - - except Exception as e: - msg = '{}: Error starting all units for {}: {}'. \ - format(self, service, e) - self.log.error(msg) - self.log.exception(e) - raise JujuWaitUnitsError(msg) - - return rc - - @asyncio.coroutine - def wait_for_service(self, service=None, timeout=0, env=None): - '''Wait for the service to come up''' - if service is None: - service = self.service - - if env is None: - env = yield from self.get_env() - - status = yield from self.get_status(env=env) - - if self._is_service_up(service=service, status=status, env=env): - self.log.debug("{}: Service {} is already up". - format(self, service)) - return True - - # Check if service is deployed - if not self._is_service_deployed(service=service, status=status, env=env): - raise JujuSrvNotDeployedError("{}: service {} is not deployed". - format(self, service)) - - if timeout < 0: - timeout = 0 - - count = 0 - delay = self.retry_delay # seconds - self.log.debug("{}: In wait for service {}".format(self, service)) - - start_time = time.time() - max_time = time.time() + timeout - while timeout != 0 and (time.time() <= max_time): - count += 1 - rc = yield from self.is_service_up(service=service, env=env) - if rc: - self.log.debug("{}: Service {} is up after {} seconds". - format(self, service, time.time()-start_time)) - return True - yield from asyncio.sleep(delay, loop=self.loop) - return False - - def _destroy_service(self, service=None): - '''Destroy a service on Juju controller''' - self.log.debug("{}: Destroy charm service: {}".format(self,service)) - - if service is None: - service = self.service - - env = self._get_env() - - status = self._get_status(env=env) - - count = 0 - while self._is_service_deployed(service=service, status=status, env=env): - count += 1 - self.log.debug("{}: Destroy service {}, count {}". - format(self, service, count)) - - if count > self.destroy_retries: - msg = "{}: Not able to destroy service {} after {} tries". \ - format(self, service, count) - self.log.error(msg) - raise JujuDestroyError(msg) - - - if self._is_service_error(service=service, status=status): - self._resolve_error(service, status) - - try: - env.destroy_service(service) - - except Exception as e: - msg = "{}: Exception when running destroy on service {}: {}". \ - format(self, service, e) - self.log.error(msg) - self.log.exception(e) - raise JujuDestroyError(msg) - - time.sleep(self.retry_delay) - status = self._get_status(env=env) - - self.log.debug("{}: Destroyed service {} ({})". - format(self, service, count)) - - @asyncio.coroutine - def destroy_service(self, service=None): - '''Destroy a service on Juju controller''' - pf = partial(self._destroy_service, - service=service) - yield from self.loop.run_in_executor( - None, - pf, - ) + app = await self.get_application(name) + if app: + self.log.debug("JujuApi: Destroying application {}".format( + name, + )) + await app.destroy() - def _get_action_status(self, action_tag, env=None): - if env is None: - env = self._get_env() + async def remove_relation(self, a, b): + """ + Remove a relation between two application endpoints - if not action_tag.startswith('action-'): - action_tag = 'action-{}'.format(action_tag) + :param a An application endpoint + :param b An application endpoint + """ + if not self.authenticated: + await self.login() + m = await self.get_model() try: - action = env.actions - except Exception as e: - msg = "{}: exception in Action API: {}".format(self, e) - self.log.error(msg) - self.log.exception(e) - raise JujuActionApiError(msg) + m.remove_relation(a, b) + finally: + await m.disconnect() + + async def resolve_error(self, application=None): + """Resolve units in error state.""" + if not self.authenticated: + await self.login() + + app = await self.get_application(application) + if app: + self.log.debug("JujuApi: Resolving errors for application {}".format( + application, + )) + + for unit in app.units: + app.resolved(retry=True) + + async def run_action(self, application, action_name, **params): + """Execute an action and return an Action object.""" + if not self.authenticated: + await self.login() + result = { + 'status': '', + 'action': { + 'tag': None, + 'results': None, + } + } + app = await self.get_application(application) + if app: + # We currently only have one unit per application + # so use the first unit available. + unit = app.units[0] - try: - status = action.info([{'Tag': action_tag}]) - - self.log.debug("{}: Action {} status {}". - format(self, action_tag, status)) - return status['results'][0] - - except Exception as e: - msg = "{}: exception in get action status {}".format(self, e) - self.log.error(msg) - self.log.exception(e) - raise JujuActionInfoError(msg) - - @asyncio.coroutine - def get_action_status(self, action_tag, env=None): - ''' - Get the status of an action queued on the controller - - responds with the action status, which is one of three values: - - - completed - - pending - - failed - - @param action_tag - the action UUID return from the enqueue method - eg: action-3428e20d-fcd7-4911-803b-9b857a2e5ec9 - ''' - pf = partial(self._get_action_status, - action_tag, - env=env,) - status = yield from self.loop.run_in_executor( - None, - pf, - ) - return status + self.log.debug("JujuApi: Running Action {} against Application {}".format( + action_name, + application, + )) - def _execute_action(self, action_name, params, service=None, env=None): - '''Execute the action on all units of a service''' - if service is None: - service = self.service + action = await unit.run_action(action_name, **params) - if env is None: - env = self._get_env() + # Wait for the action to complete + await action.wait() - try: - action = env.actions - except Exception as e: - msg = "{}: exception in Action API: {}".format(self, e) - self.log.error(msg) - self.log.exception(e) - raise JujuActionApiError(msg) - - units = self._get_service_units(service) - self.log.debug("{}: Apply action {} on units {}". - format(self, action_name, units)) - - # Rename units from / to unit-- - unit_tags = [] - for unit in units: - idx = int(unit[unit.index('/')+1:]) - unit_name = "unit-%s-%d" % (service, idx) - unit_tags.append(unit_name) - self.log.debug("{}: Unit tags for action: {}". - format(self, unit_tags)) + result['status'] = action.status + result['action']['tag'] = action.data['id'] + result['action']['results'] = action.results - try: - result = action.enqueue_units(unit_tags, action_name, params) - self.log.debug("{}: Response for action: {}". - format(self, result)) - return result['results'][0] - - except Exception as e: - msg = "{}: Exception enqueing action {} on units {} with " \ - "params {}: {}".format(self, action, unit_tags, params, e) - self.log.error(msg) - self.log.exception(e) - raise JujuActionExecError(msg) - - @asyncio.coroutine - def execute_action(self, action_name, params, service=None, env=None): - '''Execute an action for a service on the controller - - Currently, we execute the action on all units of the service - ''' - pf = partial(self._execute_action, - action_name, - params, - service=service, - env=env) - result = yield from self.loop.run_in_executor( - None, - pf, - ) return result + execute_action = run_action + + async def set_config(self, application, config): + """Apply a configuration to the application.""" + if not self.authenticated: + await self.login() + + app = await self.get_application(application) + if app: + self.log.debug("JujuApi: Setting config for Application {}".format( + application, + )) + await app.set_config(config) + + # Verify the config is set + newconf = await app.get_config() + for key in config: + if config[key] != newconf[key]: + self.log.debug("JujuApi: Config not set! Key {} Value {} doesn't match {}".format(key, config[key], newconf[key])) + + + async def set_parameter(self, parameter, value, application=None): + """Set a config parameter for a service.""" + if not self.authenticated: + await self.login() + + self.log.debug("JujuApi: Setting {}={} for Application {}".format( + parameter, + value, + application, + )) + return await self.apply_config( + {parameter: value}, + application=application, + ) - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Test Juju') - parser.add_argument("-s", "--server", default='10.0.202.49', help="Juju controller") - parser.add_argument("-u", "--user", default='admin', help="User, default user-admin") - parser.add_argument("-p", "--password", default='nfvjuju', help="Password for the user") - parser.add_argument("-P", "--port", default=17070, help="Port number, default 17070") - parser.add_argument("-d", "--directory", help="Local directory for the charm") - parser.add_argument("--service", help="Charm service name") - parser.add_argument("--vnf-ip", help="IP of the VNF to configure") - args = parser.parse_args() - + async def wait_for_application(self, name, timeout=300): + """Wait for an application to become active.""" + if not self.authenticated: + await self.login() + + app = await self.get_application(name) + if app: + self.log.debug("JujuApi: Waiting {} seconds for Application {}".format( + timeout, + name, + )) + + await self.model.block_until( + lambda: all( + unit.agent_status == 'idle' + and unit.workload_status + in ['active', 'unknown'] for unit in app.units + ), + timeout=timeout, + ) + + +def get_argparser(): + parser = argparse.ArgumentParser(description='Test Driver for Juju API') + + ################### + # Authentication # + ################### + parser.add_argument( + "-s", "--server", + default='10.0.202.49', + help="Juju controller", + ) + parser.add_argument( + "-u", "--user", + default='admin', + help="User, default user-admin" + ) + parser.add_argument( + "-p", "--password", + default='', + help="Password for the user" + ) + parser.add_argument( + "-P", "--port", + default=17070, + help="Port number, default 17070" + ) + parser.add_argument( + "-m", "--model", + default='default', + help="The model to connect to." + ) + + ########## + # Charm # + ########## + parser.add_argument( + "-d", "--directory", + help="Local directory for the charm" + ) + parser.add_argument( + "--application", + help="Charm name" + ) + + ############# + # Placement # + ############# + + """ + To deploy to a non-Juju machine, provide the host and + credentials for Juju to manually provision (host, username, (password or key?)) + + """ + parser.add_argument( + "--proxy", + action='store_true', + help="Deploy as a proxy charm.", + ) + parser.add_argument( + "--no-proxy", + action='store_false', + dest='proxy', + help="Deploy as a full charm.", + ) + parser.set_defaults(proxy=True) + + # Test options? + # unit test? + ####### + # VNF # + ####### + + return parser.parse_args() + + +async def deploy_charm_and_wait(): + args = get_argparser() + + # Set logging level to debug so we can see verbose output from the + # juju library. + logging.basicConfig(level=logging.DEBUG) + + # Quiet logging from the websocket library. If you want to see + # everything sent over the wire, set this to DEBUG. + ws_logger = logging.getLogger('websockets.protocol') + ws_logger.setLevel(logging.INFO) + + """Here's an example of a coroutine that will deploy a charm and wait until + it's ready to be used.""" api = JujuApi(server=args.server, port=args.port, user=args.user, - secret=args.password) + secret=args.password, + # loop=loop, + log=ws_logger, + model_name=args.model + ) + print("Logging in...") + await api.login() - env = api._get_env() - if env is None: - raise "Not able to login to the Juju controller" + if api.authenticated: + status = await api.get_status() + print('Applications:', list(status.applications.keys())) + print('Machines:', list(status.machines.keys())) - print("Status: {}".format(api._get_status(env=env))) + if args.directory and args.application: - if args.directory and args.service: # Deploy the charm - charm = os.path.basename(args.directory) - api._deploy_service(charm, args.service, - path=args.directory, - env=env) - - while not api._is_service_up(): - time.sleep(5) - - print ("Service {} is deployed with status {}". - format(args.service, api._get_service_status())) - - if args.vnf_ip and \ - ('clearwater-aio' in args.directory): - # Execute config on charm - api._apply_config({'proxied_ip': args.vnf_ip}) - - while not api._is_service_active(): - time.sleep(10) - - print ("Service {} is in status {}". - format(args.service, api._get_service_status())) - - res = api._execute_action('create-update-user', {'number': '125252352525', - 'password': 'asfsaf'}) - - print ("Action 'creat-update-user response: {}".format(res)) - - status = res['status'] - while status not in [ 'completed', 'failed' ]: - time.sleep(2) - status = api._get_action_status(res['action']['tag'])['status'] - - print("Action status: {}".format(status)) + charm = os.path.basename( + os.path.expanduser( + os.path.dirname(args.directory) + ) + ) + await api.deploy_application(charm, + name=args.application, + path=args.directory, + ) + + # Wait for the application to fully deploy. This will block until the + # agent is in an idle state, and the charm's workload is either + # 'active' or 'unknown', meaning it's ready but the author did not + # explicitly set a workload state. + print("Waiting for application '{}' to deploy...".format(charm)) + while (True): + # Deploy the charm and wait, periodically checking its status + await api.wait_for_application(charm, 30) + + error = await api.is_application_error(charm) + if error: + print("This application is in an error state.") + break + + blocked = await api.is_application_blocked(charm) + if blocked: + print("This application is blocked.") + break + + # An extra check to see if the charm is ready + up = await api.is_application_up(charm) + print("Application is {}".format("up" if up else "down")) + + print("Service {} is deployed".format(args.application)) + + ################################### + # Execute config on a proxy charm # + ################################### + config = await api.get_config(args.application) + hostname = config['ssh-username']['value'] + rhostname = hostname[::-1] + + # Apply the configuration + await api.apply_config( + {'ssh-username': rhostname}, application=args.application + ) - # This action will fail as the number is non-numeric - res = api._execute_action('delete-user', {'number': '125252352525asf'}) + # Get the configuration + config = await api.get_config(args.application) + + # Verify the configuration has been updated + assert(config['ssh-username']['value'] == rhostname) + + #################################### + # Get the status of an application # + #################################### + status = await api.get_application_status(charm) + print("Application Status: {}".format(status)) + + ########################### + # Execute a simple action # + ########################### + result = await api.run_action(charm, 'get-ssh-public-key') + print("Action {} status is {} and returned {}".format( + result['status'], + result['action']['tag'], + result['action']['results'] + )) + + ##################################### + # Execute an action with parameters # + ##################################### + result = await api.run_action(charm, 'run', command='hostname') + + print("Action {} status is {} and returned {}".format( + result['status'], + result['action']['tag'], + result['action']['results'] + )) + + print("Logging out...") + await api.logout() + api = None + +# get public key in juju controller? that can be pulled without need of a charm deployed and installed to vm via cloud-init - print ("Action 'delete-user response: {}".format(res)) +if __name__ == "__main__": + # Create a single event loop for running code asyncronously. + loop = asyncio.get_event_loop() - status = res['status'] - while status not in [ 'completed', 'failed' ]: - time.sleep(2) - status = api._get_action_status(res['action']['tag'])['status'] + # An initial set of tasks to run + tasks = [ + deploy_charm_and_wait(), + ] - print("Action status: {}".format(status)) + # TODO: optionally run forever and use a Watcher to monitor what's happening + loop.run_until_complete(asyncio.wait(tasks))