X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=common%2Fpython%2Frift%2Fmano%2Futils%2Fjuju_api.py;h=a06c1e7eb205069447c27c153685ec35bd6fa1bd;hb=refs%2Ftags%2Fv3.0.0rc14;hp=3d7b369ffa77aa53137c7c2779453d0c36e275ec;hpb=a9b5f161a58fc97d5690c647ce808ba53e0e86f4;p=osm%2FSO.git diff --git a/common/python/rift/mano/utils/juju_api.py b/common/python/rift/mano/utils/juju_api.py old mode 100644 new mode 100755 index 3d7b369f..a06c1e7e --- a/common/python/rift/mano/utils/juju_api.py +++ b/common/python/rift/mano/utils/juju_api.py @@ -16,19 +16,13 @@ import argparse import asyncio -from functools import partial import logging import os import ssl -import sys -import time -try: - from jujuclient.juju1.environment import Environment as Env1 - from jujuclient.juju2.environment import Environment as Env2 -except ImportError as e: - # Try importing older jujuclient - from jujuclient import Environment as Env1 +import juju.loop +from juju.controller import Controller +from juju.model import Model, ModelObserver try: ssl._create_default_https_context = ssl._create_unverified_context @@ -102,27 +96,65 @@ class JujuActionExecError(JujuActionError): pass +class JujuAuthenticationError(Exception): + pass + + +class JujuMonitor(ModelObserver): + """Monitor state changes within the Juju Model.""" + # async def on_change(self, delta, old, new, model): + # """React to changes in the Juju model.""" + # + # # TODO: Setup the hook to update the UI if the status of a unit changes + # # to be used when deploying a charm and waiting for it to be "ready" + # if delta.entity in ['application', 'unit'] and delta.type == "change": + # pass + # + # # TODO: Add a hook when an action is complete + + pass + + class JujuApi(object): - ''' - JujuApi wrapper on jujuclient library + """JujuApi wrapper on jujuclient library. There should be one instance of JujuApi for each VNF manged by Juju. Assumption: Currently we use one unit per service/VNF. So once a service is deployed, we store the unit name and reuse it -''' + """ + log = None + controller = None + models = {} + model = None + model_name = None + model_uuid = None + authenticated = False + + def __init__(self, + log=None, + loop=None, + server='127.0.0.1', + port=17070, + user='admin', + secret=None, + version=None, + model_name='default', + ): + """Initialize with the Juju credentials.""" + + if log: + self.log = log + else: + self.log = logging.getLogger(__name__) + + # Quiet websocket traffic + logging.getLogger('websockets.protocol').setLevel(logging.INFO) + + self.log.debug('JujuApi: instantiated') - def __init__ (self, - log=None, - loop=None, - server='127.0.0.1', - port=17070, - user='admin', - secret=None, - version=None): - '''Initialize with the Juju credentials''' self.server = server self.port = port @@ -132,941 +164,603 @@ class JujuApi(object): else: self.user = 'user-{}'.format(user) - self.loop = loop + self.endpoint = '%s:%d' % (server, int(port)) + + self.model_name = model_name + + if loop: + self.loop = loop + + def __del__(self): + """Close any open connections.""" + yield self.logout() + + async def apply_config(self, config, application): + """Apply a configuration to the application.""" + self.log.debug("JujuApi: Applying configuration to {}.".format( + application + )) + return await self.set_config(application=application, config=config) + + async def deploy_application(self, charm, name="", path=""): + """Deploy an application.""" + if not self.authenticated: + await self.login() + + # Check that the charm is valid and exists. + if charm is None: + return None + + app = await self.get_application(name) + if app is None: + # TODO: Handle the error if the charm isn't found. + self.log.debug("JujuApi: Deploying charm {} ({}) from {}".format( + charm, + name, + path, + )) + app = await self.model.deploy( + path, + application_name=name, + series='xenial', + ) + return app + deploy_service = deploy_application + + async def get_action_status(self, uuid): + """Get the status of an action.""" + if not self.authenticated: + await self.login() + + self.log.debug("JujuApi: Waiting for status of action uuid {}".format(uuid)) + action = await self.model.wait_for_action(uuid) + return action.status + + async def get_application(self, application): + """Get the deployed application.""" + if not self.authenticated: + await self.login() + + self.log.debug("JujuApi: Getting application {}".format(application)) + app = None + if application and self.model: + if self.model.applications: + if application in self.model.applications: + app = self.model.applications[application] + return app + + async def get_application_status(self, application): + """Get the status of an application.""" + if not self.authenticated: + await self.login() + + status = None + app = await self.get_application(application) + if app: + status = app.status + self.log.debug("JujuApi: Status of application {} is {}".format( + application, + str(status), + )) + return status + get_service_status = get_application_status - if log is not None: - self.log = log - else: - self.log = JujuApi._get_logger() + async def get_config(self, application): + """Get the configuration of an application.""" + if not self.authenticated: + await self.login() - if self.log is None: - raise JujuApiError("Logger not defined") + config = None + app = await self.get_application(application) + if app: + config = await app.get_config() - self.version = None - if version: - self.version = version - else: - try: - if Env2: - pass - except NameError: - self.log.warn("Using older version of Juju client, which " \ - "supports only Juju 1.x") - self.version = 1 - - endpoint = 'wss://%s:%d' % (server, int(port)) - self.endpoint = endpoint - - self.charm = None # Charm used - self.service = None # Service deployed - self.units = [] # Storing as list to support more units in future - - self.destroy_retries = 25 # Number retires to destroy service - self.retry_delay = 5 # seconds - - def __str__(self): - return ("JujuApi-{}".format(self.endpoint)) - - @classmethod - def _get_logger(cls): - if cls.log is not None: - return cls.log - - fmt = logging.Formatter( - '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:' \ - '%(filename)s:%(lineno)d) - %(message)s') - stderr_handler = logging.StreamHandler(stream=sys.stderr) - stderr_handler.setFormatter(fmt) - logging.basicConfig(level=logging.DEBUG) - cls.log = logging.getLogger('juju-api') - cls.log.addHandler(stderr_handler) - - return cls.log - - @staticmethod - def format_charm_name(name): - '''Format the name to valid charm name - - Charm service name accepts only a to z and -. - ''' - - new_name = '' - for c in name: - if c.isdigit(): - c = chr(97 + int(c)) - elif not c.isalpha(): - c = "-" - new_name += c - return new_name.lower() - - def _get_version_tag(self, tag): - version_tag_map = { - 'applications': { - 1: 'Services', - 2: 'applications', - }, - 'units': { - 1: 'Units', - 2: 'units', - }, - 'status': { - 1: 'Status', - 2: 'status', - }, - 'workload-status': { - 1: 'Workload', - 2: 'workload-status', - }, - 'charm-url': { - 1: 'CharmURL', - 2: 'charm-url', - }, - } + self.log.debug("JujuApi: Config of application {} is {}".format( + application, + str(config), + )) - return version_tag_map[tag][self.version] - - def _get_env1(self): - try: - env = Env1(self.endpoint) - l = env.login(self.secret, user=self.user) - return env - - except ConnectionRefusedError as e: - msg = "{}: Failed Juju 1.x connect: {}".format(self, e) - self.log.error(msg) - self.log.exception(e) - raise e - - except Exception as e: - msg = "{}: Failed Juju 1.x connect: {}".format(self, e) - self.log.error(msg) - self.log.exception(e) - raise JujuEnvError(msg) - - def _get_env2(self): - try: - env = Env2(self.endpoint) - l = env.login(self.secret, user=self.user) - except KeyError as e: - msg = "{}: Failed Juju 2.x connect: {}".format(self, e) - self.log.debug(msg) - raise JujuVersionError(msg) - - try: - models = env.models.list() - for m in models['user-models']: - if m['model']['name'] == 'default': - mep = '{}/model/{}/api'.format(self.endpoint, - m['model']['uuid']) - model = Env2(mep, env_uuid=m['model']['uuid']) - l = model.login(self.secret, user=self.user) - break - - if model is None: - raise - - return model - - except Exception as e: - msg = "{}: Failed logging to model: {}".format(self, e) - self.log.error(msg) - self.log.exception(e) - env.close() - raise JujuModelError(msg) - - def _get_env(self): - self.log.debug("{}: Connect to endpoint {}". - format(self, self.endpoint)) - - if self.version is None: - # Try version 2 first - try: - env = self._get_env2() - self.version = 2 - - except JujuVersionError as e: - self.log.info("Unable to login as Juju 2.x, trying 1.x") - env = self._get_env1() - self.version = 1 - - return env - - elif self.version == 2: - return self._get_env2() - - elif self.version == 1: - return self._get_env1() + return config - else: - msg = "{}: Unknown version set: {}".format(self, self.version) - self.log.error(msg) - raise JujuVersionError(msg) - - @asyncio.coroutine - def get_env(self): - ''' Connect to the Juju controller''' - env = yield from self.loop.run_in_executor( - None, - self._get_env, - ) - return env - - def _get_status(self, env=None): - if env is None: - env = self._get_env() - - try: - status = env.status() - return status - - except Exception as e: - msg = "{}: exception in getting status: {}". \ - format(self, e) - self.log.error(msg) - self.log.exception(e) - raise JujuStatusError(msg) - - @asyncio.coroutine - def get_status(self, env=None): - '''Get Juju controller status''' - pf = partial(self._get_status, env=env) - status = yield from self.loop.run_in_executor( - None, - pf, - ) - return status - - def get_all_units(self, status, service=None): - '''Parse the status and get the units''' - results = {} - services = status.get(self._get_version_tag('applications'), {}) - - for svc_name, svc_data in services.items(): - if service and service != svc_name: - continue - units = svc_data[self._get_version_tag('units')] or {} - - results[svc_name] = {} - for unit in units: - results[svc_name][unit] = \ - units[unit][self._get_version_tag('workload-status')] \ - [self._get_version_tag('status')] or None - return results - - - def _get_service_units(self, service=None, status=None, env=None): - if service is None: - service = self.service - - # Optimizing calls to Juju, as currently we deploy only 1 unit per - # service. - if self.service == service and len(self.units): - return self.units - - if env is None: - env = self._get_env() - - if status is None: - status = self._get_status(env=env) - - try: - resp = self.get_all_units(status, service=service) - self.log.debug("Get all units: {}".format(resp)) - units = set(resp[service].keys()) - - if self.service == service: - self.units = units - - return units - - except Exception as e: - msg = "{}: exception in get units {}".format(self, e) - self.log.error(msg) - self.log.exception(e) - raise JujuUnitsError(msg) - - @asyncio.coroutine - def get_service_units(self, service=None, status=None, env=None): - '''Get the unit names for a service''' - pf = partial(self._get_service_units, - service=service, - status=status, - env=env) - units = yield from self.loop.run_in_executor( - None, - pf, - ) - return units - - def _get_service_status(self, service=None, status=None, env=None): - if env is None: - env = self._get_env() - - if status is None: - status = self._get_status(env=env) - - if service is None: - service = self.service - - try: - srv_status = status[self._get_version_tag('applications')] \ - [service][self._get_version_tag('status')] \ - [self._get_version_tag('status')] - self.log.debug("{}: Service {} status is {}". - format(self, service, srv_status)) - return srv_status - - except KeyError as e: - self.log.info("self: Did not find service {}, e={}".format(self, service, e)) - return 'NA' - - except Exception as e: - msg = "{}: exception checking service status for {}, e {}". \ - format(self, service, e) - self.log.error(msg) - self.log.exception(e) - raise JujuStatusError(msg) - - - @asyncio.coroutine - def get_service_status(self, service=None, status=None, env=None): - ''' Get service status - - maintenance : The unit is not yet providing services, but is actively doing stuff. - unknown : Service has finished an event but the charm has not called status-set yet. - waiting : Service is unable to progress to an active state because of dependency. - blocked : Service needs manual intervention to get back to the Running state. - active : Service correctly offering all the services. - NA : Service is not deployed - ''' - pf = partial(self._get_service_status, - service=service, - status=status, - env=env) - srv_status = yield from self.loop.run_in_executor( - None, - pf, - ) - return srv_status - - def _is_service_deployed(self, service=None, status=None, env=None): - resp = self._get_service_status(service=service, - status=status, - env=env) - - if resp not in ['terminated', 'NA']: - return True - - return False - - @asyncio.coroutine - def is_service_deployed(self, service=None, status=None, env=None): - '''Check if the service is deployed''' - pf = partial(self._is_service_deployed, - service=service, - status=status, - env=env) - rc = yield from self.loop.run_in_executor( - None, - pf, - ) - return rc - - def _is_service_error(self, service=None, status=None, env=None): - resp = self._get_service_status(service=service, - status=status, - env=env) - - if resp in ['error']: - return True - - return False - - @asyncio.coroutine - def is_service_error(self, service=None, status=None, env=None): - '''Check if the service is in error state''' - pf = partial(self._is_service_error, - service=service, - status=status, - env=env) - rc = yield from self.loop.run_in_executor( - None, - pf, - ) - return rc - - def _is_service_maint(self, service=None, status=None, env=None): - resp = self._get_service_status(service=service, - status=status, - env=env) - - if resp in ['maintenance']: - return True - - return False - - @asyncio.coroutine - def is_service_maint(self, service=None, status=None, env=None): - '''Check if the service is in error state''' - pf = partial(self._is_service_maint, - service=service, - status=status, - env=env) - rc = yield from self.loop.run_in_executor( - None, - pf, - ) - return rc - - def _is_service_active(self, service=None, status=None, env=None): - resp = self._get_service_status(service=service, - status=status, - env=env) - - if resp in ['active']: - return True - - return False - - @asyncio.coroutine - def is_service_active(self, service=None, status=None, env=None): - '''Check if the service is active''' - pf = partial(self._is_service_active, - service=service, - status=status, - env=env) - rc = yield from self.loop.run_in_executor( - None, - pf, - ) - return rc - - def _is_service_blocked(self, service=None, status=None, env=None): - resp = self._get_service_status(service=service, - status=status, - env=env) - - if resp in ['blocked']: - return True - - return False - - @asyncio.coroutine - def is_service_blocked(self, service=None, status=None, env=None): - '''Check if the service is blocked''' - pf = partial(self._is_service_blocked, - service=service, - status=status, - env=env) - rc = yield from self.loop.run_in_executor( - None, - pf, - ) - return rc + async def get_model(self, name='default'): + """Get a model from the Juju Controller. - def _is_service_up(self, service=None, status=None, env=None): - resp = self._get_service_status(service=service, - status=status, - env=env) + Note: Model objects returned must call disconnected() before it goes + out of scope.""" + if not self.authenticated: + await self.login() - if resp in ['active', 'blocked']: - return True + model = Model() - return False + uuid = await self.get_model_uuid(name) - @asyncio.coroutine - def is_service_up(self, service=None, status=None, env=None): - '''Check if the service is installed and up''' - pf = partial(self._is_service_up, - service=service, - status=status, - env=env) + self.log.debug("JujuApi: Connecting to model {} ({})".format( + model, + uuid, + )) - rc = yield from self.loop.run_in_executor( + await model.connect( + self.endpoint, + uuid, + self.user, + self.secret, None, - pf, ) - return rc - def _apply_config(self, config, service=None, env=None): - if service is None: - service = self.service + return model - if config is None or len(config) == 0: - self.log.warn("{}: Empty config passed for service {}". - format(self, service)) - return + async def get_model_uuid(self, name='default'): + """Get the UUID of a model. - if env is None: - env = self._get_env() - - status = self._get_status(env=env) - - if not self._is_service_deployed(service=service, - status=status, - env=env): - raise JujuSrvNotDeployedError("{}: service {} is not deployed". - format(self, service)) - - self.log.debug("{}: Config for service {} update to: {}". - format(self, service, config)) - try: - # Try to fix error on service, most probably due to config issue - if self._is_service_error(service=service, status=status, env=env): - self._resolve_error(service=service, env=env) - - if self.version == 2: - env.service.set(service, config) - else: - env.set_config(service, config) - - except Exception as e: - self.log.error("{}: exception setting config for {} with {}, e {}". - format(self, service, config, e)) - self.log.exception(e) - raise e - - @asyncio.coroutine - def apply_config(self, config, service=None, env=None, wait=True): - '''Apply a config on the service''' - pf = partial(self._apply_config, - config, - service=service, - env=env) - yield from self.loop.run_in_executor( - None, - pf, - ) + Iterate through all models in a controller and find the matching model. + """ + if not self.authenticated: + await self.login() - if wait: - # Wait till config finished applying - self.log.debug("{}: Wait for config apply to finish". - format(self)) - delay = 3 # secs - maint = True - while maint: - # Sleep first to give time for config_changed hook to be invoked - yield from asyncio.sleep(delay, loop=self.loop) - maint = yield from self.is_service_maint(service=service, - env=env) - - err = yield from self.is_service_error(service=service, env=env) - if err: - self.log.error("{}: Service is in error state". - format(self)) - return False - - self.log.debug("{}: Finished applying config".format(self)) - return True - - def _set_parameter(self, parameter, value, service=None): - return self._apply_config({parameter : value}, service=service) - - @asyncio.coroutine - def set_parameter(self, parameter, value, service=None): - '''Set a config parameter for a service''' - return self.apply_config({parameter : value}, service=service) - - def _resolve_error(self, service=None, status=None, env=None): - if env is None: - env = self._get_env() - - if status is None: - status = self._get_status(env=env) - - if service is None: - service = self.service - - if env is None: - env = self._get_env() - if self._is_service_deployed(service=service, status=status): - units = self.get_all_units(status, service=service) - - for unit, ustatus in units[service].items(): - if ustatus == 'error': - self.log.info("{}: Found unit {} with status {}". - format(self, unit, ustatus)) - try: - # Takes the unit name as service_name/idx unlike action - env.resolved(unit) - - except Exception as e: - msg = "{}: Resolve on unit {}: {}". \ - format(self, unit, e) - self.log.error(msg) - self.log.exception(e) - raise JujuResolveError(msg) - - @asyncio.coroutine - def resolve_error(self, service=None, status=None, env=None): - '''Resolve units in error state''' - pf = partial(self._resolve_error, - service=service, - status=status, - env=env) - yield from self.loop.run_in_executor( - None, - pf, - ) + uuid = None - def _deploy_service(self, charm, service, - path=None, config=None, env=None): - self.log.debug("{}: Deploy service for charm {}({}) with service {}". - format(self, charm, path, service)) + models = await self.controller.get_models() - if env is None: - env = self._get_env() + self.log.debug("JujuApi: Looking through {} models for model {}".format( + len(models.user_models), + name, + )) + for model in models.user_models: + if model.model.name == name: + uuid = model.model.uuid + break - self.service = service - self.charm = charm + return uuid - if self._is_service_deployed(service=service, env=env): - self.log.info("{}: Charm service {} already deployed". - format (self, service)) - if config: - self._apply_config(config, service=service, env=env) - return + async def get_status(self): + """Get the model status.""" + if not self.authenticated: + await self.login() - series = "trusty" - - deploy_to = None - if self.version == 1: - deploy_to = "lxc:0" - - if path is None: - prefix=os.getenv('RIFT_INSTALL', '/') - path = os.path.join(prefix, 'usr/rift/charms', series, charm) - - try: - self.log.debug("{}: Local charm settings: dir={}, series={}". - format(self, path, series)) - result = env.add_local_charm_dir(path, series) - url = result[self._get_version_tag('charm-url')] - - except Exception as e: - msg = '{}: Error setting local charm directory {} for {}: {}'. \ - format(self, path, service, e) - self.log.error(msg) - self.log.exception(e) - raise JujuAddCharmError(msg) - - try: - self.log.debug("{}: Deploying using: service={}, url={}, to={}, config={}". - format(self, service, url, deploy_to, config)) - env.deploy(service, url, config=config, machine_spec=deploy_to) - - except Exception as e: - msg = '{}: Error deploying {}: {}'.format(self, service, e) - self.log.error(msg) - self.log.exception(e) - raise JujuDeployError(msg) - - @asyncio.coroutine - def deploy_service(self, charm, service, - wait=False, timeout=300, - path=None, config=None): - '''Deploy a service using the charm name provided''' - env = yield from self.get_env() - - pf = partial(self._deploy_service, - charm, - service, - path=path, - config=config, - env=env) - yield from self.loop.run_in_executor( - None, - pf, - ) + if not self.model: + self.model = self.get_model(self.model_name) - rc = True - if wait is True: - # Wait for the deployed units to start - try: - self.log.debug("{}: Waiting for service {} to come up". - format(self, service)) - rc = yield from self.wait_for_service(timeout=timeout, env=env) - - except Exception as e: - msg = '{}: Error starting all units for {}: {}'. \ - format(self, service, e) - self.log.error(msg) - self.log.exception(e) - raise JujuWaitUnitsError(msg) - - return rc - - @asyncio.coroutine - def wait_for_service(self, service=None, timeout=0, env=None): - '''Wait for the service to come up''' - if service is None: - service = self.service - - if env is None: - env = yield from self.get_env() - - status = yield from self.get_status(env=env) - - if self._is_service_up(service=service, status=status, env=env): - self.log.debug("{}: Service {} is already up". - format(self, service)) - return True - - # Check if service is deployed - if not self._is_service_deployed(service=service, status=status, env=env): - raise JujuSrvNotDeployedError("{}: service {} is not deployed". - format(self, service)) - - if timeout < 0: - timeout = 0 - - count = 0 - delay = self.retry_delay # seconds - self.log.debug("{}: In wait for service {}".format(self, service)) - - start_time = time.time() - max_time = time.time() + timeout - while timeout != 0 and (time.time() <= max_time): - count += 1 - rc = yield from self.is_service_up(service=service, env=env) - if rc: - self.log.debug("{}: Service {} is up after {} seconds". - format(self, service, time.time()-start_time)) - return True - yield from asyncio.sleep(delay, loop=self.loop) - return False - - def _destroy_service(self, service=None): - '''Destroy a service on Juju controller''' - self.log.debug("{}: Destroy charm service: {}".format(self,service)) - - if service is None: - service = self.service - - env = self._get_env() - - status = self._get_status(env=env) - - count = 0 - while self._is_service_deployed(service=service, status=status, env=env): - count += 1 - self.log.debug("{}: Destroy service {}, count {}". - format(self, service, count)) - - if count > self.destroy_retries: - msg = "{}: Not able to destroy service {} after {} tries". \ - format(self, service, count) - self.log.error(msg) - raise JujuDestroyError(msg) - - - if self._is_service_error(service=service, status=status): - self._resolve_error(service, status) - - try: - env.destroy_service(service) - - except Exception as e: - msg = "{}: Exception when running destroy on service {}: {}". \ - format(self, service, e) - self.log.error(msg) - self.log.exception(e) - raise JujuDestroyError(msg) - - time.sleep(self.retry_delay) - status = self._get_status(env=env) - - self.log.debug("{}: Destroyed service {} ({})". - format(self, service, count)) - - @asyncio.coroutine - def destroy_service(self, service=None): - '''Destroy a service on Juju controller''' - pf = partial(self._destroy_service, - service=service) - yield from self.loop.run_in_executor( - None, - pf, - ) + class model_state: + applications = {} + machines = {} + relations = {} + self.log.debug("JujuApi: Getting model status") + status = model_state() + status.applications = self.model.applications + status.machines = self.model.machines - def _get_action_status(self, action_tag, env=None): - if env is None: - env = self._get_env() - - if not action_tag.startswith('action-'): - action_tag = 'action-{}'.format(action_tag) - - try: - action = env.actions - except Exception as e: - msg = "{}: exception in Action API: {}".format(self, e) - self.log.error(msg) - self.log.exception(e) - raise JujuActionApiError(msg) - - try: - status = action.info([{'Tag': action_tag}]) - - self.log.debug("{}: Action {} status {}". - format(self, action_tag, status)) - return status['results'][0] - - except Exception as e: - msg = "{}: exception in get action status {}".format(self, e) - self.log.error(msg) - self.log.exception(e) - raise JujuActionInfoError(msg) - - @asyncio.coroutine - def get_action_status(self, action_tag, env=None): - ''' - Get the status of an action queued on the controller - - responds with the action status, which is one of three values: - - - completed - - pending - - failed - - @param action_tag - the action UUID return from the enqueue method - eg: action-3428e20d-fcd7-4911-803b-9b857a2e5ec9 - ''' - pf = partial(self._get_action_status, - action_tag, - env=env,) - status = yield from self.loop.run_in_executor( - None, - pf, - ) return status - def _execute_action(self, action_name, params, service=None, env=None): - '''Execute the action on all units of a service''' - if service is None: - service = self.service - - if env is None: - env = self._get_env() - - try: - action = env.actions - except Exception as e: - msg = "{}: exception in Action API: {}".format(self, e) - self.log.error(msg) - self.log.exception(e) - raise JujuActionApiError(msg) - - units = self._get_service_units(service) - self.log.debug("{}: Apply action {} on units {}". - format(self, action_name, units)) - - # Rename units from / to unit-- - unit_tags = [] - for unit in units: - idx = int(unit[unit.index('/')+1:]) - unit_name = "unit-%s-%d" % (service, idx) - unit_tags.append(unit_name) - self.log.debug("{}: Unit tags for action: {}". - format(self, unit_tags)) - - try: - result = action.enqueue_units(unit_tags, action_name, params) - self.log.debug("{}: Response for action: {}". - format(self, result)) - return result['results'][0] - - except Exception as e: - msg = "{}: Exception enqueing action {} on units {} with " \ - "params {}: {}".format(self, action, unit_tags, params, e) - self.log.error(msg) - self.log.exception(e) - raise JujuActionExecError(msg) - - @asyncio.coroutine - def execute_action(self, action_name, params, service=None, env=None): - '''Execute an action for a service on the controller - - Currently, we execute the action on all units of the service - ''' - pf = partial(self._execute_action, - action_name, - params, - service=service, - env=env) - result = yield from self.loop.run_in_executor( - None, - pf, - ) - return result + async def is_application_active(self, application): + """Check if the application is in an active state.""" + if not self.authenticated: + await self.login() + + state = False + status = await self.get_application_status(application) + if status and status in ['active']: + state = True + + self.log.debug("JujuApi: Application {} is {} active".format( + application, + "" if status else "not", + )) + + return state + is_service_active = is_application_active + + async def is_application_blocked(self, application): + """Check if the application is in a blocked state.""" + if not self.authenticated: + await self.login() + + state = False + status = await self.get_application_status(application) + if status and status in ['blocked']: + state = True + + self.log.debug("JujuApi: Application {} is {} blocked".format( + application, + "" if status else "not", + )) + + return state + is_service_blocked = is_application_blocked + + async def is_application_deployed(self, application): + """Check if the application is in a deployed state.""" + if not self.authenticated: + await self.login() + + state = False + status = await self.get_application_status(application) + if status and status in ['active']: + state = True + self.log.debug("JujuApi: Application {} is {} deployed".format( + application, + "" if status else "not", + )) + + return state + is_service_deployed = is_application_deployed + + async def is_application_error(self, application): + """Check if the application is in an error state.""" + if not self.authenticated: + await self.login() + + state = False + status = await self.get_application_status(application) + if status and status in ['error']: + state = True + self.log.debug("JujuApi: Application {} is {} errored".format( + application, + "" if status else "not", + )) + + return state + is_service_error = is_application_error + + async def is_application_maint(self, application): + """Check if the application is in a maintenance state.""" + if not self.authenticated: + await self.login() + + state = False + status = await self.get_application_status(application) + if status and status in ['maintenance']: + state = True + self.log.debug("JujuApi: Application {} is {} in maintenence".format( + application, + "" if status else "not", + )) + + return state + is_service_maint = is_application_maint + + async def is_application_up(self, application=None): + """Check if the application is up.""" + if not self.authenticated: + await self.login() + state = False + + status = await self.get_application_status(application) + if status and status in ['active', 'blocked']: + state = True + self.log.debug("JujuApi: Application {} is {} up".format( + application, + "" if status else "not", + )) + return state + is_service_up = is_application_up + + async def login(self): + """Login to the Juju controller.""" + if self.authenticated: + return + cacert = None + self.controller = Controller() + + self.log.debug("JujuApi: Logging into controller") + + if self.secret: + await self.controller.connect( + self.endpoint, + self.user, + self.secret, + cacert, + ) + else: + await self.controller.connect_current() + self.authenticated = True + self.model = await self.get_model(self.model_name) -if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Test Juju') - parser.add_argument("-s", "--server", default='10.0.202.49', help="Juju controller") - parser.add_argument("-u", "--user", default='admin', help="User, default user-admin") - parser.add_argument("-p", "--password", default='nfvjuju', help="Password for the user") - parser.add_argument("-P", "--port", default=17070, help="Port number, default 17070") - parser.add_argument("-d", "--directory", help="Local directory for the charm") - parser.add_argument("--service", help="Charm service name") - parser.add_argument("--vnf-ip", help="IP of the VNF to configure") - args = parser.parse_args() + async def logout(self): + """Logout of the Juju controller.""" + if not self.authenticated: + return - api = JujuApi(server=args.server, - port=args.port, - user=args.user, - secret=args.password) + if self.model: + await self.model.disconnect() + self.model = None + if self.controller: + await self.controller.disconnect() + self.controller = None + + self.authenticated = False + + async def remove_application(self, name): + """Remove the application.""" + if not self.authenticated: + await self.login() + + app = await self.get_application(name) + if app: + self.log.debug("JujuApi: Destroying application {}".format( + name, + )) + + await app.destroy() + + async def resolve_error(self, application=None): + """Resolve units in error state.""" + if not self.authenticated: + await self.login() + + app = await self.get_application(application) + if app: + self.log.debug("JujuApi: Resolving errors for application {}".format( + application, + )) + + for unit in app.units: + app.resolved(retry=True) + + async def run_action(self, application, action_name, **params): + """Execute an action and return an Action object.""" + if not self.authenticated: + await self.login() + result = { + 'status': '', + 'action': { + 'tag': None, + 'results': None, + } + } + app = await self.get_application(application) + if app: + # We currently only have one unit per application + # so use the first unit available. + unit = app.units[0] - env = api._get_env() - if env is None: - raise "Not able to login to the Juju controller" + self.log.debug("JujuApi: Running Action {} against Application {}".format( + action_name, + application, + )) - print("Status: {}".format(api._get_status(env=env))) + action = await unit.run_action(action_name, **params) - if args.directory and args.service: - # Deploy the charm - charm = os.path.basename(args.directory) - api._deploy_service(charm, args.service, - path=args.directory, - env=env) + # Wait for the action to complete + await action.wait() - while not api._is_service_up(): - time.sleep(5) + result['status'] = action.status + result['action']['tag'] = action.data['id'] + result['action']['results'] = action.results - print ("Service {} is deployed with status {}". - format(args.service, api._get_service_status())) + return result + execute_action = run_action + + async def set_config(self, application, config): + """Apply a configuration to the application.""" + if not self.authenticated: + await self.login() + + app = await self.get_application(application) + if app: + self.log.debug("JujuApi: Setting config for Application {}".format( + application, + )) + await app.set_config(config) + + # Verify the config is set + newconf = await app.get_config() + for key in config: + if config[key] != newconf[key]: + self.log.debug("JujuApi: Config not set! Key {} Value {} doesn't match {}".format(key, config[key], newconf[key])) + + + async def set_parameter(self, parameter, value, application=None): + """Set a config parameter for a service.""" + if not self.authenticated: + await self.login() + + self.log.debug("JujuApi: Setting {}={} for Application {}".format( + parameter, + value, + application, + )) + return await self.apply_config( + {parameter: value}, + application=application, + ) - if args.vnf_ip and \ - ('clearwater-aio' in args.directory): - # Execute config on charm - api._apply_config({'proxied_ip': args.vnf_ip}) + async def wait_for_application(self, name, timeout=300): + """Wait for an application to become active.""" + if not self.authenticated: + await self.login() + + app = await self.get_application(name) + if app: + self.log.debug("JujuApi: Waiting {} seconds for Application {}".format( + timeout, + name, + )) + + await self.model.block_until( + lambda: all( + unit.agent_status == 'idle' + and unit.workload_status + in ['active', 'unknown'] for unit in app.units + ), + timeout=timeout, + ) + + +def get_argparser(): + parser = argparse.ArgumentParser(description='Test Juju') + parser.add_argument( + "-s", "--server", + default='10.0.202.49', + help="Juju controller" + ) + parser.add_argument( + "-u", "--user", + default='admin', + help="User, default user-admin" + ) + parser.add_argument( + "-p", "--password", + default='', + help="Password for the user" + ) + parser.add_argument( + "-P", "--port", + default=17070, + help="Port number, default 17070" + ) + parser.add_argument( + "-d", "--directory", + help="Local directory for the charm" + ) + parser.add_argument( + "--application", + help="Charm name" + ) + parser.add_argument( + "--vnf-ip", + help="IP of the VNF to configure" + ) + parser.add_argument( + "-m", "--model", + default='default', + help="The model to connect to." + ) + return parser.parse_args() - while not api._is_service_active(): - time.sleep(10) - print ("Service {} is in status {}". - format(args.service, api._get_service_status())) +if __name__ == "__main__": + args = get_argparser() - res = api._execute_action('create-update-user', {'number': '125252352525', - 'password': 'asfsaf'}) + # Set logging level to debug so we can see verbose output from the + # juju library. + logging.basicConfig(level=logging.DEBUG) - print ("Action 'creat-update-user response: {}".format(res)) + # Quiet logging from the websocket library. If you want to see + # everything sent over the wire, set this to DEBUG. + ws_logger = logging.getLogger('websockets.protocol') + ws_logger.setLevel(logging.INFO) - status = res['status'] - while status not in [ 'completed', 'failed' ]: - time.sleep(2) - status = api._get_action_status(res['action']['tag'])['status'] + endpoint = '%s:%d' % (args.server, int(args.port)) - print("Action status: {}".format(status)) + loop = asyncio.get_event_loop() - # This action will fail as the number is non-numeric - res = api._execute_action('delete-user', {'number': '125252352525asf'}) + api = JujuApi(server=args.server, + port=args.port, + user=args.user, + secret=args.password, + loop=loop, + log=ws_logger, + model_name=args.model + ) - print ("Action 'delete-user response: {}".format(res)) + juju.loop.run(api.login()) - status = res['status'] - while status not in [ 'completed', 'failed' ]: - time.sleep(2) - status = api._get_action_status(res['action']['tag'])['status'] + status = juju.loop.run(api.get_status()) - print("Action status: {}".format(status)) + print('Applications:', list(status.applications.keys())) + print('Machines:', list(status.machines.keys())) + + if args.directory and args.application: + # Deploy the charm + charm = os.path.basename(args.directory) + juju.loop.run( + api.deploy_application(charm, + name=args.application, + path=args.directory, + ) + ) + + juju.loop.run(api.wait_for_application(charm)) + + # Wait for the service to come up + up = juju.loop.run(api.is_application_up(charm)) + print("Application is {}".format("up" if up else "down")) + + print("Service {} is deployed".format(args.application)) + + ########################### + # Execute config on charm # + ########################### + config = juju.loop.run(api.get_config(args.application)) + hostname = config['ssh-username']['value'] + rhostname = hostname[::-1] + + # Apply the configuration + juju.loop.run(api.apply_config( + {'ssh-username': rhostname}, application=args.application + )) + + # Get the configuration + config = juju.loop.run(api.get_config(args.application)) + + # Verify the configuration has been updated + assert(config['ssh-username']['value'] == rhostname) + + #################################### + # Get the status of an application # + #################################### + status = juju.loop.run(api.get_application_status(charm)) + print("Application Status: {}".format(status)) + + ########################### + # Execute a simple action # + ########################### + result = juju.loop.run(api.run_action(charm, 'get-ssh-public-key')) + print("Action {} status is {} and returned {}".format( + result['status'], + result['action']['tag'], + result['action']['results'] + )) + + ##################################### + # Execute an action with parameters # + ##################################### + result = juju.loop.run( + api.run_action(charm, 'run', command='hostname') + ) + print("Action {} status is {} and returned {}".format( + result['status'], + result['action']['tag'], + result['action']['results'] + )) + + juju.loop.run(api.logout()) + + loop.close() + + # if args.vnf_ip and \ + # ('clearwater-aio' in args.directory): + # # Execute config on charm + # api._apply_config({'proxied_ip': args.vnf_ip}) + # + # while not api._is_service_active(): + # time.sleep(10) + # + # print ("Service {} is in status {}". + # format(args.service, api._get_service_status())) + # + # res = api._execute_action('create-update-user', {'number': '125252352525', + # 'password': 'asfsaf'}) + # + # print ("Action 'creat-update-user response: {}".format(res)) + # + # status = res['status'] + # while status not in [ 'completed', 'failed' ]: + # time.sleep(2) + # status = api._get_action_status(res['action']['tag'])['status'] + # + # print("Action status: {}".format(status)) + # + # # This action will fail as the number is non-numeric + # res = api._execute_action('delete-user', {'number': '125252352525asf'}) + # + # print ("Action 'delete-user response: {}".format(res)) + # + # status = res['status'] + # while status not in [ 'completed', 'failed' ]: + # time.sleep(2) + # status = api._get_action_status(res['action']['tag'])['status'] + # + # print("Action status: {}".format(status))