| ############################################################################ |
| # Copyright 2016 RIFT.io Inc # |
| # # |
| # Licensed under the Apache License, Version 2.0 (the "License"); # |
| # you may not use this file except in compliance with the License. # |
| # You may obtain a copy of the License at # |
| # # |
| # http://www.apache.org/licenses/LICENSE-2.0 # |
| # # |
| # Unless required by applicable law or agreed to in writing, software # |
| # distributed under the License is distributed on an "AS IS" BASIS, # |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # |
| # See the License for the specific language governing permissions and # |
| # limitations under the License. # |
| ############################################################################ |
| |
| import argparse |
| import asyncio |
| import logging |
| import os |
| import ssl |
| |
| import juju.loop |
| from juju.controller import Controller |
| from juju.model import Model, ModelObserver |
| |
# Make the standard library's default HTTPS context an unverified one, using
# the opt-out described in PEP 476 (https://www.python.org/dev/peps/pep-0476/).
# SECURITY NOTE: this is a process-wide monkeypatch that turns off TLS
# certificate verification for code relying on the default HTTPS context.
# Legacy Python has no ``_create_unverified_context`` (it does not verify by
# default), in which case nothing needs to be patched.
if hasattr(ssl, '_create_unverified_context'):
    ssl._create_default_https_context = ssl._create_unverified_context
| |
| |
class JujuVersionError(Exception):
    """Juju version error (meaning presumed from the name)."""
| |
| |
class JujuApiError(Exception):
    """Base class for the Juju API errors defined in this module."""
| |
| |
class JujuEnvError(JujuApiError):
    """Juju API error for environment problems (presumed from the name)."""
| |
| |
class JujuModelError(JujuApiError):
    """Juju API error for model problems (presumed from the name)."""
| |
| |
class JujuStatusError(JujuApiError):
    """Juju API error for status problems (presumed from the name)."""
| |
| |
class JujuUnitsError(JujuApiError):
    """Juju API error for unit problems (presumed from the name)."""
| |
| |
class JujuWaitUnitsError(JujuApiError):
    """Juju API error for waiting on units (presumed from the name)."""
| |
| |
class JujuSrvNotDeployedError(JujuApiError):
    """Juju API error for a service that is not deployed (presumed from the name)."""
| |
| |
class JujuAddCharmError(JujuApiError):
    """Juju API error for adding a charm (presumed from the name)."""
| |
| |
class JujuDeployError(JujuApiError):
    """Juju API error for deployment problems (presumed from the name)."""
| |
| |
class JujuDestroyError(JujuApiError):
    """Juju API error for destroy/removal problems (presumed from the name)."""
| |
| |
class JujuResolveError(JujuApiError):
    """Juju API error for resolving unit errors (presumed from the name)."""
| |
| |
class JujuActionError(JujuApiError):
    """Base class for the action-related Juju API errors in this module."""
| |
| |
class JujuActionApiError(JujuActionError):
    """Action error at the API level (presumed from the name)."""
| |
| |
class JujuActionInfoError(JujuActionError):
    """Action error for action information (presumed from the name)."""
| |
| |
class JujuActionExecError(JujuActionError):
    """Action error for action execution (presumed from the name)."""
| |
| |
class JujuAuthenticationError(Exception):
    """Juju authentication error (meaning presumed from the name).

    Derives directly from ``Exception``, not from ``JujuApiError``.
    """
| |
| |
class JujuMonitor(ModelObserver):
    """Monitor state changes within the Juju Model.

    Currently a placeholder: it adds no behavior on top of ``ModelObserver``.
    """
    # Sketch of the planned change hook (not implemented yet):
    #
    #     async def on_change(self, delta, old, new, model):
    #         """React to changes in the Juju model."""
    #
    #         # TODO: Set up the hook to update the UI if the status of a
    #         # unit changes; to be used when deploying a charm and waiting
    #         # for it to be "ready".
    #         if delta.entity in ['application', 'unit'] and delta.type == "change":
    #             pass
    #
    #         # TODO: Add a hook when an action is complete.
| |
| |
class JujuApi(object):
    """Asynchronous wrapper around the ``juju`` Controller and Model classes.

    There should be one instance of JujuApi for each VNF managed by Juju.

    Assumption:
        Currently we use one unit per service/VNF, so actions are run
        against the first unit of an application.

    Every public coroutine logs in on demand (``login()`` returns
    immediately when already authenticated). Call ``logout()`` when done;
    connections cannot be closed from the finalizer.
    """

    log = None
    controller = None
    # NOTE(review): mutable class-level dict shared by all instances; nothing
    # in this class reads or writes it.
    models = {}
    model = None
    model_name = None
    model_uuid = None
    authenticated = False

    def __init__(self,
                 log=None,
                 loop=None,
                 server='127.0.0.1',
                 port=17070,
                 user='admin',
                 secret=None,
                 version=None,
                 model_name='default',
                 ):
        """Initialize with the Juju credentials.

        :param log: Logger to use; defaults to this module's logger
        :param loop: Optional event loop, stored as ``self.loop``
        :param str server: Controller host name or address
        :param port: Controller API port (anything ``int()`` accepts)
        :param str user: User name; a ``user-`` prefix is added when missing
        :param str secret: Password; when empty, ``login()`` uses
            ``Controller.connect_current()`` instead
        :param version: Accepted for compatibility; currently unused
        :param str model_name: Name of the model to operate on
        """
        self.log = log if log else logging.getLogger(__name__)

        # Quiet websocket traffic
        logging.getLogger('websockets.protocol').setLevel(logging.INFO)

        self.log.debug('JujuApi: instantiated')

        self.server = server
        self.port = port
        self.secret = secret

        if user.startswith('user-'):
            self.user = user
        else:
            self.user = 'user-{}'.format(user)

        self.endpoint = '%s:%d' % (server, int(port))
        self.model_name = model_name

        # Always define the attribute so reading ``self.loop`` never raises.
        self.loop = loop

    def __del__(self):
        """Warn when the instance is collected while still logged in.

        ``logout()`` is a coroutine and cannot be awaited from a finalizer.
        A ``yield`` here would turn ``__del__`` into a generator function
        whose body never executes, so the only honest option is to report
        the leaked connections.
        """
        if self.authenticated and self.log is not None:
            self.log.warning(
                "JujuApi: garbage-collected while still logged in; "
                "call logout() to close the connections"
            )

    async def add_relation(self, a, b, via=None):
        """
        Add a relation between two application endpoints.

        A dedicated connection to the configured model is opened for the
        call and closed afterwards.

        :param a An application endpoint
        :param b An application endpoint
        :param via The egress subnet(s) for outbound traffic, e.g.,
            (192.168.0.0/16,10.0.0.0/8)
        """
        await self.login()

        m = await self.get_model(self.model_name)
        try:
            # NOTE(review): ``via`` is forwarded only when given, so the call
            # also works with Model.add_relation signatures that take just
            # the two endpoints -- confirm against the installed library.
            if via is None:
                await m.add_relation(a, b)
            else:
                await m.add_relation(a, b, via)
        finally:
            await m.disconnect()

    async def apply_config(self, config, application):
        """Apply a configuration to the application.

        Thin wrapper around ``set_config`` that logs the request first.
        """
        self.log.debug("JujuApi: Applying configuration to {}.".format(
            application
        ))
        return await self.set_config(application=application, config=config)

    async def deploy_application(self, charm, name="", path="", specs=None):
        """
        Deploy an application.

        Deploy an application to a container or a machine already provisioned
        by the OSM Resource Orchestrator (requires the Juju public ssh key
        installed on the new machine via cloud-init).

        If an application called ``name`` already exists in the model it is
        returned and nothing is deployed.

        :param str charm: The name of the charm; ``None`` aborts and
            returns ``None``
        :param str name: The name of the application, if different than the
            charm
        :param str path: The path to the charm
        :param dict specs: A dictionary identifying the machine to manage via
            Juju. ``host``/``user`` are used; ``hostname``/``username`` are
            accepted as alternative spellings.
        :return: The existing or newly deployed application object

        Examples::

            deploy_application(..., specs={'host': '10.0.0.4', 'user': 'ubuntu'})
        """
        await self.login()

        # Check that the charm is valid and exists.
        if charm is None:
            return None

        app = await self.get_application(name)
        if app is not None:
            return app

        # Check for specific machine placement
        specs = specs or {}
        host = specs.get('host', specs.get('hostname'))
        user = specs.get('user', specs.get('username'))
        to = None
        if host and user:
            # Manual provisioning placement has the form ssh:user@host.
            machine = await self.model.add_machine(
                spec='ssh:{}@{}'.format(user, host),
            )
            to = machine.id

        # TODO: Handle the error if the charm isn't found.
        self.log.debug("JujuApi: Deploying charm {} ({}) from {}".format(
            charm,
            name,
            path,
        ))
        return await self.model.deploy(
            path,
            application_name=name,
            series='xenial',
            to=to,
        )
    deploy_service = deploy_application

    async def get_action_status(self, uuid):
        """Wait for the action identified by ``uuid`` and return its status."""
        await self.login()

        self.log.debug("JujuApi: Waiting for status of action uuid {}".format(uuid))
        action = await self.model.wait_for_action(uuid)
        return action.status

    async def get_application(self, application):
        """Get the deployed application.

        :return: The entry of ``self.model.applications`` named
            ``application``, or ``None`` when the name is empty, no model is
            connected, or the application is unknown
        """
        await self.login()

        self.log.debug("JujuApi: Getting application {}".format(application))
        if not application or not self.model:
            return None

        applications = self.model.applications
        if applications and application in applications:
            return applications[application]
        return None

    async def get_application_status(self, application):
        """Get the status of an application (``None`` if it is not found)."""
        await self.login()

        status = None
        app = await self.get_application(application)
        if app:
            status = app.status
        self.log.debug("JujuApi: Status of application {} is {}".format(
            application,
            str(status),
        ))
        return status
    get_service_status = get_application_status

    async def get_config(self, application):
        """Get the configuration of an application (``None`` if not found)."""
        await self.login()

        config = None
        app = await self.get_application(application)
        if app:
            config = await app.get_config()

        self.log.debug("JujuApi: Config of application {} is {}".format(
            application,
            str(config),
        ))

        return config

    async def get_model(self, name='default'):
        """Get a new, connected model from the Juju Controller.

        Note: the caller must call ``disconnect()`` on the returned Model
        before it goes out of scope.

        :param str name: Name of the model to connect to
        :raises JujuModelError: if the controller has no model called ``name``
        """
        await self.login()

        uuid = await self.get_model_uuid(name)
        if uuid is None:
            # Fail with a clear error instead of connecting with no UUID.
            raise JujuModelError(
                "Model {} was not found on the controller".format(name)
            )

        self.log.debug("JujuApi: Connecting to model {} ({})".format(
            name,
            uuid,
        ))

        model = Model()
        await model.connect(
            self.endpoint,
            uuid,
            self.user,
            self.secret,
            None,
        )

        return model

    async def get_model_uuid(self, name='default'):
        """Get the UUID of a model.

        Iterate through all models in a controller and find the matching
        model.

        :return: The UUID of the first model called ``name``, or ``None``
        """
        await self.login()

        models = await self.controller.get_models()

        self.log.debug("JujuApi: Looking through {} models for model {}".format(
            len(models.user_models),
            name,
        ))
        for model in models.user_models:
            if model.model.name == name:
                return model.model.uuid

        return None

    async def get_status(self):
        """Get the model status.

        :return: An object with ``applications`` and ``machines`` taken from
            the connected model, plus an (always empty) ``relations`` dict
        """
        await self.login()

        if not self.model:
            # get_model() is a coroutine; it must be awaited so that a
            # connected Model, not a coroutine object, is stored.
            self.model = await self.get_model(self.model_name)

        class model_state:
            applications = {}
            machines = {}
            relations = {}

        self.log.debug("JujuApi: Getting model status")
        status = model_state()
        status.applications = self.model.applications
        status.machines = self.model.machines

        return status

    async def _application_status_matches(self, application, states, template):
        """Return whether the application's status is one of ``states``.

        ``template`` is the debug message; it receives the application name
        and either ``""`` or ``"not"`` depending on the result.
        """
        await self.login()

        status = await self.get_application_status(application)
        state = bool(status) and status in states

        self.log.debug(template.format(
            application,
            "" if state else "not",
        ))

        return state

    async def is_application_active(self, application):
        """Check if the application is in an active state."""
        return await self._application_status_matches(
            application,
            ['active'],
            "JujuApi: Application {} is {} active",
        )
    is_service_active = is_application_active

    async def is_application_blocked(self, application):
        """Check if the application is in a blocked state."""
        return await self._application_status_matches(
            application,
            ['blocked'],
            "JujuApi: Application {} is {} blocked",
        )
    is_service_blocked = is_application_blocked

    async def is_application_deployed(self, application):
        """Check if the application is in a deployed state.

        "Deployed" is currently defined as the ``active`` status.
        """
        return await self._application_status_matches(
            application,
            ['active'],
            "JujuApi: Application {} is {} deployed",
        )
    is_service_deployed = is_application_deployed

    async def is_application_error(self, application):
        """Check if the application is in an error state."""
        return await self._application_status_matches(
            application,
            ['error'],
            "JujuApi: Application {} is {} errored",
        )
    is_service_error = is_application_error

    async def is_application_maint(self, application):
        """Check if the application is in a maintenance state."""
        return await self._application_status_matches(
            application,
            ['maintenance'],
            "JujuApi: Application {} is {} in maintenence",
        )
    is_service_maint = is_application_maint

    async def is_application_up(self, application=None):
        """Check if the application is up (status ``active`` or ``blocked``)."""
        return await self._application_status_matches(
            application,
            ['active', 'blocked'],
            "JujuApi: Application {} is {} up",
        )
    is_service_up = is_application_up

    async def login(self):
        """Login to the Juju controller and connect to the configured model.

        Does nothing when already authenticated. With a secret the
        controller is reached at ``self.endpoint``; without one
        ``Controller.connect_current()`` is used.

        :raises JujuModelError: if ``self.model_name`` does not exist
        """
        if self.authenticated:
            return

        cacert = None
        self.controller = Controller()

        self.log.debug("JujuApi: Logging into controller")

        if self.secret:
            await self.controller.connect(
                self.endpoint,
                self.user,
                self.secret,
                cacert,
            )
        else:
            await self.controller.connect_current()

        # Set before get_model() so its own login check is a no-op.
        self.authenticated = True
        try:
            self.model = await self.get_model(self.model_name)
        except Exception:
            # Do not stay half logged-in with an open controller connection.
            await self.logout()
            raise

    async def logout(self):
        """Logout of the Juju controller.

        Disconnects the model and the controller (when present) and resets
        the authentication state, even if one of the disconnects fails.
        """
        if not self.authenticated:
            return

        try:
            if self.model:
                await self.model.disconnect()
        finally:
            self.model = None
            try:
                if self.controller:
                    await self.controller.disconnect()
            finally:
                self.controller = None
                self.authenticated = False

    async def remove_application(self, name):
        """Remove the application (no-op when it is not deployed)."""
        await self.login()

        app = await self.get_application(name)
        if app:
            self.log.debug("JujuApi: Destroying application {}".format(
                name,
            ))

            await app.destroy()

    async def remove_relation(self, a, b):
        """
        Remove a relation between two application endpoints

        A dedicated connection to the configured model is opened for the
        call and closed afterwards.

        :param a An application endpoint
        :param b An application endpoint
        """
        await self.login()

        m = await self.get_model(self.model_name)
        try:
            # NOTE(review): assumes Model.remove_relation is a coroutine like
            # the other Model operations awaited in this class -- confirm.
            await m.remove_relation(a, b)
        finally:
            await m.disconnect()

    async def resolve_error(self, application=None):
        """Resolve units in error state.

        Marks every unit of the application as resolved with ``retry=True``.
        """
        await self.login()

        app = await self.get_application(application)
        if app:
            self.log.debug("JujuApi: Resolving errors for application {}".format(
                application,
            ))

            for unit in app.units:
                await unit.resolved(retry=True)

    async def run_action(self, application, action_name, **params):
        """Execute an action on the application's first unit and wait for it.

        :param str application: Name of the application
        :param str action_name: Name of the action to run
        :param params: Action parameters, forwarded as keyword arguments
        :return: ``{'status': ..., 'action': {'tag': ..., 'results': ...}}``;
            ``status`` stays ``''`` and the ``action`` values ``None`` when
            the application is unknown or has no units
        """
        await self.login()

        result = {
            'status': '',
            'action': {
                'tag': None,
                'results': None,
            }
        }
        app = await self.get_application(application)
        if app and app.units:
            # We currently only have one unit per application
            # so use the first unit available.
            unit = app.units[0]

            self.log.debug("JujuApi: Running Action {} against Application {}".format(
                action_name,
                application,
            ))

            action = await unit.run_action(action_name, **params)

            # Wait for the action to complete
            await action.wait()

            result['status'] = action.status
            result['action']['tag'] = action.data['id']
            result['action']['results'] = action.results

        return result
    execute_action = run_action

    async def set_config(self, application, config):
        """Apply a configuration to the application.

        After setting, the configuration is read back and any key whose
        value differs from the requested one is reported at debug level.

        :param str application: Name of the application
        :param dict config: Mapping of option name to value
        """
        await self.login()

        app = await self.get_application(application)
        if app:
            self.log.debug("JujuApi: Setting config for Application {}".format(
                application,
            ))
            await app.set_config(config)

            # Verify the config is set
            newconf = await app.get_config()
            for key, expected in config.items():
                # get_config() entries are dicts carrying the value under
                # 'value'; compare against that, not the whole entry.
                entry = newconf.get(key)
                actual = entry.get('value') if isinstance(entry, dict) else entry
                if expected != actual:
                    self.log.debug("JujuApi: Config not set! Key {} Value {} doesn't match {}".format(key, expected, actual))

    async def set_parameter(self, parameter, value, application=None):
        """Set a config parameter for a service."""
        await self.login()

        self.log.debug("JujuApi: Setting {}={} for Application {}".format(
            parameter,
            value,
            application,
        ))
        return await self.apply_config(
            {parameter: value},
            application=application,
        )

    async def wait_for_application(self, name, timeout=300):
        """Wait for an application to become active.

        Blocks until every unit's agent is ``idle`` and its workload status
        is ``active`` or ``unknown``. Returns immediately when the
        application is not found.

        :param str name: Name of the application
        :param timeout: Passed to ``Model.block_until`` as its timeout
        """
        await self.login()

        app = await self.get_application(name)
        if app:
            self.log.debug("JujuApi: Waiting {} seconds for Application {}".format(
                timeout,
                name,
            ))

            await self.model.block_until(
                lambda: all(
                    unit.agent_status == 'idle'
                    and unit.workload_status
                    in ['active', 'unknown'] for unit in app.units
                ),
                timeout=timeout,
            )
| |
| |
def get_argparser(argv=None):
    """Parse the test driver's command line and return the parsed arguments.

    Despite its name, this returns the ``argparse.Namespace`` produced by
    ``parse_args()``, not the parser itself.

    :param argv: Optional list of argument strings; ``None`` (the default)
        makes argparse read ``sys.argv[1:]``
    :return: Namespace with ``server``, ``user``, ``password``, ``port``,
        ``model``, ``directory``, ``application`` and ``proxy``
    """
    parser = argparse.ArgumentParser(description='Test Driver for Juju API')

    ###################
    # Authentication  #
    ###################
    parser.add_argument(
        "-s", "--server",
        default='10.0.202.49',
        help="Juju controller",
    )
    parser.add_argument(
        "-u", "--user",
        default='admin',
        help="User, default user-admin"
    )
    parser.add_argument(
        "-p", "--password",
        default='',
        help="Password for the user"
    )
    parser.add_argument(
        "-P", "--port",
        default=17070,
        # Convert here so the value is an int whether or not it was given
        # on the command line, and bad input is rejected while parsing.
        type=int,
        help="Port number, default 17070"
    )
    parser.add_argument(
        "-m", "--model",
        default='default',
        help="The model to connect to."
    )

    ##########
    # Charm  #
    ##########
    parser.add_argument(
        "-d", "--directory",
        help="Local directory for the charm"
    )
    parser.add_argument(
        "--application",
        help="Charm name"
    )

    #############
    # Placement #
    #############

    # To deploy to a non-Juju machine, provide the host and credentials for
    # Juju to manually provision (host, username, (password or key?)).
    parser.add_argument(
        "--proxy",
        action='store_true',
        help="Deploy as a proxy charm.",
    )
    parser.add_argument(
        "--no-proxy",
        action='store_false',
        dest='proxy',
        help="Deploy as a full charm.",
    )
    parser.set_defaults(proxy=True)

    # Test options?
    # unit test?
    #######
    # VNF #
    #######

    return parser.parse_args(argv)
| |
| |
def _charm_name_from_path(directory):
    """Return the last component of a charm directory, trailing slash or not."""
    return os.path.basename(os.path.normpath(os.path.expanduser(directory)))


async def _wait_until_settled(api, application):
    """Poll every 30 seconds until the application is up, blocked or in error.

    :return: The last result of ``api.is_application_up`` (``False`` when
        the loop ended because of an error state)
    """
    while True:
        try:
            await api.wait_for_application(application, 30)
        except asyncio.TimeoutError:
            # NOTE(review): assumes the 30 second wait reports expiry with
            # asyncio.TimeoutError; treat it as "not ready yet" and fall
            # through to the status checks -- confirm against the library.
            pass

        if await api.is_application_error(application):
            print("This application is in an error state.")
            return False

        if await api.is_application_blocked(application):
            print("This application is blocked.")
            return True

        # An extra check to see if the charm is ready
        up = await api.is_application_up(application)
        print("Application is {}".format("up" if up else "down"))
        if up:
            return True


async def deploy_charm_and_wait():
    """Example coroutine: deploy a charm, wait for it, then exercise it.

    Reads its settings from the command line (see ``get_argparser``), logs
    in, prints the model's applications and machines and, when both
    ``--directory`` and ``--application`` are given, deploys the charm,
    waits for it to settle, round-trips its ``ssh-username`` option (proxy
    charms only) and runs two actions. The controller is always logged out
    at the end, even when a step fails.
    """
    args = get_argparser()

    # Set logging level to debug so we can see verbose output from the
    # juju library.
    logging.basicConfig(level=logging.DEBUG)

    # Quiet logging from the websocket library. If you want to see
    # everything sent over the wire, set this to DEBUG.
    ws_logger = logging.getLogger('websockets.protocol')
    ws_logger.setLevel(logging.INFO)

    api = JujuApi(server=args.server,
                  port=args.port,
                  user=args.user,
                  secret=args.password,
                  # loop=loop,
                  log=ws_logger,
                  model_name=args.model
                  )
    print("Logging in...")
    await api.login()

    try:
        if api.authenticated:
            status = await api.get_status()
            print('Applications:', list(status.applications.keys()))
            print('Machines:', list(status.machines.keys()))

            if args.directory and args.application:
                # The application is deployed under args.application, so
                # every later lookup must use that name, not the charm's.
                application = args.application

                # Deploy the charm
                charm = _charm_name_from_path(args.directory)
                await api.deploy_application(charm,
                                             name=application,
                                             path=args.directory,
                                             )

                # Wait for the application to fully deploy. This will block
                # until the agent is in an idle state, and the charm's
                # workload is either 'active' or 'unknown', meaning it's
                # ready but the author did not explicitly set a workload
                # state.
                print("Waiting for application '{}' to deploy...".format(application))
                await _wait_until_settled(api, application)

                print("Service {} is deployed".format(application))

                ###################################
                # Execute config on a proxy charm #
                ###################################
                if args.proxy:
                    config = await api.get_config(application)
                    hostname = config['ssh-username']['value']
                    rhostname = hostname[::-1]

                    # Apply the configuration
                    await api.apply_config(
                        {'ssh-username': rhostname}, application=application
                    )

                    # Get the configuration
                    config = await api.get_config(application)

                    # Verify the configuration has been updated
                    assert(config['ssh-username']['value'] == rhostname)

                ####################################
                # Get the status of an application #
                ####################################
                status = await api.get_application_status(application)
                print("Application Status: {}".format(status))

                ###########################
                # Execute a simple action #
                ###########################
                result = await api.run_action(application, 'get-ssh-public-key')
                print("Action {} status is {} and returned {}".format(
                    result['action']['tag'],
                    result['status'],
                    result['action']['results']
                ))

                #####################################
                # Execute an action with parameters #
                #####################################
                result = await api.run_action(application, 'run', command='hostname')

                print("Action {} status is {} and returned {}".format(
                    result['action']['tag'],
                    result['status'],
                    result['action']['results']
                ))
    finally:
        print("Logging out...")
        await api.logout()
| |
# TODO: Can the public key be fetched from the Juju controller itself? It could
# then be pulled without needing a deployed charm, and installed on the VM via
# cloud-init.
| |
if __name__ == "__main__":
    # Create a single event loop for running code asynchronously.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    try:
        # An initial set of tasks to run. Coroutines are wrapped in Tasks
        # explicitly because asyncio.wait() no longer accepts bare
        # coroutines on current Python versions.
        tasks = [
            loop.create_task(deploy_charm_and_wait()),
        ]

        # TODO: optionally run forever and use a Watcher to monitor what's
        # happening.
        # gather() re-raises the first task exception, so a failure is
        # reported instead of being left unretrieved on the Task.
        loop.run_until_complete(asyncio.gather(*tasks))
    finally:
        loop.close()