#!/usr/bin/env python3
+
#
# Copyright 2016 RIFT.IO Inc
#
# See the License for the specific language governing permissions and
# limitations under the License.
#
-# Author(s): Austin Cormier
-# Creation Date: 2016/05/23
-#
+
import argparse
import hashlib
import ipaddress
import itertools
-import jujuclient
import logging
import sys
import time
import yaml
+from ncclient import manager
+
+from juju_api import JujuApi
+
logging.basicConfig(filename="/tmp/rift_ns_add_corp.log", level=logging.DEBUG)
logger = logging.getLogger()
dry_run = False
-class JujuActionError(Exception):
+
+class NccConnectError(Exception):
pass
+class NccSshConnectError(Exception):
+ pass
-class JujuClient(object):
- """Class for executing Juju actions """
- def __init__(self, ip, port, user, passwd):
- self._ip = ip
+
+class NetconfClient(object):
+
+ def __init__(self, log,
+ host='127.0.0.1',
+ port=2022,
+ user='admin',
+ password='admin'):
+ self._log = log
+ self._host = host
self._port = port
self._user = user
- self._passwd = passwd
+ self._passwd = password
- endpoint = 'wss://%s:%d' % (ip, port)
- logger.debug("Using endpoint=%s", endpoint)
- if dry_run:
+ self._manager = None
+
+ @property
+ def connected(self):
+ if self._manager != None and self._manager.connected == True:
+ return True
+ return False
+
+ def connect(self, timeout_secs=120):
+ if (self._manager != None and self._manager.connected == True):
+ self._log.debug("Disconnecting previous session")
+ self._manager.close_session
+
+ self._log.debug("connecting netconf .... %s", self._host)
+ start_time = time.time()
+ while (time.time() - start_time) < timeout_secs:
+
+ try:
+ self._log.debug("Attemping netconf connection to host: %s",
+ self._host)
+
+ self._manager = manager.connect(
+ host=self._host,
+ port=self._port,
+ username=self._user,
+ password=self._passwd,
+ allow_agent=False,
+ look_for_keys=False,
+ hostkey_verify=False,
+ )
+
+ self._log.debug("Netconf connected to %s", self._host)
+ return
+
+ except Exception as e:
+ self._log.error("Netconf connection to host: %s, failed: %s",
+ self._host, str(e))
+ raise e
+
+ time.sleep(2)
+
+ raise NccConnectError(
+ "Failed to connect to host: %s within %s seconds" %
+ (self._host, timeout_secs)
+ )
+
+ def connect_ssh(self, timeout_secs=120):
+ if (self._manager != None and self._manager.connected == True):
+ self._log.debug("Disconnecting previous session")
+ self._manager.close_session
+
+ self._log.debug("connecting netconf via SSH .... %s", self._host)
+ start_time = time.time()
+ while (time.time() - start_time) < timeout_secs:
+
+ try:
+ self._log.debug("Attemping netconf connection to host: %s",
+ self._host)
+
+ self._manager = manager.connect_ssh(
+ host=self._host,
+ port=self._port,
+ username=self._user,
+ password=self._passwd,
+ allow_agent=False,
+ look_for_keys=False,
+ hostkey_verify=False,
+ )
+
+ self._log.debug("netconf over SSH connected to host: %s", self._host)
+ return
+
+ except Exception as e:
+ self._log.error("Netconf connection to host: %s, failed: %s",
+ self._host, str(e))
+ raise e
+
+ time.sleep(2)
+
+ raise NccSshConnectError(
+ "Failed to connect to host: %s within %s seconds" %
+ (self._host, timeout_secs)
+ )
+
+ def apply_edit_cfg(self, cfg):
+ self._log.debug("Attempting to apply netconf to %s: %s", self._host, cfg)
+
+ if self._manager is None:
+ self._log.error("Netconf is not connected to host: %s, aborting!", self._host)
return
- self.env = jujuclient.Environment(endpoint)
- self.env.login(passwd, user)
- def get_service(self, name):
- return self.env.get_service(name)
+ try:
+ self._log.debug("apply_edit_cfg: %s", cfg)
+ xml = '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(cfg)
+ response = self._manager.edit_config(xml, target='running')
+ if hasattr(response, 'xml'):
+ response_xml = response.xml
+ else:
+ response_xml = response.data_xml.decode()
+
+ self._log.debug("apply_edit_cfg response: %s", response_xml)
+ if '<rpc-error>' in response_xml:
+ raise Exception("apply_edit_cfg response has rpc-error : %s",
+ response_xml)
+
+ self._log.debug("apply_edit_cfg Successfully applied configuration {%s}", xml)
+
+ except Exception as e:
+ self._log.error("Netconf apply config {} failed: {}".format(cfg, e))
+ raise e
+
+ def get_config(self, xpath):
+ self._log.debug("Attempting to get config on %s: %s", self._host, xpath)
+
+ if self._manager is None:
+ self._log.error("Netconf is not connected to host: %s, aborting!", self._host)
+ return
+
+ try:
+ response = self._manager.get_config(source='running',
+ filter=('xpath', xpath))
+ if hasattr(response, 'xml'):
+ response_xml = response.xml
+ else:
+ response_xml = response.data_xml.decode()
+
+ self._log.debug("get config response: %s", response_xml)
+ if '<rpc-error>' in response_xml:
+ raise Exception("get config response has rpc-error : %s",
+ response_xml)
+
+ self._log.debug("get config success {%s}", response_xml)
+ return response_xml
+
+ except Exception as e:
+ self._log.error("Netconf get config {} failed: {}".format(xpath, e))
+ raise e
+
+
+ def get_capabilities(self):
+ self._log.debug("Attempting to get capabilities on %s", self._host)
+
+ if self._manager is None:
+ self._log.error("Netconf is not connected to host: %s, aborting!", self._host)
+ return
+
+ try:
+ response = self._manager.server_capabilites()
+ self._log.debug("get capabilities: %s", response)
+ return response
+
+ except Exception as e:
+ self._log.error("Netconf get capabilities failed: {}".format(e))
+ raise e
- def _get_units(self, name):
- """
- Get the units associated with service
- """
- units = self.env.status(name)['Services'][name]['Units']
- units = list(units.keys())
- # convert to a friendly format for juju-python-client
- units[:] = [('unit-%s' % u).replace('/', '-') for u in units]
- return units
+class JujuActionError(Exception):
+ pass
+
+
+class JujuClient(object):
+ """Class for executing Juju actions """
+ def __init__(self, ip, port, user, passwd):
+ self._env = JujuApi(log=logger,
+ server=ip,
+ port=port,
+ user=user,
+ secret=passwd)
def exec_action(self, name, action_name, params, block=False):
logger.debug("execute actiion %s using params %s", action_name, params)
if dry_run:
return
- actions = jujuclient.Actions(self.env)
- results = actions.enqueue_units(self._get_units(name),
- action_name,
- params)
- if not block:
- return results
-
- if 'error' in results['results'][0].keys():
- raise JujuActionError("Juju action error: %s" % results['results'][0])
-
- action = results['results'][0]['action']
- info = actions.info([action])
- i = 0
- logging.debug("Initial action results: %s", results['results'][0])
- while info['results'][0]['status'] not in ['completed', 'failed']:
+ result = self._env._execute_action(action_name, params, service=name)
+
+ if block is False:
+ return result
+
+ timeout = time.time() + 30 # Timeout after 30 secs
+ action_id = result['action']['tag']
+ status = result['status']
+ logging.debug("Initial action results: %s", status)
+ while status not in [ 'completed', 'failed' ]:
time.sleep(1)
- info = actions.info([action])
+ result = self._env._get_action_status(action_id)
+ status = result['status']
- # break out if the action doesn't complete in 10 secs
- i += 1
- if i == 10:
+ # break out if the action doesn't complete in 30 secs
+ if time.time() > timeout:
raise JujuActionError("Juju action timed out after 30 seconds")
- if info['results'][0]['status'] != 'completed':
- raise JujuActionError("Action %s failure: %s" % (action_name, info['results'][0]))
+ if status != 'completed':
+ raise JujuActionError("Action %s failure: %s" % (action_name, result))
- return info
+ return result
class CharmAction(object):
self._params = action_params if action_params is not None else []
def execute(self, juju_client):
- logger.info("Executing charm (%s) action (%s) with params (%s)",
+ logger.debug("Executing charm (%s) action (%s) with params (%s)",
self._deployed_name, self._action_name, self._params)
try:
info = juju_client.exec_action(
raise ValueError("PE param not found: %s" % param_name)
+ @property
+ def name(self):
+ return self._pe_group_cfg["name"]
+
@property
def vlan_id(self):
return self._get_param_value("Vlan ID")
def corp_gateway(self):
return self._get_param_value("Corp. Gateway")
+ @property
+ def vim_account(self):
+ acc = None
+ try:
+ acc = self._get_param_value("VIM Account")
+ except ValueError:
+ logger.debug("{}: Did not find VIM account".format(self.name))
+ return acc
+
+ @property
+ def network_name(self):
+ return self._get_param_value("Network Name")
+
+ @property
+ def network_type(self):
+ return self._get_param_value("Network Type")
+
+ @property
+ def overlay_type(self):
+ return self._get_param_value("Network Overlay Type")
+
+ @property
+ def physical_network(self):
+ return self._get_param_value("Physical Network")
+
class AddCorporationRequest(object):
def __init__(self, add_corporation_rpc):
def name(self):
return self._add_corporation_rpc["name"]
+ @property
+ def nsr_id(self):
+ return self._add_corporation_rpc["nsr_id_ref"]
+
@property
def param_groups(self):
return self._add_corporation_rpc["parameter_group"]
self._vnf_init_config_map = vnf_name_map
def get_service_name(self, vnf_index):
- for vnfr_id, index in self._vnfr_index_map.items():
+ for index, vnfr_id in self._vnfr_index_map.items():
if index != vnf_index:
continue
tunnel_key)
+def add_vl(ncc, nsr_id, pe_group_cfg):
+ cfg = PEGroupConfig(pe_group_cfg)
+ acc = cfg.vim_account
+
+ def vl_desc():
+ vld = '<vld>'
+ vld += '<id>{}</id>'.format(cfg.network_name)
+ vld += '<name>{}</name>'.format(cfg.network_name)
+ vld += '<type>{}</type>'.format(cfg.network_type)
+ vld += '<provider_network><overlay_type>{}</overlay_type>' \
+ '<physical_network>{}</physical_network></provider_network>'. \
+ format(cfg.overlay_type, cfg.physical_network)
+ vld += '</vld>'
+ logger.debug("New VLD: {}".format(vld))
+ return vld
+
+ def vl_ca_map():
+ ca_map = '<vl_cloud_account_map>'
+ ca_map += '<vld_id_ref>{}</vld_id_ref>'.format(cfg.network_name)
+ ca_map += '<cloud_accounts>{}</cloud_accounts>'.format(acc)
+ ca_map += '</vl_cloud_account_map>'
+ logger.debug("New VL cloud account map: {}".format(ca_map))
+ return ca_map
+
+ if acc:
+ vld = vl_desc()
+ ca = vl_ca_map()
+
+ if dry_run:
+ return
+
+ if not ncc.connected:
+ ncc.connect()
+
+ cur_cfg = ncc.get_config('/ns-instance-config/nsr[id="{}"]'.format(nsr_id))
+ logger.debug("Current NS {} config: {}".format(nsr_id, cur_cfg))
+
+ vld_index = cur_cfg.find('<vld>')
+ start_idx = cur_cfg.find('<ns-instance-config ')
+ end_idx = cur_cfg.find('</data>')
+ vld_cfg = cur_cfg[start_idx:index] + vld_index + cur_cfg[index:end_idx]
+
+ if 'vl_cloud_account_map' in vld_cfg:
+ ca_index = vld_cfg.find('<vl_cloud_account_map>')
+ else:
+ ca_index = vld_cfg.find('</ns-instance-config>')
+ upd_cfg = vld_cfg[:ca_index] + ca + vld_cfg[ca_index:]
+
+ logger.debug("Update NS {} config: {}".format(nsr_id, upd_cfg))
+ ncc.apply_edit_cfg(upd_cfg)
+
def main(argv=sys.argv[1:]):
parser = argparse.ArgumentParser()
parser.add_argument("yaml_cfg_file", type=argparse.FileType('r'))
parser.add_argument("--dry-run", action="store_true")
- parser.add_argument("--quiet", "-q", dest="verbose", action="store_false")
+ parser.add_argument("--verbose", "-v", dest="verbose", action="store_true")
args = parser.parse_args()
if args.verbose:
ch.setLevel(logging.DEBUG)
rpc_request = AddCorporationRequest.from_yaml_cfg(yaml_str)
pe_param_group_map = rpc_request.get_pe_parameter_group_map()
+ # Netconf client to use
+ # Assuming netconf is on localhost with default credentails
+ ncc = NetconfClient(logger)
+
pe_name_charm_map = {}
for pe_name, pe_group_cfg in pe_param_group_map.items():
# The PE name (i.e. PE1) must be in the parameter group name so we can correlate
pe_name_charm_map[pe_name] = SixWindPEProxyCharm(juju_client, pe_charm_service_name)
+ # Create network if required
+ add_vl(ncc, rpc_request.nsr_id, pe_group_cfg)
+
# At this point we have SixWindPEProxyCharm() instances for each PE and each
# PE param group configuration.
for src_pe_name in pe_param_group_map:
tunnel_key=rpc_request.tunnel_key,
)
+ logging.info("Script completed successfully")
+
if __name__ == "__main__":
try:
main()
--- /dev/null
+############################################################################
+# Copyright 2016 RIFT.io Inc #
+# #
+# Licensed under the Apache License, Version 2.0 (the "License"); #
+# you may not use this file except in compliance with the License. #
+# You may obtain a copy of the License at #
+# #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+# #
+# Unless required by applicable law or agreed to in writing, software #
+# distributed under the License is distributed on an "AS IS" BASIS, #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
+# See the License for the specific language governing permissions and #
+# limitations under the License. #
+############################################################################
+
+import argparse
+import asyncio
+from functools import partial
+import logging
+import os
+import ssl
+import sys
+import time
+
+try:
+ from jujuclient.juju1.environment import Environment as Env1
+ from jujuclient.juju2.environment import Environment as Env2
+except ImportError as e:
+ # Try importing older jujuclient
+ from jujuclient import Environment as Env1
+
+try:
+ ssl._create_default_https_context = ssl._create_unverified_context
+except AttributeError:
+ # Legacy Python doesn't verify by default (see pep-0476)
+ # https://www.python.org/dev/peps/pep-0476/
+ pass
+
+
+class JujuVersionError(Exception):
+ pass
+
+
+class JujuApiError(Exception):
+ pass
+
+
+class JujuEnvError(JujuApiError):
+ pass
+
+
+class JujuModelError(JujuApiError):
+ pass
+
+
+class JujuStatusError(JujuApiError):
+ pass
+
+
+class JujuUnitsError(JujuApiError):
+ pass
+
+
+class JujuWaitUnitsError(JujuApiError):
+ pass
+
+
+class JujuSrvNotDeployedError(JujuApiError):
+ pass
+
+
+class JujuAddCharmError(JujuApiError):
+ pass
+
+
+class JujuDeployError(JujuApiError):
+ pass
+
+
+class JujuDestroyError(JujuApiError):
+ pass
+
+
+class JujuResolveError(JujuApiError):
+ pass
+
+
+class JujuActionError(JujuApiError):
+ pass
+
+
+class JujuActionApiError(JujuActionError):
+ pass
+
+
+class JujuActionInfoError(JujuActionError):
+ pass
+
+
+class JujuActionExecError(JujuActionError):
+ pass
+
+
+class JujuApi(object):
+ '''
+ JujuApi wrapper on jujuclient library
+
+ There should be one instance of JujuApi for each VNF manged by Juju.
+
+ Assumption:
+ Currently we use one unit per service/VNF. So once a service
+ is deployed, we store the unit name and reuse it
+'''
+ log = None
+
+ def __init__ (self,
+ log=None,
+ loop=None,
+ server='127.0.0.1',
+ port=17070,
+ user='admin',
+ secret=None,
+ version=None):
+ '''Initialize with the Juju credentials'''
+ self.server = server
+ self.port = port
+
+ self.secret = secret
+ if user.startswith('user-'):
+ self.user = user
+ else:
+ self.user = 'user-{}'.format(user)
+
+ self.loop = loop
+
+ if log is not None:
+ self.log = log
+ else:
+ self.log = JujuApi._get_logger()
+
+ if self.log is None:
+ raise JujuApiError("Logger not defined")
+
+ self.version = None
+ if version:
+ self.version = version
+ else:
+ try:
+ if Env2:
+ pass
+ except NameError:
+ self.log.warn("Using older version of Juju client, which " \
+ "supports only Juju 1.x")
+ self.version = 1
+
+ endpoint = 'wss://%s:%d' % (server, int(port))
+ self.endpoint = endpoint
+
+ self.charm = None # Charm used
+ self.service = None # Service deployed
+ self.units = [] # Storing as list to support more units in future
+
+ self.destroy_retries = 25 # Number retires to destroy service
+ self.retry_delay = 5 # seconds
+
+ def __str__(self):
+ return ("JujuApi-{}".format(self.endpoint))
+
+ @classmethod
+ def _get_logger(cls):
+ if cls.log is not None:
+ return cls.log
+
+ fmt = logging.Formatter(
+ '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:' \
+ '%(filename)s:%(lineno)d) - %(message)s')
+ stderr_handler = logging.StreamHandler(stream=sys.stderr)
+ stderr_handler.setFormatter(fmt)
+ logging.basicConfig(level=logging.DEBUG)
+ cls.log = logging.getLogger('juju-api')
+ cls.log.addHandler(stderr_handler)
+
+ return cls.log
+
+ @staticmethod
+ def format_charm_name(name):
+ '''Format the name to valid charm name
+
+ Charm service name accepts only a to z and -.
+ '''
+
+ new_name = ''
+ for c in name:
+ if c.isdigit():
+ c = chr(97 + int(c))
+ elif not c.isalpha():
+ c = "-"
+ new_name += c
+ return new_name.lower()
+
+ def _get_version_tag(self, tag):
+ version_tag_map = {
+ 'applications': {
+ 1: 'Services',
+ 2: 'applications',
+ },
+ 'units': {
+ 1: 'Units',
+ 2: 'units',
+ },
+ 'status': {
+ 1: 'Status',
+ 2: 'status',
+ },
+ 'workload-status': {
+ 1: 'Workload',
+ 2: 'workload-status',
+ },
+ 'charm-url': {
+ 1: 'CharmURL',
+ 2: 'charm-url',
+ },
+ }
+
+ return version_tag_map[tag][self.version]
+
+ def _get_env1(self):
+ try:
+ env = Env1(self.endpoint)
+ l = env.login(self.secret, user=self.user)
+ return env
+
+ except ConnectionRefusedError as e:
+ msg = "{}: Failed Juju 1.x connect: {}".format(self, e)
+ self.log.error(msg)
+ self.log.exception(e)
+ raise e
+
+ except Exception as e:
+ msg = "{}: Failed Juju 1.x connect: {}".format(self, e)
+ self.log.error(msg)
+ self.log.exception(e)
+ raise JujuEnvError(msg)
+
+ def _get_env2(self):
+ try:
+ env = Env2(self.endpoint)
+ l = env.login(self.secret, user=self.user)
+ except KeyError as e:
+ msg = "{}: Failed Juju 2.x connect: {}".format(self, e)
+ self.log.debug(msg)
+ raise JujuVersionError(msg)
+
+ try:
+ models = env.models.list()
+ for m in models['user-models']:
+ if m['model']['name'] == 'default':
+ mep = '{}/model/{}/api'.format(self.endpoint,
+ m['model']['uuid'])
+ model = Env2(mep, env_uuid=m['model']['uuid'])
+ l = model.login(self.secret, user=self.user)
+ break
+
+ if model is None:
+ raise
+
+ return model
+
+ except Exception as e:
+ msg = "{}: Failed logging to model: {}".format(self, e)
+ self.log.error(msg)
+ self.log.exception(e)
+ env.close()
+ raise JujuModelError(msg)
+
+ def _get_env(self):
+ self.log.debug("{}: Connect to endpoint {}".
+ format(self, self.endpoint))
+
+ if self.version is None:
+ # Try version 2 first
+ try:
+ env = self._get_env2()
+ self.version = 2
+
+ except JujuVersionError as e:
+ self.log.info("Unable to login as Juju 2.x, trying 1.x")
+ env = self._get_env1()
+ self.version = 1
+
+ return env
+
+ elif self.version == 2:
+ return self._get_env2()
+
+ elif self.version == 1:
+ return self._get_env1()
+
+ else:
+ msg = "{}: Unknown version set: {}".format(self, self.version)
+ self.log.error(msg)
+ raise JujuVersionError(msg)
+
+ @asyncio.coroutine
+ def get_env(self):
+ ''' Connect to the Juju controller'''
+ env = yield from self.loop.run_in_executor(
+ None,
+ self._get_env,
+ )
+ return env
+
+ def _get_status(self, env=None):
+ if env is None:
+ env = self._get_env()
+
+ try:
+ status = env.status()
+ return status
+
+ except Exception as e:
+ msg = "{}: exception in getting status: {}". \
+ format(self, e)
+ self.log.error(msg)
+ self.log.exception(e)
+ raise JujuStatusError(msg)
+
+ @asyncio.coroutine
+ def get_status(self, env=None):
+ '''Get Juju controller status'''
+ pf = partial(self._get_status, env=env)
+ status = yield from self.loop.run_in_executor(
+ None,
+ pf,
+ )
+ return status
+
+ def get_all_units(self, status, service=None):
+ '''Parse the status and get the units'''
+ results = {}
+ services = status.get(self._get_version_tag('applications'), {})
+
+ for svc_name, svc_data in services.items():
+ if service and service != svc_name:
+ continue
+ units = svc_data[self._get_version_tag('units')] or {}
+
+ results[svc_name] = {}
+ for unit in units:
+ results[svc_name][unit] = \
+ units[unit][self._get_version_tag('workload-status')] \
+ [self._get_version_tag('status')] or None
+ return results
+
+
+ def _get_service_units(self, service=None, status=None, env=None):
+ if service is None:
+ service = self.service
+
+ # Optimizing calls to Juju, as currently we deploy only 1 unit per
+ # service.
+ # if self.service == service and len(self.units):
+ # return self.units
+
+ if env is None:
+ env = self._get_env()
+
+ if status is None:
+ status = self._get_status(env=env)
+
+ try:
+ resp = self.get_all_units(status, service=service)
+ self.log.debug("Get all units: {}".format(resp))
+ units = set(resp[service].keys())
+
+ if self.service == service:
+ self.units = units
+
+ return units
+
+ except Exception as e:
+ msg = "{}: exception in get units {}".format(self, e)
+ self.log.error(msg)
+ self.log.exception(e)
+ raise JujuUnitsError(msg)
+
+ @asyncio.coroutine
+ def get_service_units(self, service=None, status=None, env=None):
+ '''Get the unit names for a service'''
+ pf = partial(self._get_service_units,
+ service=service,
+ status=status,
+ env=env)
+ units = yield from self.loop.run_in_executor(
+ None,
+ pf,
+ )
+ return units
+
+ def _get_service_status(self, service=None, status=None, env=None):
+ if env is None:
+ env = self._get_env()
+
+ if status is None:
+ status = self._get_status(env=env)
+
+ if service is None:
+ service = self.service
+
+ try:
+ srv_status = status[self._get_version_tag('applications')] \
+ [service][self._get_version_tag('status')] \
+ [self._get_version_tag('status')]
+ self.log.debug("{}: Service {} status is {}".
+ format(self, service, srv_status))
+ return srv_status
+
+ except KeyError as e:
+ self.log.info("self: Did not find service {}, e={}".format(self, service, e))
+ return 'NA'
+
+ except Exception as e:
+ msg = "{}: exception checking service status for {}, e {}". \
+ format(self, service, e)
+ self.log.error(msg)
+ self.log.exception(e)
+ raise JujuStatusError(msg)
+
+
+ @asyncio.coroutine
+ def get_service_status(self, service=None, status=None, env=None):
+ ''' Get service status
+
+ maintenance : The unit is not yet providing services, but is actively doing stuff.
+ unknown : Service has finished an event but the charm has not called status-set yet.
+ waiting : Service is unable to progress to an active state because of dependency.
+ blocked : Service needs manual intervention to get back to the Running state.
+ active : Service correctly offering all the services.
+ NA : Service is not deployed
+ '''
+ pf = partial(self._get_service_status,
+ service=service,
+ status=status,
+ env=env)
+ srv_status = yield from self.loop.run_in_executor(
+ None,
+ pf,
+ )
+ return srv_status
+
+ def _is_service_deployed(self, service=None, status=None, env=None):
+ resp = self._get_service_status(service=service,
+ status=status,
+ env=env)
+
+ if resp not in ['terminated', 'NA']:
+ return True
+
+ return False
+
+ @asyncio.coroutine
+ def is_service_deployed(self, service=None, status=None, env=None):
+ '''Check if the service is deployed'''
+ pf = partial(self._is_service_deployed,
+ service=service,
+ status=status,
+ env=env)
+ rc = yield from self.loop.run_in_executor(
+ None,
+ pf,
+ )
+ return rc
+
+ def _is_service_error(self, service=None, status=None, env=None):
+ resp = self._get_service_status(service=service,
+ status=status,
+ env=env)
+
+ if resp in ['error']:
+ return True
+
+ return False
+
+ @asyncio.coroutine
+ def is_service_error(self, service=None, status=None, env=None):
+ '''Check if the service is in error state'''
+ pf = partial(self._is_service_error,
+ service=service,
+ status=status,
+ env=env)
+ rc = yield from self.loop.run_in_executor(
+ None,
+ pf,
+ )
+ return rc
+
+ def _is_service_maint(self, service=None, status=None, env=None):
+ resp = self._get_service_status(service=service,
+ status=status,
+ env=env)
+
+ if resp in ['maintenance']:
+ return True
+
+ return False
+
+ @asyncio.coroutine
+ def is_service_maint(self, service=None, status=None, env=None):
+ '''Check if the service is in error state'''
+ pf = partial(self._is_service_maint,
+ service=service,
+ status=status,
+ env=env)
+ rc = yield from self.loop.run_in_executor(
+ None,
+ pf,
+ )
+ return rc
+
+ def _is_service_active(self, service=None, status=None, env=None):
+ resp = self._get_service_status(service=service,
+ status=status,
+ env=env)
+
+ if resp in ['active']:
+ return True
+
+ return False
+
+ @asyncio.coroutine
+ def is_service_active(self, service=None, status=None, env=None):
+ '''Check if the service is active'''
+ pf = partial(self._is_service_active,
+ service=service,
+ status=status,
+ env=env)
+ rc = yield from self.loop.run_in_executor(
+ None,
+ pf,
+ )
+ return rc
+
+ def _is_service_blocked(self, service=None, status=None, env=None):
+ resp = self._get_service_status(service=service,
+ status=status,
+ env=env)
+
+ if resp in ['blocked']:
+ return True
+
+ return False
+
+ @asyncio.coroutine
+ def is_service_blocked(self, service=None, status=None, env=None):
+ '''Check if the service is blocked'''
+ pf = partial(self._is_service_blocked,
+ service=service,
+ status=status,
+ env=env)
+ rc = yield from self.loop.run_in_executor(
+ None,
+ pf,
+ )
+ return rc
+
+ def _is_service_up(self, service=None, status=None, env=None):
+ resp = self._get_service_status(service=service,
+ status=status,
+ env=env)
+
+ if resp in ['active', 'blocked']:
+ return True
+
+ return False
+
+ @asyncio.coroutine
+ def is_service_up(self, service=None, status=None, env=None):
+ '''Check if the service is installed and up'''
+ pf = partial(self._is_service_up,
+ service=service,
+ status=status,
+ env=env)
+
+ rc = yield from self.loop.run_in_executor(
+ None,
+ pf,
+ )
+ return rc
+
+ def _apply_config(self, config, service=None, env=None):
+ if service is None:
+ service = self.service
+
+ if config is None or len(config) == 0:
+ self.log.warn("{}: Empty config passed for service {}".
+ format(self, service))
+ return
+
+ if env is None:
+ env = self._get_env()
+
+ status = self._get_status(env=env)
+
+ if not self._is_service_deployed(service=service,
+ status=status,
+ env=env):
+ raise JujuSrvNotDeployedError("{}: service {} is not deployed".
+ format(self, service))
+
+ self.log.debug("{}: Config for service {} update to: {}".
+ format(self, service, config))
+ try:
+ # Try to fix error on service, most probably due to config issue
+ if self._is_service_error(service=service, status=status, env=env):
+ self._resolve_error(service=service, env=env)
+
+ if self.version == 2:
+ env.service.set(service, config)
+ else:
+ env.set_config(service, config)
+
+ except Exception as e:
+ self.log.error("{}: exception setting config for {} with {}, e {}".
+ format(self, service, config, e))
+ self.log.exception(e)
+ raise e
+
+ @asyncio.coroutine
+ def apply_config(self, config, service=None, env=None, wait=True):
+ '''Apply a config on the service'''
+ pf = partial(self._apply_config,
+ config,
+ service=service,
+ env=env)
+ yield from self.loop.run_in_executor(
+ None,
+ pf,
+ )
+
+ if wait:
+ # Wait till config finished applying
+ self.log.debug("{}: Wait for config apply to finish".
+ format(self))
+ delay = 3 # secs
+ maint = True
+ while maint:
+ # Sleep first to give time for config_changed hook to be invoked
+ yield from asyncio.sleep(delay, loop=self.loop)
+ maint = yield from self.is_service_maint(service=service,
+ env=env)
+
+ err = yield from self.is_service_error(service=service, env=env)
+ if err:
+ self.log.error("{}: Service is in error state".
+ format(self))
+ return False
+
+ self.log.debug("{}: Finished applying config".format(self))
+ return True
+
+ def _set_parameter(self, parameter, value, service=None):
+ return self._apply_config({parameter : value}, service=service)
+
+ @asyncio.coroutine
+ def set_parameter(self, parameter, value, service=None):
+ '''Set a config parameter for a service'''
+ return self.apply_config({parameter : value}, service=service)
+
+ def _resolve_error(self, service=None, status=None, env=None):
+ if env is None:
+ env = self._get_env()
+
+ if status is None:
+ status = self._get_status(env=env)
+
+ if service is None:
+ service = self.service
+
+ if env is None:
+ env = self._get_env()
+ if self._is_service_deployed(service=service, status=status):
+ units = self.get_all_units(status, service=service)
+
+ for unit, ustatus in units[service].items():
+ if ustatus == 'error':
+ self.log.info("{}: Found unit {} with status {}".
+ format(self, unit, ustatus))
+ try:
+ # Takes the unit name as service_name/idx unlike action
+ env.resolved(unit)
+
+ except Exception as e:
+ msg = "{}: Resolve on unit {}: {}". \
+ format(self, unit, e)
+ self.log.warn(msg)
+
+ @asyncio.coroutine
+ def resolve_error(self, service=None, status=None, env=None):
+ '''Resolve units in error state'''
+ pf = partial(self._resolve_error,
+ service=service,
+ status=status,
+ env=env)
+ yield from self.loop.run_in_executor(
+ None,
+ pf,
+ )
+
+ def _deploy_service(self, charm, service,
+ path=None, config=None, env=None):
+ self.log.debug("{}: Deploy service for charm {}({}) with service {}".
+ format(self, charm, path, service))
+
+ if env is None:
+ env = self._get_env()
+
+ self.service = service
+ self.charm = charm
+
+ if self._is_service_deployed(service=service, env=env):
+ self.log.info("{}: Charm service {} already deployed".
+ format (self, service))
+ if config:
+ self._apply_config(config, service=service, env=env)
+ return
+
+ series = "trusty"
+
+ deploy_to = None
+ if self.version == 1:
+ deploy_to = "lxc:0"
+
+ if path is None:
+ prefix=os.getenv('RIFT_INSTALL', '/')
+ path = os.path.join(prefix, 'usr/rift/charms', series, charm)
+
+ try:
+ self.log.debug("{}: Local charm settings: dir={}, series={}".
+ format(self, path, series))
+ result = env.add_local_charm_dir(path, series)
+ url = result[self._get_version_tag('charm-url')]
+
+ except Exception as e:
+ msg = '{}: Error setting local charm directory {} for {}: {}'. \
+ format(self, path, service, e)
+ self.log.error(msg)
+ self.log.exception(e)
+ raise JujuAddCharmError(msg)
+
+ try:
+ self.log.debug("{}: Deploying using: service={}, url={}, to={}, config={}".
+ format(self, service, url, deploy_to, config))
+ env.deploy(service, url, config=config, machine_spec=deploy_to)
+
+ except Exception as e:
+ msg = '{}: Error deploying {}: {}'.format(self, service, e)
+ self.log.error(msg)
+ self.log.exception(e)
+ raise JujuDeployError(msg)
+
+ @asyncio.coroutine
+ def deploy_service(self, charm, service,
+ wait=False, timeout=300,
+ path=None, config=None):
+ '''Deploy a service using the charm name provided'''
+ env = yield from self.get_env()
+
+ pf = partial(self._deploy_service,
+ charm,
+ service,
+ path=path,
+ config=config,
+ env=env)
+ yield from self.loop.run_in_executor(
+ None,
+ pf,
+ )
+
+ rc = True
+ if wait is True:
+ # Wait for the deployed units to start
+ try:
+ self.log.debug("{}: Waiting for service {} to come up".
+ format(self, service))
+ rc = yield from self.wait_for_service(timeout=timeout, env=env)
+
+ except Exception as e:
+ msg = '{}: Error starting all units for {}: {}'. \
+ format(self, service, e)
+ self.log.error(msg)
+ self.log.exception(e)
+ raise JujuWaitUnitsError(msg)
+
+ return rc
+
+ @asyncio.coroutine
+ def wait_for_service(self, service=None, timeout=0, env=None):
+ '''Wait for the service to come up'''
+ if service is None:
+ service = self.service
+
+ if env is None:
+ env = yield from self.get_env()
+
+ status = yield from self.get_status(env=env)
+
+ if self._is_service_up(service=service, status=status, env=env):
+ self.log.debug("{}: Service {} is already up".
+ format(self, service))
+ return True
+
+ # Check if service is deployed
+ if not self._is_service_deployed(service=service, status=status, env=env):
+ raise JujuSrvNotDeployedError("{}: service {} is not deployed".
+ format(self, service))
+
+ if timeout < 0:
+ timeout = 0
+
+ count = 0
+ delay = self.retry_delay # seconds
+ self.log.debug("{}: In wait for service {}".format(self, service))
+
+ start_time = time.time()
+ max_time = time.time() + timeout
+ while timeout != 0 and (time.time() <= max_time):
+ count += 1
+ rc = yield from self.is_service_up(service=service, env=env)
+ if rc:
+ self.log.debug("{}: Service {} is up after {} seconds".
+ format(self, service, time.time()-start_time))
+ return True
+ yield from asyncio.sleep(delay, loop=self.loop)
+ return False
+
+ def _destroy_service(self, service=None):
+ '''Destroy a service on Juju controller'''
+ self.log.debug("{}: Destroy charm service: {}".format(self,service))
+
+ if service is None:
+ service = self.service
+
+ env = self._get_env()
+
+ status = self._get_status(env=env)
+
+ count = 0
+ while self._is_service_deployed(service=service, status=status, env=env):
+ count += 1
+ self.log.debug("{}: Destroy service {}, count {}".
+ format(self, service, count))
+
+ if count > self.destroy_retries:
+ msg = "{}: Not able to destroy service {} after {} tries". \
+ format(self, service, count)
+ self.log.error(msg)
+ raise JujuDestroyError(msg)
+
+
+ if self._is_service_error(service=service, status=status):
+ self._resolve_error(service, status)
+
+ try:
+ env.destroy_service(service)
+
+ except Exception as e:
+ msg = "{}: Exception when running destroy on service {}: {}". \
+ format(self, service, e)
+ self.log.error(msg)
+ self.log.exception(e)
+ raise JujuDestroyError(msg)
+
+ time.sleep(self.retry_delay)
+ status = self._get_status(env=env)
+
+ self.log.debug("{}: Destroyed service {} ({})".
+ format(self, service, count))
+
+ @asyncio.coroutine
+ def destroy_service(self, service=None):
+ '''Destroy a service on Juju controller'''
+ pf = partial(self._destroy_service,
+ service=service)
+ yield from self.loop.run_in_executor(
+ None,
+ pf,
+ )
+
+
+ def _get_action_status(self, action_tag, env=None):
+ if env is None:
+ env = self._get_env()
+
+ if not action_tag.startswith('action-'):
+ action_tag = 'action-{}'.format(action_tag)
+
+ try:
+ action = env.actions
+ except Exception as e:
+ msg = "{}: exception in Action API: {}".format(self, e)
+ self.log.error(msg)
+ self.log.exception(e)
+ raise JujuActionApiError(msg)
+
+ try:
+ status = action.info([{'Tag': action_tag}])
+
+ self.log.debug("{}: Action {} status {}".
+ format(self, action_tag, status))
+ return status['results'][0]
+
+ except Exception as e:
+ msg = "{}: exception in get action status {}".format(self, e)
+ self.log.error(msg)
+ self.log.exception(e)
+ raise JujuActionInfoError(msg)
+
+ @asyncio.coroutine
+ def get_action_status(self, action_tag, env=None):
+ '''
+ Get the status of an action queued on the controller
+
+ responds with the action status, which is one of three values:
+
+ - completed
+ - pending
+ - failed
+
+ @param action_tag - the action UUID return from the enqueue method
+ eg: action-3428e20d-fcd7-4911-803b-9b857a2e5ec9
+ '''
+ pf = partial(self._get_action_status,
+ action_tag,
+ env=env,)
+ status = yield from self.loop.run_in_executor(
+ None,
+ pf,
+ )
+ return status
+
+ def _execute_action(self, action_name, params, service=None, env=None):
+ '''Execute the action on all units of a service'''
+ if service is None:
+ service = self.service
+
+ if env is None:
+ env = self._get_env()
+
+ try:
+ action = env.actions
+ except Exception as e:
+ msg = "{}: exception in Action API: {}".format(self, e)
+ self.log.error(msg)
+ self.log.exception(e)
+ raise JujuActionApiError(msg)
+
+ units = self._get_service_units(service)
+ self.log.debug("{}: Apply action {} on units {}".
+ format(self, action_name, units))
+
+ # Rename units from <service>/<n> to unit-<service>-<n>
+ unit_tags = []
+ for unit in units:
+ idx = int(unit[unit.index('/')+1:])
+ unit_name = "unit-%s-%d" % (service, idx)
+ unit_tags.append(unit_name)
+ self.log.debug("{}: Unit tags for action: {}".
+ format(self, unit_tags))
+
+ try:
+ result = action.enqueue_units(unit_tags, action_name, params)
+ self.log.debug("{}: Response for action: {}".
+ format(self, result))
+ return result['results'][0]
+
+ except Exception as e:
+ msg = "{}: Exception enqueing action {} on units {} with " \
+ "params {}: {}".format(self, action, unit_tags, params, e)
+ self.log.error(msg)
+ self.log.exception(e)
+ raise JujuActionExecError(msg)
+
+ @asyncio.coroutine
+ def execute_action(self, action_name, params, service=None, env=None):
+ '''Execute an action for a service on the controller
+
+ Currently, we execute the action on all units of the service
+ '''
+ pf = partial(self._execute_action,
+ action_name,
+ params,
+ service=service,
+ env=env)
+ result = yield from self.loop.run_in_executor(
+ None,
+ pf,
+ )
+ return result
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(description='Test Juju')
+ parser.add_argument("-s", "--server", default='10.0.202.49', help="Juju controller")
+ parser.add_argument("-u", "--user", default='admin', help="User, default user-admin")
+ parser.add_argument("-p", "--password", default='nfvjuju', help="Password for the user")
+ parser.add_argument("-P", "--port", default=17070, help="Port number, default 17070")
+ parser.add_argument("-d", "--directory", help="Local directory for the charm")
+ parser.add_argument("--service", help="Charm service name")
+ parser.add_argument("--vnf-ip", help="IP of the VNF to configure")
+ args = parser.parse_args()
+
+ api = JujuApi(server=args.server,
+ port=args.port,
+ user=args.user,
+ secret=args.password)
+
+ env = api._get_env()
+ if env is None:
+ raise "Not able to login to the Juju controller"
+
+ print("Status: {}".format(api._get_status(env=env)))
+
+ if args.directory and args.service:
+ # Deploy the charm
+ charm = os.path.basename(args.directory)
+ api._deploy_service(charm, args.service,
+ path=args.directory,
+ env=env)
+
+ while not api._is_service_up():
+ time.sleep(5)
+
+ print ("Service {} is deployed with status {}".
+ format(args.service, api._get_service_status()))
+
+ if args.vnf_ip and \
+ ('clearwater-aio' in args.directory):
+ # Execute config on charm
+ api._apply_config({'proxied_ip': args.vnf_ip})
+
+ while not api._is_service_active():
+ time.sleep(10)
+
+ print ("Service {} is in status {}".
+ format(args.service, api._get_service_status()))
+
+ res = api._execute_action('create-update-user', {'number': '125252352525',
+ 'password': 'asfsaf'})
+
+ print ("Action 'creat-update-user response: {}".format(res))
+
+ status = res['status']
+ while status not in [ 'completed', 'failed' ]:
+ time.sleep(2)
+ status = api._get_action_status(res['action']['tag'])['status']
+
+ print("Action status: {}".format(status))
+
+ # This action will fail as the number is non-numeric
+ res = api._execute_action('delete-user', {'number': '125252352525asf'})
+
+ print ("Action 'delete-user response: {}".format(res))
+
+ status = res['status']
+ while status not in [ 'completed', 'failed' ]:
+ time.sleep(2)
+ status = api._get_action_status(res['action']['tag'])['status']
+
+ print("Action status: {}".format(status))