*.pyc
+.cache
+deb_dist/
+dist/
+*.gz
+*egg-info/
+.eggs
+*venv/
+*venv3/
--- /dev/null
+node {
+ stage("Checkout") {
+ checkout scm
+ }
+ stage("Test") {
+ sh 'make test'
+ }
+ stage("Build") {
+ sh 'make package'
+ stash name: "deb-files", includes: "deb_dist/*.deb"
+ }
+ stage("Repo Component") {
+ releaseDir = "ReleaseTWO"
+ unstash "deb-files"
+ sh '''
+ mkdir -p pool/osmclient
+ mv deb_dist/*.deb pool/osmclient/
+ mkdir -p dists/${releaseDir}/unstable/osmclient/binary-amd64/
+ apt-ftparchive packages pool/osmclient > dists/${releaseDir}/unstable/osmclient/binary-amd64/Packages
+ gzip -9fk dists/${releaseDir}/unstable/osmclient/binary-amd64/Packages
+ '''
+ archiveArtifacts artifacts: "dists/**,pool/osmclient/*.deb"
+ }
+}
-package:
- @python setup.py --command-packages=stdeb.command bdist_deb > /dev/null 2>&1
+# Copyright 2017 Sandvine
+#
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+all: build_tools
+ $(MAKE) test
+ $(MAKE) package
+
+BUILD_TOOLS=python python3 virtualenv \
+ libcurl4-gnutls-dev python-pip \
+ python3-pip libgnutls-dev debhelper
+
+VENV=osmclient-venv
+VENV3=osmclient-venv3
+VENV_BIN=$(VENV)/bin/python
+VENV3_BIN=$(VENV3)/bin/python
+
+venv: $(VENV)/bin/activate
+venv3: $(VENV3)/bin/activate
+
+$(VENV)/bin/activate: test_requirements.txt
+ test -d $(VENV) || virtualenv $(VENV)
+ $(VENV)/bin/pip install -Ur test_requirements.txt
+ touch $(VENV)/bin/activate
+
+$(VENV3)/bin/activate: test_requirements.txt
+ test -d $(VENV3) || virtualenv -p python3 $(VENV3)
+ $(VENV3)/bin/pip3 install -Ur test_requirements.txt
+ touch $(VENV3)/bin/activate
+
+build_tools:
+ sudo apt-get -y install $(BUILD_TOOLS)
+
+package: build_tools
+ $(VENV_BIN) setup.py --command-packages=stdeb.command bdist_deb
+
+test_flake8: venv
+ $(VENV_BIN) setup.py flake8
+
+test_nose: venv
+ $(VENV_BIN) setup.py test
+
+test_nose3: venv3
+ $(VENV3_BIN) setup.py test
+
+test: test_flake8 test_nose test_nose3
+
+.PHONY: package build_tools test test_flake8 test_nose test_nose3
+
+clean:
+ rm -rf $(VENV) $(VENV3) deb_dist dist osmclient.egg-info
from osmclient.v1 import client
-def Client(version=1, host = None, *args, **kwargs):
+def Client(version=1, host=None, *args, **kwargs):
if version == 1:
return client.Client(host, *args, **kwargs)
else:
class ClientException(Exception):
pass
+
class NotFound(ClientException):
pass
class Http(object):
- def __init__(self,url,user='admin',password='admin'):
+ def __init__(self, url, user='admin', password='admin'):
self._url = url
self._user = user
self._password = password
self._http_header = None
- def set_http_header(self,header):
+ def set_http_header(self, header):
self._http_header = header
- def _get_curl_cmd(self,endpoint):
+ def _get_curl_cmd(self, endpoint):
curl_cmd = pycurl.Curl()
- curl_cmd.setopt(pycurl.URL, self._url + endpoint )
- curl_cmd.setopt(pycurl.SSL_VERIFYPEER,0)
- curl_cmd.setopt(pycurl.SSL_VERIFYHOST,0)
- curl_cmd.setopt(pycurl.USERPWD, '{}:{}'.format(self._user,self._password))
+ curl_cmd.setopt(pycurl.URL, self._url + endpoint)
+ curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
+ curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
+ curl_cmd.setopt(
+ pycurl.USERPWD,
+ '{}:{}'.format(
+ self._user,
+ self._password))
if self._http_header:
curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
return curl_cmd
- def get_cmd( self, endpoint ):
+ def get_cmd(self, endpoint):
data = BytesIO()
- curl_cmd=self._get_curl_cmd(endpoint)
- curl_cmd.setopt(pycurl.HTTPGET,1)
+ curl_cmd = self._get_curl_cmd(endpoint)
+ curl_cmd.setopt(pycurl.HTTPGET, 1)
curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
- curl_cmd.perform()
+ curl_cmd.perform()
curl_cmd.close()
if data.getvalue():
return json.loads(data.getvalue().decode())
return None
- def delete_cmd( self, endpoint ):
+ def delete_cmd(self, endpoint):
data = BytesIO()
- curl_cmd=self._get_curl_cmd(endpoint)
+ curl_cmd = self._get_curl_cmd(endpoint)
curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
- curl_cmd.perform()
+ curl_cmd.perform()
curl_cmd.close()
if data.getvalue():
return json.loads(data.getvalue().decode())
return None
- def post_cmd( self, endpoint='', postfields_dict=None, formfile=None, ):
+ def post_cmd(self, endpoint='', postfields_dict=None, formfile=None, ):
data = BytesIO()
- curl_cmd=self._get_curl_cmd(endpoint)
- curl_cmd.setopt(pycurl.POST,1)
+ curl_cmd = self._get_curl_cmd(endpoint)
+ curl_cmd.setopt(pycurl.POST, 1)
curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
if postfields_dict is not None:
- jsondata=json.dumps(postfields_dict)
- curl_cmd.setopt(pycurl.POSTFIELDS,jsondata)
+ jsondata = json.dumps(postfields_dict)
+ curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)
if formfile is not None:
- curl_cmd.setopt(pycurl.HTTPPOST,[((formfile[0],(pycurl.FORM_FILE,formfile[1])))])
+ curl_cmd.setopt(
+ pycurl.HTTPPOST,
+ [((formfile[0],
+ (pycurl.FORM_FILE,
+ formfile[1])))])
- curl_cmd.perform()
+ curl_cmd.perform()
curl_cmd.close()
if data.getvalue():
return json.loads(data.getvalue().decode())
import unittest
from osmclient.common import utils
+
class TestUtil(unittest.TestCase):
- def test_wait_for_method_basic(self):
- def foobar():
- return True
- assert utils.wait_for_value(lambda: foobar())
-
- def test_wait_for_method_timeout(self):
- def foobar():
- return False
- assert not utils.wait_for_value(lambda: foobar(),wait_time=0)
-
- def test_wait_for_method_paramter(self):
- def foobar(input):
- return input
- assert not utils.wait_for_value(lambda: foobar(False),wait_time=0)
- assert utils.wait_for_value(lambda: foobar(True),wait_time=0)
-
- def test_wait_for_method_wait_for_change(self):
- def foobar():
- if foobar.counter == 0:
- return True
- foobar.counter -=1
- return False
- foobar.counter=1
- assert utils.wait_for_value(lambda: foobar(),wait_time=1)
-
- def test_wait_for_method_exception(self):
- def foobar():
- raise Exception('send exception')
- assert not utils.wait_for_value(lambda: foobar(),wait_time=0,catch_exception=Exception)
-
- def test_wait_for_method_first_exception(self):
- def foobar():
- if foobar.counter == 0:
- return True
- foobar.counter -=1
- raise Exception('send exception')
- foobar.counter=1
- assert utils.wait_for_value(lambda: foobar(),wait_time=1,catch_exception=Exception)
+ def test_wait_for_method_basic(self):
+ def foobar():
+ return True
+ assert utils.wait_for_value(lambda: foobar())
+
+ def test_wait_for_method_timeout(self):
+ def foobar():
+ return False
+ assert not utils.wait_for_value(lambda: foobar(), wait_time=0)
+
+ def test_wait_for_method_paramter(self):
+ def foobar(input):
+ return input
+ assert not utils.wait_for_value(lambda: foobar(False), wait_time=0)
+ assert utils.wait_for_value(lambda: foobar(True), wait_time=0)
+
+ def test_wait_for_method_wait_for_change(self):
+ def foobar():
+ if foobar.counter == 0:
+ return True
+ foobar.counter -= 1
+ return False
+ foobar.counter = 1
+ assert utils.wait_for_value(lambda: foobar(), wait_time=1)
+
+ def test_wait_for_method_exception(self):
+ def foobar():
+ raise Exception('send exception')
+ assert not utils.wait_for_value(
+ lambda: foobar(),
+ wait_time=0,
+ catch_exception=Exception)
+
+ def test_wait_for_method_first_exception(self):
+ def foobar():
+ if foobar.counter == 0:
+ return True
+ foobar.counter -= 1
+ raise Exception('send exception')
+ foobar.counter = 1
+ assert utils.wait_for_value(
+ lambda: foobar(),
+ wait_time=1,
+ catch_exception=Exception)
import time
-def wait_for_value(func, result=True, wait_time=10, catch_exception = None ):
+def wait_for_value(func, result=True, wait_time=10, catch_exception=None):
maxtime = time.time() + wait_time
while time.time() < maxtime:
try:
if func() == result:
- return True
- except catch_exception as inst:
+ return True
+ except catch_exception:
pass
time.sleep(1)
try:
return func() == result
- except catch_exception as inst:
+ except catch_exception:
return False
OSM shell/cli
"""
-import click
+import click
from osmclient.client import client
from osmclient.common.exceptions import ClientException
from prettytable import PrettyTable
import json
+import time
@click.group()
-@click.option('--hostname',default=None,envvar='OSM_HOSTNAME',help='hostname of server. Also can set OSM_HOSTNAME in environment')
+@click.option('--hostname',
+ default=None,
+ envvar='OSM_HOSTNAME',
+ help='hostname of server. ' +
+ 'Also can set OSM_HOSTNAME in environment')
+@click.option('--so-port',
+ default=8008,
+ envvar='OSM_SO_PORT',
+ help='hostname of server. ' +
+ 'Also can set OSM_SO_PORT in environment')
+@click.option('--ro-hostname',
+ default=None,
+ envvar='OSM_RO_HOSTNAME',
+ help='hostname of RO server. ' +
+ 'Also can set OSM_RO_HOSTNAME in environment')
+@click.option('--ro-port',
+ default=9090,
+ envvar='OSM_RO_PORT',
+ help='hostname of RO server. ' +
+ 'Also can set OSM_RO_PORT in environment')
@click.pass_context
-def cli(ctx,hostname):
+def cli(ctx, hostname, so_port, ro_hostname, ro_port):
if hostname is None:
- print("either hostname option or OSM_HOSTNAME environment variable needs to be specified")
+ print(
+ "either hostname option or OSM_HOSTNAME " +
+ "environment variable needs to be specified")
exit(1)
- ctx.obj=client.Client(host=hostname)
+ ctx.obj = client.Client(
+ host=hostname,
+ so_port=so_port,
+ ro_host=ro_hostname,
+ ro_port=ro_port)
+
@cli.command(name='ns-list')
@click.pass_context
def ns_list(ctx):
- resp=ctx.obj.ns.list()
- table=PrettyTable(['ns instance name','id','operational status','config status'])
+ resp = ctx.obj.ns.list()
+ table = PrettyTable(
+ ['ns instance name',
+ 'id',
+ 'operational status',
+ 'config status'])
for ns in resp:
- nsopdata=ctx.obj.ns.get_opdata(ns['id'])
- nsr=nsopdata['nsr:nsr']
- table.add_row([nsr['name-ref'],nsr['ns-instance-config-ref'],nsr['operational-status'],nsr['config-status']])
- table.align='l'
+ nsopdata = ctx.obj.ns.get_opdata(ns['id'])
+ nsr = nsopdata['nsr:nsr']
+ table.add_row(
+ [nsr['name-ref'],
+ nsr['ns-instance-config-ref'],
+ nsr['operational-status'],
+ nsr['config-status']])
+ table.align = 'l'
print(table)
+
@cli.command(name='nsd-list')
@click.pass_context
def nsd_list(ctx):
- resp=ctx.obj.nsd.list()
- table=PrettyTable(['nsd name','id'])
+ resp = ctx.obj.nsd.list()
+ table = PrettyTable(['nsd name', 'id'])
for ns in resp:
- table.add_row([ns['name'],ns['id']])
- table.align='l'
+ table.add_row([ns['name'], ns['id']])
+ table.align = 'l'
print(table)
+
@cli.command(name='vnfd-list')
@click.pass_context
def vnfd_list(ctx):
resp = ctx.obj.vnfd.list()
- table=PrettyTable(['vnfd name','id'])
+ table = PrettyTable(['vnfd name', 'id'])
for vnfd in resp:
- table.add_row([vnfd['name'],vnfd['id']])
- table.align='l'
+ table.add_row([vnfd['name'], vnfd['id']])
+ table.align = 'l'
print(table)
+
@cli.command(name='vnf-list')
@click.pass_context
def vnf_list(ctx):
- resp=ctx.obj.vnf.list()
- table=PrettyTable(['vnf name','id','operational status','config status'])
+ resp = ctx.obj.vnf.list()
+ table = PrettyTable(
+ ['vnf name',
+ 'id',
+ 'operational status',
+ 'config status'])
for vnfr in resp:
- if not 'mgmt-interface' in vnfr:
+ if 'mgmt-interface' not in vnfr:
vnfr['mgmt-interface'] = {}
vnfr['mgmt-interface']['ip-address'] = None
- table.add_row([vnfr['name'],vnfr['id'],vnfr['operational-status'],vnfr['config-status']])
- table.align='l'
+ table.add_row(
+ [vnfr['name'],
+ vnfr['id'],
+ vnfr['operational-status'],
+ vnfr['config-status']])
+ table.align = 'l'
print(table)
+
@cli.command(name='vnf-show')
@click.argument('vnf_name')
-@click.option('--filter',default=None)
+@click.option('--filter', default=None)
@click.pass_context
-def vnf_show(ctx,vnf_name,filter):
+def vnf_show(ctx, vnf_name, filter):
try:
- resp=ctx.obj.vnf.get(vnf_name)
+ resp = ctx.obj.vnf.get(vnf_name)
except ClientException as inst:
print(inst.message)
exit(1)
-
- table=PrettyTable(['field','value'])
- for k,v in resp.items():
+
+ table = PrettyTable(['field', 'value'])
+ for k, v in resp.items():
if filter is None or filter in k:
- table.add_row([k,json.dumps(v,indent=2)])
- table.align='l'
+ table.add_row([k, json.dumps(v, indent=2)])
+ table.align = 'l'
print(table)
+
@cli.command(name='vnf-monitoring-show')
@click.argument('vnf_name')
@click.pass_context
-def vnf_monitoring_show(ctx,vnf_name):
- try:
- resp=ctx.obj.vnf.get_monitoring(vnf_name)
+def vnf_monitoring_show(ctx, vnf_name):
+ try:
+ resp = ctx.obj.vnf.get_monitoring(vnf_name)
except ClientException as inst:
print(inst.message)
exit(1)
- table=PrettyTable(['vnf name','monitoring name','value','units'])
+ table = PrettyTable(['vnf name', 'monitoring name', 'value', 'units'])
if resp is not None:
for monitor in resp:
- table.add_row([vnf_name,monitor['name'],monitor['value-integer'],monitor['units']])
- table.align='l'
+ table.add_row(
+ [vnf_name,
+ monitor['name'],
+ monitor['value-integer'],
+ monitor['units']])
+ table.align = 'l'
print(table)
+
@cli.command(name='ns-monitoring-show')
@click.argument('ns_name')
@click.pass_context
-def ns_monitoring_show(ctx,ns_name):
+def ns_monitoring_show(ctx, ns_name):
try:
- resp=ctx.obj.ns.get_monitoring(ns_name)
+ resp = ctx.obj.ns.get_monitoring(ns_name)
except ClientException as inst:
print(inst.message)
exit(1)
- table=PrettyTable(['vnf name','monitoring name','value','units'])
- for key,val in resp.items():
+ table = PrettyTable(['vnf name', 'monitoring name', 'value', 'units'])
+ for key, val in resp.items():
for monitor in val:
- table.add_row([key,monitor['name'],monitor['value-integer'],monitor['units']])
- table.align='l'
+ table.add_row(
+ [key,
+ monitor['name'],
+ monitor['value-integer'],
+ monitor['units']])
+ table.align = 'l'
print(table)
+
@cli.command(name='ns-create')
-@click.option('--ns_name',prompt=True)
-@click.option('--nsd_name',prompt=True)
-@click.option('--vim_account',prompt=True)
-@click.option('--admin_status',default='ENABLED',help='administration status')
-@click.option('--ssh_keys',default=None,help='comma separated list of keys to inject to vnfs')
-@click.option('--vim_network_prefix',default=None,help='vim network name prefix')
+@click.option('--ns_name',
+ prompt=True)
+@click.option('--nsd_name',
+ prompt=True)
+@click.option('--vim_account',
+ prompt=True)
+@click.option('--admin_status',
+ default='ENABLED',
+ help='administration status')
+@click.option('--ssh_keys',
+ default=None,
+ help='comma separated list of keys to inject to vnfs')
+@click.option('--vim_network_prefix',
+ default=None,
+ help='vim network name prefix')
@click.pass_context
-def ns_create(ctx,nsd_name,ns_name,vim_account,admin_status,ssh_keys,vim_network_prefix):
+def ns_create(ctx,
+ nsd_name,
+ ns_name,
+ vim_account,
+ admin_status,
+ ssh_keys,
+ vim_network_prefix):
try:
- ctx.obj.ns.create(nsd_name,ns_name,vim_network_prefix=vim_network_prefix,ssh_keys=ssh_keys,account=vim_account)
+ ctx.obj.ns.create(
+ nsd_name,
+ ns_name,
+ vim_network_prefix=vim_network_prefix,
+ ssh_keys=ssh_keys,
+ account=vim_account)
except ClientException as inst:
print(inst.message)
- exit(1)
+ exit(1)
+
@cli.command(name='ns-delete')
@click.argument('ns_name')
@click.pass_context
-def ns_delete(ctx,ns_name):
+def ns_delete(ctx, ns_name):
try:
ctx.obj.ns.delete(ns_name)
except ClientException as inst:
print(inst.message)
exit(1)
+
@cli.command(name='upload-package')
@click.argument('filename')
@click.pass_context
-def upload_package(ctx,filename):
+def upload_package(ctx, filename):
try:
ctx.obj.package.upload(filename)
ctx.obj.package.wait_for_upload(filename)
print(inst.message)
exit(1)
+
@cli.command(name='ns-show')
@click.argument('ns_name')
-@click.option('--filter',default=None)
+@click.option('--filter', default=None)
@click.pass_context
-def ns_show(ctx,ns_name,filter):
+def ns_show(ctx, ns_name, filter):
try:
ns = ctx.obj.ns.get(ns_name)
except ClientException as inst:
print(inst.message)
exit(1)
- table=PrettyTable(['field','value'])
+ table = PrettyTable(['field', 'value'])
- for k,v in ns.items():
+ for k, v in ns.items():
if filter is None or filter in k:
- table.add_row([k,json.dumps(v,indent=2)])
+ table.add_row([k, json.dumps(v, indent=2)])
- nsopdata=ctx.obj.ns.get_opdata(ns['id'])
- nsr_optdata=nsopdata['nsr:nsr']
- for k,v in nsr_optdata.items():
+ nsopdata = ctx.obj.ns.get_opdata(ns['id'])
+ nsr_optdata = nsopdata['nsr:nsr']
+ for k, v in nsr_optdata.items():
if filter is None or filter in k:
- table.add_row([k,json.dumps(v,indent=2)])
- table.align='l'
+ table.add_row([k, json.dumps(v, indent=2)])
+ table.align = 'l'
print(table)
+
@cli.command(name='ns-scaling-show')
@click.argument('ns_name')
@click.pass_context
-def show_ns_scaling(ctx,ns_name):
+def show_ns_scaling(ctx, ns_name):
resp = ctx.obj.ns.list()
- table=PrettyTable(['instance-id','operational status','create-time','vnfr ids'])
-
- if 'nsr' in resp:
- for ns in resp['nsr']:
- if ns_name == ns['name']:
- nsopdata=ctx.obj.ns.get_opdata(ns['id'])
- scaling_records=nsopdata['nsr:nsr']['scaling-group-record']
- for record in scaling_records:
- if 'instance' in record:
- instances=record['instance']
- for inst in instances:
- table.add_row([inst['instance-id'],inst['op-status'],time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(inst['create-time'])),inst['vnfrs']])
- table.align='l'
+ table = PrettyTable(
+ ['group-name',
+ 'instance-id',
+ 'operational status',
+ 'create-time',
+ 'vnfr ids'])
+
+ for ns in resp:
+ if ns_name == ns['name']:
+ nsopdata = ctx.obj.ns.get_opdata(ns['id'])
+ scaling_records = nsopdata['nsr:nsr']['scaling-group-record']
+ for record in scaling_records:
+ if 'instance' in record:
+ instances = record['instance']
+ for inst in instances:
+ table.add_row(
+ [record['scaling-group-name-ref'],
+ inst['instance-id'],
+ inst['op-status'],
+ time.strftime('%Y-%m-%d %H:%M:%S',
+ time.localtime(
+ inst['create-time'])),
+ inst['vnfrs']])
+ table.align = 'l'
print(table)
+
+@cli.command(name='ns-scale')
+@click.argument('ns_name')
+@click.option('--ns_scale_group', prompt=True)
+@click.option('--index', prompt=True)
+@click.pass_context
+def ns_scale(ctx, ns_name, ns_scale_group, index):
+ ctx.obj.ns.scale(ns_name, ns_scale_group, index)
+
+
@cli.command(name='nsd-delete')
@click.argument('nsd_name')
@click.pass_context
-def nsd_delete(ctx,nsd_name):
+def nsd_delete(ctx, nsd_name):
try:
ctx.obj.nsd.delete(nsd_name)
except ClientException as inst:
print(inst.message)
exit(1)
+
@cli.command(name='vnfd-delete')
@click.argument('vnfd_name')
@click.pass_context
-def vnfd_delete(ctx,vnfd_name):
+def vnfd_delete(ctx, vnfd_name):
try:
ctx.obj.vnfd.delete(vnfd_name)
except ClientException as inst:
print(inst.message)
exit(1)
+
@cli.command(name='config-agent-list')
@click.pass_context
def config_agent_list(ctx):
- table=PrettyTable(['name','account-type','details'])
+ table = PrettyTable(['name', 'account-type', 'details'])
for account in ctx.obj.vca.list():
- table.add_row([account['name'],account['account-type'],account['juju']])
- table.align='l'
+ table.add_row(
+ [account['name'],
+ account['account-type'],
+ account['juju']])
+ table.align = 'l'
print(table)
+
@cli.command(name='config-agent-delete')
@click.argument('name')
@click.pass_context
-def config_agent_delete(ctx,name):
+def config_agent_delete(ctx, name):
try:
ctx.obj.vca.delete(name)
except ClientException as inst:
print(inst.message)
exit(1)
+
@cli.command(name='config-agent-add')
-@click.option('--name',prompt=True)
-@click.option('--account_type',prompt=True)
-@click.option('--server',prompt=True)
-@click.option('--user',prompt=True)
-@click.option('--secret',prompt=True,hide_input=True,confirmation_prompt=True)
+@click.option('--name',
+ prompt=True)
+@click.option('--account_type',
+ prompt=True)
+@click.option('--server',
+ prompt=True)
+@click.option('--user',
+ prompt=True)
+@click.option('--secret',
+ prompt=True,
+ hide_input=True,
+ confirmation_prompt=True)
@click.pass_context
-def config_agent_add(ctx,name,account_type,server,user,secret):
+def config_agent_add(ctx, name, account_type, server, user, secret):
try:
- ctx.obj.vca.create(name,account_type,server,user,secret)
+ ctx.obj.vca.create(name, account_type, server, user, secret)
except ClientException as inst:
print(inst.message)
exit(1)
+
@cli.command(name='vim-create')
-@click.option('--name',prompt=True)
-@click.option('--user',prompt=True)
-@click.option('--password',prompt=True,hide_input=True,confirmation_prompt=True)
-@click.option('--auth_url',prompt=True)
-@click.option('--tenant',prompt=True)
-@click.option('--floating_ip_pool',default=None)
-@click.option('--keypair',default=None)
-@click.option('--account_type',default='openstack')
-@click.option('--description',default='no description')
+@click.option('--name',
+ prompt=True)
+@click.option('--user',
+ prompt=True)
+@click.option('--password',
+ prompt=True,
+ hide_input=True,
+ confirmation_prompt=True)
+@click.option('--auth_url',
+ prompt=True)
+@click.option('--tenant',
+ prompt=True)
+@click.option('--floating_ip_pool',
+ default=None)
+@click.option('--keypair',
+ default=None)
+@click.option('--account_type',
+ default='openstack')
+@click.option('--description',
+ default='no description')
@click.pass_context
-def vim_create(ctx,name,user,password,auth_url,tenant,floating_ip_pool,keypair,account_type,description):
- vim={}
- vim['os-username']=user
- vim['os-password']=password
- vim['os-url']=auth_url
- vim['os-project-name']=tenant
- vim['floating_ip_pool']=floating_ip_pool
- vim['keypair']=keypair
- vim['vim-type']='openstack'
- vim['description']=description
+def vim_create(ctx,
+ name,
+ user,
+ password,
+ auth_url,
+ tenant,
+ floating_ip_pool,
+ keypair,
+ account_type,
+ description):
+ vim = {}
+ vim['os-username'] = user
+ vim['os-password'] = password
+ vim['os-url'] = auth_url
+ vim['os-project-name'] = tenant
+ vim['floating_ip_pool'] = floating_ip_pool
+ vim['keypair'] = keypair
+ vim['vim-type'] = 'openstack'
+ vim['description'] = description
try:
- ctx.obj.vim.create(name,vim)
+ ctx.obj.vim.create(name, vim)
except ClientException as inst:
print(inst.message)
exit(1)
+
@cli.command(name='vim-delete')
@click.argument('name')
@click.pass_context
-def vim_delete(ctx,name):
+def vim_delete(ctx, name):
try:
ctx.obj.vim.delete(name)
except ClientException as inst:
print(inst.message)
exit(1)
+
@cli.command(name='vim-list')
@click.pass_context
def vim_list(ctx):
- resp=ctx.obj.vim.list()
- table=PrettyTable(['vim name','uuid'])
+ resp = ctx.obj.vim.list()
+ table = PrettyTable(['vim name', 'uuid'])
for vim in resp:
- table.add_row([vim['name'],vim['uuid']])
- table.align='l'
+ table.add_row([vim['name'], vim['uuid']])
+ table.align = 'l'
print(table)
+
@cli.command(name='vim-show')
@click.argument('name')
@click.pass_context
-def vim_show(ctx,name):
+def vim_show(ctx, name):
try:
- resp=ctx.obj.vim.get(name)
+ resp = ctx.obj.vim.get(name)
except ClientException as inst:
print(inst.message)
exit(1)
-
- table=PrettyTable(['key','attribute'])
- for k,v in resp.items():
- table.add_row([k,json.dumps(v,indent=2)])
- table.align='l'
+
+ table = PrettyTable(['key', 'attribute'])
+ for k, v in resp.items():
+ table.add_row([k, json.dumps(v, indent=2)])
+ table.align = 'l'
print(table)
+
@cli.command(name='ro-dump')
@click.pass_context
def ro_dump(ctx):
- resp=ctx.obj.vim.get_resource_orchestrator()
- table=PrettyTable(['key','attribute'])
- for k,v in resp.items():
- table.add_row([k,json.dumps(v,indent=2)])
- table.align='l'
+ resp = ctx.obj.vim.get_resource_orchestrator()
+ table = PrettyTable(['key', 'attribute'])
+ for k, v in resp.items():
+ table.add_row([k, json.dumps(v, indent=2)])
+ table.align = 'l'
print(table)
+
@cli.command(name='vcs-list')
@click.pass_context
def vcs_list(ctx):
- resp=ctx.obj.utils.get_vcs_info()
- table=PrettyTable(['component name','state'])
+ resp = ctx.obj.utils.get_vcs_info()
+ table = PrettyTable(['component name', 'state'])
for component in resp:
- table.add_row([component['component_name'],component['state']])
- table.align='l'
+ table.add_row([component['component_name'], component['state']])
+ table.align = 'l'
print(table)
+
if __name__ == '__main__':
cli()
from osmclient.v1 import vca
from osmclient.v1 import utils
from osmclient.common import http
-import pycurl
class Client(object):
- def __init__(self, host=None,ro_port=9090,upload_port=8443,**kwargs):
- self._user='admin'
- self._password='admin'
- self._host = host.split(':')[0]
- self._so_port = host.split(':')[1]
+ def __init__(
+ self,
+ host=None,
+ so_port=8008,
+ ro_host=None,
+ ro_port=9090,
+ upload_port=8443,
+ **kwargs):
+ self._user = 'admin'
+ self._password = 'admin'
- http_client = http.Http('https://{}:{}/'.format(self._host,self._so_port))
- http_client.set_http_header( ['Accept: application/vnd.yand.data+json','Content-Type: application/json'])
+ if len(host.split(':')) > 1:
+ # backwards compatible, port provided as part of host
+ self._host = host.split(':')[0]
+ self._so_port = host.split(':')[1]
+ else:
+ self._host = host
+ self._so_port = so_port
- ro_http_client = http.Http('http://{}:{}/'.format(self._host,ro_port))
- ro_http_client.set_http_header( ['Accept: application/vnd.yand.data+json','Content-Type: application/json'])
+ http_client = http.Http(
+ 'https://{}:{}/'.format(
+ self._host,
+ self._so_port))
+ http_client.set_http_header(
+ ['Accept: application/vnd.yand.data+json',
+ 'Content-Type: application/json'])
- upload_client = http.Http('https://{}:{}/composer/upload?api_server=https://localhost&upload_server=https://{}'.format(self._host,upload_port,self._host))
+ if ro_host is None:
+ ro_host = host
+ ro_http_client = http.Http('http://{}:{}/'.format(ro_host, ro_port))
+ ro_http_client.set_http_header(
+ ['Accept: application/vnd.yand.data+json',
+ 'Content-Type: application/json'])
- self.vnf = vnf.Vnf(http_client,**kwargs)
- self.vnfd = vnfd.Vnfd(http_client,**kwargs)
- self.ns = ns.Ns(http=http_client,client=self,**kwargs)
- self.nsd = nsd.Nsd(http_client,**kwargs)
- self.vim = vim.Vim(http=http_client,ro_http=ro_http_client,client=self,**kwargs)
- self.package = package.Package(http=http_client,upload_http=upload_client,client=self,**kwargs)
- self.vca = vca.Vca(http_client,**kwargs)
- self.utils = utils.Utils(http_client,**kwargs)
+ upload_client = http.Http(
+ 'https://{}:{}/composer/upload?api_server={}{}'.format(
+ self._host,
+ upload_port,
+ 'https://localhost&upload_server=https://',
+ self._host))
+
+ self.vnf = vnf.Vnf(http_client, **kwargs)
+ self.vnfd = vnfd.Vnfd(http_client, **kwargs)
+ self.ns = ns.Ns(http=http_client, client=self, **kwargs)
+ self.nsd = nsd.Nsd(http_client, **kwargs)
+ self.vim = vim.Vim(
+ http=http_client,
+ ro_http=ro_http_client,
+ client=self,
+ **kwargs)
+ self.package = package.Package(
+ http=http_client,
+ upload_http=upload_client,
+ client=self,
+ **kwargs)
+ self.vca = vca.Vca(http_client, **kwargs)
+ self.utils = utils.Utils(http_client, **kwargs)
class Key(object):
- def __init__(self,client=None):
- self._client=client
+ def __init__(self, client=None):
+ self._client = client
def list(self):
data = BytesIO()
- curl_cmd=self._client.get_curl_cmd('v1/api/config/key-pair?deep')
- curl_cmd.setopt(pycurl.HTTPGET,1)
+ curl_cmd = self._client.get_curl_cmd('v1/api/config/key-pair?deep')
+ curl_cmd.setopt(pycurl.HTTPGET, 1)
curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
curl_cmd.perform()
curl_cmd.close()
from osmclient.common.exceptions import ClientException
from osmclient.common.exceptions import NotFound
import uuid
-import pprint
class Ns(object):
- def __init__(self,http=None,client=None):
- self._http=http
- self._client=client
+
+ def __init__(self, http=None, client=None):
+ self._http = http
+ self._client = client
def list(self):
"""Returns a list of ns's
return resp['nsr:ns-instance-config']['nsr']
- def get(self,name):
+ def get(self, name):
"""Returns an ns based on name
"""
for ns in self.list():
- if name == ns['name']:
- return ns
+ if name == ns['name']:
+ return ns
raise NotFound("ns {} not found".format(name))
- def create(self,nsd_name,nsr_name,account,vim_network_prefix=None,ssh_keys=None,description='default description',admin_status='ENABLED'):
- postdata={}
+ def scale(self, ns_name, ns_scale_group, instance_index):
+ postdata = {}
+ postdata['instance'] = list()
+ instance = {}
+ instance['id'] = instance_index
+ postdata['instance'].append(instance)
+
+ ns = self.get(ns_name)
+ resp = self._http.post_cmd(
+ 'v1/api/config/ns-instance-config/nsr/{}/scaling-group/{}/instance'
+ .format(ns['id'], ns_scale_group), postdata)
+ if 'success' not in resp:
+ raise ClientException(
+ "failed to scale ns: {} result: {}".format(
+ ns_name,
+ resp))
+
+ def create(self, nsd_name, nsr_name, account, vim_network_prefix=None,
+ ssh_keys=None, description='default description',
+ admin_status='ENABLED'):
+ postdata = {}
postdata['nsr'] = list()
- nsr={}
- nsr['id']=str(uuid.uuid1())
+ nsr = {}
+ nsr['id'] = str(uuid.uuid1())
+
+ nsd = self._client.nsd.get(nsd_name)
- nsd=self._client.nsd.get(nsd_name)
-
- datacenter=self._client.vim.get_datacenter(account)
+ datacenter = self._client.vim.get_datacenter(account)
if datacenter is None:
- raise NotFound("cannot find datacenter account {}".format(account))
+ raise NotFound("cannot find datacenter account {}".format(account))
- nsr['nsd']=nsd
- nsr['name']=nsr_name
- nsr['short-name']=nsr_name
- nsr['description']=description
- nsr['admin-status']=admin_status
- nsr['om-datacenter']=datacenter['uuid']
+ nsr['nsd'] = nsd
+ nsr['name'] = nsr_name
+ nsr['short-name'] = nsr_name
+ nsr['description'] = description
+ nsr['admin-status'] = admin_status
+ nsr['om-datacenter'] = datacenter['uuid']
if ssh_keys is not None:
# ssh_keys is comma separate list
- ssh_keys_format=[]
+ ssh_keys_format = []
for key in ssh_keys.split(','):
- ssh_keys_format.append({'key-pair-ref':key})
+ ssh_keys_format.append({'key-pair-ref': key})
- nsr['ssh-authorized-key']=ssh_keys_format
+ nsr['ssh-authorized-key'] = ssh_keys_format
if vim_network_prefix is not None:
- for index,vld in enumerate(nsr['nsd']['vld']):
+ for index, vld in enumerate(nsr['nsd']['vld']):
network_name = vld['name']
- nsr['nsd']['vld'][index]['vim-network-name'] = '{}-{}'.format(vim_network_prefix,network_name)
+ nsr['nsd']['vld'][index]['vim-network-name'] = '{}-{}'.format(
+ vim_network_prefix, network_name)
postdata['nsr'].append(nsr)
- resp = self._http.post_cmd('api/config/ns-instance-config/nsr',postdata)
+ resp = self._http.post_cmd(
+ 'api/config/ns-instance-config/nsr',
+ postdata)
if 'success' not in resp:
- raise ClientException("failed to create ns: {} nsd: {} result: {}".format(nsr_name,nsd_name,resp))
-
- def get_opdata(self,id):
- return self._http.get_cmd('api/operational/ns-instance-opdata/nsr/{}?deep'.format(id))
-
- def get_field(self,ns_name,field):
- nsr=self.get(ns_name)
+ raise ClientException(
+ "failed to create ns: {} nsd: {} result: {}".format(
+ nsr_name,
+ nsd_name,
+ resp))
+
+ def get_opdata(self, id):
+ return self._http.get_cmd(
+ 'api/operational/ns-instance-opdata/nsr/{}?deep'.format(id))
+
+ def get_field(self, ns_name, field):
+ nsr = self.get(ns_name)
if nsr is None:
raise NotFound("failed to retrieve ns {}".format(ns_name))
if field in nsr:
return nsr[field]
- nsopdata=self.get_opdata(nsr['id'])
+ nsopdata = self.get_opdata(nsr['id'])
if field in nsopdata['nsr:nsr']:
return nsopdata['nsr:nsr'][field]
- raise NotFound("failed to find {} in ns {}".format(field,ns_name))
+ raise NotFound("failed to find {} in ns {}".format(field, ns_name))
- def _terminate(self,ns_name):
- ns=self.get(ns_name)
+ def _terminate(self, ns_name):
+ ns = self.get(ns_name)
if ns is None:
- raise NotFound("cannot find ns {}".format(ns_name))
-
- return self._http.delete_cmd('api/config/ns-instance-config/nsr/'+ns['id'])
+ raise NotFound("cannot find ns {}".format(ns_name))
+
+ return self._http.delete_cmd('api/config/ns-instance-config/nsr/' +
+ ns['id'])
- def delete(self,ns_name,wait=True):
- vnfs = self.get_field(ns_name,'constituent-vnfr-ref')
+ def delete(self, ns_name, wait=True):
+ vnfs = self.get_field(ns_name, 'constituent-vnfr-ref')
- resp=self._terminate(ns_name)
+ resp = self._terminate(ns_name)
if 'success' not in resp:
- raise ClientException("failed to delete ns {}".format(name))
+ raise ClientException("failed to delete ns {}".format(ns_name))
# helper method to check if pkg exists
- def check_not_exists( func ):
+ def check_not_exists(func):
try:
func()
- return False
+ return False
except NotFound:
- return True
+ return True
for vnf in vnfs:
- if not utils.wait_for_value(lambda: check_not_exists(lambda: self._client.vnf.get(vnf['vnfr-id']))):
- raise ClientException("vnf {} failed to delete".format(vnf['vnfr-id']))
- if not utils.wait_for_value(lambda: check_not_exists(lambda: self.get(ns_name))):
+ if (not utils.wait_for_value(
+ lambda:
+ check_not_exists(lambda:
+ self._client.vnf.get(vnf['vnfr-id'])))):
+ raise ClientException(
+ "vnf {} failed to delete".format(vnf['vnfr-id']))
+ if not utils.wait_for_value(lambda:
+ check_not_exists(lambda:
+ self.get(ns_name))):
raise ClientException("ns {} failed to delete".format(ns_name))
- def get_monitoring(self,ns_name):
- ns=self.get(ns_name)
- mon_list={}
+ def get_monitoring(self, ns_name):
+ ns = self.get(ns_name)
+ mon_list = {}
if ns is None:
return mon_list
- vnfs=self._client.vnf.list()
+ vnfs = self._client.vnf.list()
for vnf in vnfs:
if ns['id'] == vnf['nsr-id-ref']:
if 'monitoring-param' in vnf:
class Nsd(object):
- def __init__(self,http=None):
- self._http=http
+
+ def __init__(self, http=None):
+ self._http = http
def list(self):
resp = self._http.get_cmd('api/running/nsd-catalog/nsd')
return resp['nsd:nsd']
return list()
- def get(self,name):
+ def get(self, name):
for nsd in self.list():
- if name == nsd['name']:
- return nsd
- raise NotFound("cannot find nsd {}".format(name))
+ if name == nsd['name']:
+ return nsd
+ raise NotFound("cannot find nsd {}".format(name))
- def delete(self,nsd_name):
- nsd=self.get(nsd_name)
+ def delete(self, nsd_name):
+ nsd = self.get(nsd_name)
if nsd is None:
- raise NotFound("cannot find nsd {}".format(nsd_name))
+ raise NotFound("cannot find nsd {}".format(nsd_name))
- resp=self._http.delete_cmd('api/running/nsd-catalog/nsd/{}'.format(nsd['id']))
+ resp = self._http.delete_cmd(
+ 'api/running/nsd-catalog/nsd/{}'.format(nsd['id']))
if 'success' not in resp:
- raise ClientException("failed to delete nsd {}".format(nsd_name))
+ raise ClientException("failed to delete nsd {}".format(nsd_name))
# under the License.
"""
-OSM package API handling
+OSM package API handling
"""
-import json
-import pycurl
-from io import BytesIO
import tarfile
-import re
+import re
import yaml
-import time
from osmclient.common.exceptions import ClientException
from osmclient.common.exceptions import NotFound
from osmclient.common import utils
class Package(object):
- def __init__(self,http=None,upload_http=None,client=None):
- self._client=client
- self._http=http
- self._upload_http=upload_http
+ def __init__(self, http=None, upload_http=None, client=None):
+ self._client = client
+ self._http = http
+ self._upload_http = upload_http
- def _wait_for_package(self,pkg_type):
+ def _wait_for_package(self, pkg_type):
if 'vnfd' in pkg_type['type']:
- get_method=self._client.vnfd.get
+ get_method = self._client.vnfd.get
elif 'nsd' in pkg_type['type']:
- get_method=self._client.nsd.get
+ get_method = self._client.nsd.get
else:
raise ClientException("no valid package type found")
# helper method to check if pkg exists
- def check_exists( func ):
+ def check_exists(func):
try:
func()
except NotFound:
return False
return True
- return utils.wait_for_value(lambda: check_exists(lambda: get_method(pkg_type['name'])))
+ return utils.wait_for_value(lambda:
+ check_exists(lambda:
+ get_method(pkg_type['name'])))
# method opens up a package and finds the name of the resulting
# descriptor (vnfd or nsd name)
tar = tarfile.open(descriptor_file)
yamlfile = None
for member in tar.getmembers():
- if re.match('.*.yaml',member.name) and len(member.name.split('/')) == 2:
+ if (re.match('.*.yaml', member.name) and
+ len(member.name.split('/')) == 2):
yamlfile = member.name
break
if yamlfile is None:
return None
- dict=yaml.load(tar.extractfile(yamlfile))
- result={}
- for k1,v1 in dict.items():
+ dict = yaml.load(tar.extractfile(yamlfile))
+ result = {}
+ for k1, v1 in dict.items():
if not k1.endswith('-catalog'):
continue
- for k2,v2 in v1.items():
+ for k2, v2 in v1.items():
if not k2.endswith('nsd') and not k2.endswith('vnfd'):
continue
result['type'] = 'vnfd'
for entry in v2:
- for k3,v3 in entry.items():
+ for k3, v3 in entry.items():
if k3 == 'name' or k3.endswith(':name'):
result['name'] = v3
return result
tar.close()
return None
- def wait_for_upload(self,filename):
+ def wait_for_upload(self, filename):
"""wait(block) for an upload to succeed.
-
The filename passed is assumed to be a descriptor tarball.
"""
- pkg_type=self.get_descriptor_type_from_pkg(filename)
+ pkg_type = self.get_descriptor_type_from_pkg(filename)
if pkg_type is None:
raise ClientException("Cannot determine package type")
if not self._wait_for_package(pkg_type):
- raise ClientException("package {} failed to upload".format(filename))
+ raise ClientException("package {} failed to upload"
+ .format(filename))
- def upload(self,filename):
- resp=self._upload_http.post_cmd(formfile=('package',filename))
+ def upload(self, filename):
+ resp = self._upload_http.post_cmd(formfile=('package', filename))
if not resp or 'transaction_id' not in resp:
raise ClientException("failed to upload package")
class TestNs(unittest.TestCase):
- def test_list_empty(self):
- mock=Mock()
- mock.get_cmd.return_value=list()
- assert len(ns.Ns(mock).list()) == 0
+ def test_list_empty(self):
+ mock = Mock()
+ mock.get_cmd.return_value = list()
+ assert len(ns.Ns(mock).list()) == 0
- def test_get_notfound(self):
- mock=Mock()
- mock.get_cmd.return_value='foo'
- self.assertRaises(NotFound,ns.Ns(mock).get,'bar')
+ def test_get_notfound(self):
+ mock = Mock()
+ mock.get_cmd.return_value = 'foo'
+ self.assertRaises(NotFound, ns.Ns(mock).get, 'bar')
- def test_get_found(self):
- mock=Mock()
- mock.get_cmd.return_value={'nsr:ns-instance-config': { 'nsr': [{'name': 'foo' }]}}
- assert ns.Ns(mock).get('foo')
+ def test_get_found(self):
+ mock = Mock()
+ mock.get_cmd.return_value = {'nsr:ns-instance-config':
+ {'nsr': [{'name': 'foo'}]}}
+ assert ns.Ns(mock).get('foo')
- def test_get_monitoring_notfound(self):
- mock=Mock()
- mock.get_cmd.return_value='foo'
- self.assertRaises(NotFound,ns.Ns(mock).get_monitoring,'bar')
+ def test_get_monitoring_notfound(self):
+ mock = Mock()
+ mock.get_cmd.return_value = 'foo'
+ self.assertRaises(NotFound, ns.Ns(mock).get_monitoring, 'bar')
from osmclient.v1 import nsd
from osmclient.common.exceptions import NotFound
+
class TestNsd(unittest.TestCase):
- def test_list_empty(self):
- mock=Mock()
- mock.get_cmd.return_value=list()
- assert len(nsd.Nsd(mock).list()) == 0
+ def test_list_empty(self):
+ mock = Mock()
+ mock.get_cmd.return_value = list()
+ assert len(nsd.Nsd(mock).list()) == 0
- def test_get_notfound(self):
- mock=Mock()
- mock.get_cmd.return_value='foo'
- self.assertRaises(NotFound,nsd.Nsd(mock).get,'bar')
+ def test_get_notfound(self):
+ mock = Mock()
+ mock.get_cmd.return_value = 'foo'
+ self.assertRaises(NotFound, nsd.Nsd(mock).get, 'bar')
- def test_get_found(self):
- mock=Mock()
- mock.get_cmd.return_value={'nsd:nsd': [{'name': 'foo' }]}
- assert nsd.Nsd(mock).get('foo')
+ def test_get_found(self):
+ mock = Mock()
+ mock.get_cmd.return_value = {'nsd:nsd': [{'name': 'foo'}]}
+ assert nsd.Nsd(mock).get('foo')
class TestPackage(unittest.TestCase):
- def test_upload_fail(self):
- mock=Mock()
- mock.post_cmd.return_value='foo'
- self.assertRaises(ClientException,package.Package(upload_http=mock).upload,'bar')
+ def test_upload_fail(self):
+ mock = Mock()
+ mock.post_cmd.return_value = 'foo'
+ self.assertRaises(ClientException,
+ package.Package(upload_http=mock).upload, 'bar')
- mock.post_cmd.return_value=None
- self.assertRaises(ClientException,package.Package(upload_http=mock).upload,'bar')
+ mock.post_cmd.return_value = None
+ self.assertRaises(ClientException,
+ package.Package(upload_http=mock).upload, 'bar')
- def test_wait_for_upload_bad_file(self):
- mock=Mock()
- mock.post_cmd.return_value='foo'
- self.assertRaises(IOError,package.Package(upload_http=mock).wait_for_upload,'invalidfile')
+ def test_wait_for_upload_bad_file(self):
+ mock = Mock()
+ mock.post_cmd.return_value = 'foo'
+ self.assertRaises(IOError,
+ package.Package(upload_http=mock).wait_for_upload,
+ 'invalidfile')
class TestVnf(unittest.TestCase):
- def test_list_empty(self):
- mock=Mock()
- mock.get_cmd.return_value=list()
- assert len(vnf.Vnf(mock).list()) == 0
-
- def test_get_notfound(self):
- mock=Mock()
- mock.get_cmd.return_value='foo'
- self.assertRaises(NotFound,vnf.Vnf(mock).get,'bar')
-
- def test_get_found(self):
- mock=Mock()
- mock.get_cmd.return_value={'vnfr:vnfr': [{'name': 'foo' }]}
- assert vnf.Vnf(mock).get('foo')
-
- def test_get_monitoring_notfound(self):
- mock=Mock()
- mock.get_cmd.return_value='foo'
- self.assertRaises(NotFound,vnf.Vnf(mock).get_monitoring,'bar')
-
- def test_get_monitoring_found(self):
- mock=Mock()
- mock.get_cmd.return_value={'vnfr:vnfr': [{'name': 'foo', 'monitoring-param': True }]}
- assert vnf.Vnf(mock).get_monitoring('foo')
+ def test_list_empty(self):
+ mock = Mock()
+ mock.get_cmd.return_value = list()
+ assert len(vnf.Vnf(mock).list()) == 0
+
+ def test_get_notfound(self):
+ mock = Mock()
+ mock.get_cmd.return_value = 'foo'
+ self.assertRaises(NotFound, vnf.Vnf(mock).get, 'bar')
+
+ def test_get_found(self):
+ mock = Mock()
+ mock.get_cmd.return_value = {'vnfr:vnfr': [{'name': 'foo'}]}
+ assert vnf.Vnf(mock).get('foo')
+
+ def test_get_monitoring_notfound(self):
+ mock = Mock()
+ mock.get_cmd.return_value = 'foo'
+ self.assertRaises(NotFound, vnf.Vnf(mock).get_monitoring, 'bar')
+
+ def test_get_monitoring_found(self):
+ mock = Mock()
+ mock.get_cmd.return_value = {'vnfr:vnfr': [{'name': 'foo',
+ 'monitoring-param': True}]}
+ assert vnf.Vnf(mock).get_monitoring('foo')
class TestVnfd(unittest.TestCase):
- def test_list_empty(self):
- mock=Mock()
- mock.get_cmd.return_value=list()
- assert len(vnfd.Vnfd(mock).list()) == 0
+ def test_list_empty(self):
+ mock = Mock()
+ mock.get_cmd.return_value = list()
+ assert len(vnfd.Vnfd(mock).list()) == 0
- def test_get_notfound(self):
- mock=Mock()
- mock.get_cmd.return_value='foo'
- self.assertRaises(NotFound,vnfd.Vnfd(mock).get,'bar')
+ def test_get_notfound(self):
+ mock = Mock()
+ mock.get_cmd.return_value = 'foo'
+ self.assertRaises(NotFound, vnfd.Vnfd(mock).get, 'bar')
- def test_get_found(self):
- mock=Mock()
- mock.get_cmd.return_value={'vnfd:vnfd': [{'name': 'foo' }]}
- assert vnfd.Vnfd(mock).get('foo')
+ def test_get_found(self):
+ mock = Mock()
+ mock.get_cmd.return_value = {'vnfd:vnfd': [{'name': 'foo'}]}
+ assert vnfd.Vnfd(mock).get('foo')
OSM utils
"""
+
class Utils(object):
- def __init__(self,http=None):
- self._http=http
+ def __init__(self, http=None):
+ self._http = http
def get_vcs_info(self):
- resp=self._http.get_cmd('api/operational/vcs/info')
+ resp = self._http.get_cmd('api/operational/vcs/info')
if resp:
return resp['rw-base:info']['components']['component_info']
return list()
-
from osmclient.common.exceptions import ClientException
+
class Vca(object):
- def __init__(self,http=None):
- self._http=http
+ def __init__(self, http=None):
+ self._http = http
def list(self):
resp = self._http.get_cmd('api/config/config-agent')
return resp['rw-config-agent:config-agent']['account']
return list()
- def delete(self,name):
- if 'success' not in self._http.delete_cmd('api/config/config-agent/account/{}'.format(name)):
- raise ClientException("failed to delete config agent {}".format(name))
+ def delete(self, name):
+ if ('success' not in
+ self._http.delete_cmd('api/config/config-agent/account/{}'
+ .format(name))):
+ raise ClientException("failed to delete config agent {}"
+ .format(name))
- def create(self,name,account_type,server,user,secret):
- postdata={}
+ def create(self, name, account_type, server, user, secret):
+ postdata = {}
postdata['account'] = list()
- account={}
+ account = {}
account['name'] = name
account['account-type'] = account_type
account['juju'] = {}
- account['juju']['user'] = user
+ account['juju']['user'] = user
account['juju']['secret'] = secret
account['juju']['ip-address'] = server
postdata['account'].append(account)
- if 'success' not in self._http.post_cmd('api/config/config-agent',postdata):
- raise ClientException("failed to create config agent {}".format(name))
+ if 'success' not in self._http.post_cmd('api/config/config-agent',
+ postdata):
+ raise ClientException("failed to create config agent {}"
+ .format(name))
class Vim(object):
-
- def __init__(self,http=None,ro_http=None,client=None):
- self._client=client
- self._ro_http=ro_http
- self._http=http
-
- def _attach(self,vim_name,username,secret,vim_tenant_name):
- tenant_name='osm'
- tenant=self._get_ro_tenant()
+ def __init__(self, http=None, ro_http=None, client=None):
+ self._client = client
+ self._ro_http = ro_http
+ self._http = http
+
+ def _attach(self, vim_name, username, secret, vim_tenant_name):
+ tenant_name = 'osm'
+ tenant = self._get_ro_tenant()
if tenant is None:
raise ClientException("tenant {} not found".format(tenant_name))
- datacenter= self._get_ro_datacenter(vim_name)
+ datacenter = self._get_ro_datacenter(vim_name)
if datacenter is None:
raise Exception('datacenter {} not found'.format(vim_name))
- vim_account={}
- vim_account['datacenter']={}
+ vim_account = {}
+ vim_account['datacenter'] = {}
- vim_account['datacenter']['vim_username'] = username
- vim_account['datacenter']['vim_password'] = secret
+ vim_account['datacenter']['vim_username'] = username
+ vim_account['datacenter']['vim_password'] = secret
vim_account['datacenter']['vim_tenant_name'] = vim_tenant_name
- return self._ro_http.post_cmd('openmano/{}/datacenters/{}'.format(tenant['uuid'],datacenter['uuid']),vim_account)
+ return self._ro_http.post_cmd('openmano/{}/datacenters/{}'
+ .format(tenant['uuid'],
+ datacenter['uuid']), vim_account)
- def _detach(self,vim_name):
- return self._ro_http.delete_cmd('openmano/{}/datacenters/{}'.format('osm',vim_name))
+ def _detach(self, vim_name):
+ return self._ro_http.delete_cmd('openmano/{}/datacenters/{}'
+ .format('osm', vim_name))
def create(self, name, vim_access):
- vim_account={}
- vim_account['datacenter']={}
+ vim_account = {}
+ vim_account['datacenter'] = {}
# currently assumes vim_acc
- if 'vim-type' not in vim_access or 'openstack' not in vim_access['vim-type']:
+ if ('vim-type' not in vim_access or
+ 'openstack' not in vim_access['vim-type']):
raise Exception("only vim type openstack support")
vim_account['datacenter']['name'] = name
vim_config = {}
vim_config['use_floating_ip'] = False
- if 'floating_ip_pool' in vim_access and vim_access['floating_ip_pool'] is not None:
+ if ('floating_ip_pool' in vim_access and
+ vim_access['floating_ip_pool'] is not None):
vim_config['use_floating_ip'] = True
if 'keypair' in vim_access and vim_access['keypair'] is not None:
vim_account['datacenter']['config'] = vim_config
- resp = self._ro_http.post_cmd('openmano/datacenters',vim_account)
+ resp = self._ro_http.post_cmd('openmano/datacenters', vim_account)
if resp and 'error' in resp:
raise ClientException("failed to create vim")
else:
- self._attach(name,vim_access['os-username'],vim_access['os-password'],vim_access['os-project-name'])
+ self._attach(name,
+ vim_access['os-username'],
+ vim_access['os-password'],
+ vim_access['os-project-name'])
- def delete(self,vim_name):
+ def delete(self, vim_name):
# first detach
- resp=self._detach(vim_name)
- # detach. continue if error, it could be the datacenter is left without attachment
+ self._detach(vim_name)
+ # detach. continue if error,
+ # it could be the datacenter is left without attachment
- if 'result' not in self._ro_http.delete_cmd('openmano/datacenters/{}'.format(vim_name)):
+ if 'result' not in self._ro_http.delete_cmd('openmano/datacenters/{}'
+ .format(vim_name)):
raise ClientException("failed to delete vim {}".format(vim_name))
def list(self):
- resp=self._http.get_cmd('v1/api/operational/datacenters')
+ resp = self._http.get_cmd('v1/api/operational/datacenters')
if not resp or 'rw-launchpad:datacenters' not in resp:
return list()
- datacenters=resp['rw-launchpad:datacenters']
+ datacenters = resp['rw-launchpad:datacenters']
vim_accounts = list()
if 'ro-accounts' not in datacenters:
return vim_accounts
- tenant=self._get_ro_tenant()
+ tenant = self._get_ro_tenant()
if tenant is None:
return vim_accounts
if 'datacenters' not in roaccount:
continue
for datacenter in roaccount['datacenters']:
- vim_accounts.append(self._get_ro_datacenter(datacenter['name'],tenant['uuid']))
+ vim_accounts.append(self._get_ro_datacenter(datacenter['name'],
+ tenant['uuid']))
return vim_accounts
- def _get_ro_tenant(self,name='osm'):
- resp=self._ro_http.get_cmd('openmano/tenants/{}'.format(name))
+ def _get_ro_tenant(self, name='osm'):
+ resp = self._ro_http.get_cmd('openmano/tenants/{}'.format(name))
if not resp:
return None
else:
return None
- def _get_ro_datacenter(self,name,tenant_uuid='any'):
- resp=self._ro_http.get_cmd('openmano/{}/datacenters/{}'.format(tenant_uuid,name))
+ def _get_ro_datacenter(self, name, tenant_uuid='any'):
+ resp = self._ro_http.get_cmd('openmano/{}/datacenters/{}'
+ .format(tenant_uuid, name))
if not resp:
raise NotFound("datacenter {} not found".format(name))
-
+
if 'datacenter' in resp and 'uuid' in resp['datacenter']:
return resp['datacenter']
else:
raise NotFound("datacenter {} not found".format(name))
- def get(self,name):
- tenant=self._get_ro_tenant()
+ def get(self, name):
+ tenant = self._get_ro_tenant()
if tenant is None:
raise NotFound("no ro tenant found")
- return self._get_ro_datacenter(name,tenant['uuid'])
+ return self._get_ro_datacenter(name, tenant['uuid'])
- def get_datacenter(self,name):
- resp=self._http.get_cmd('v1/api/operational/datacenters')
+ def get_datacenter(self, name):
+ resp = self._http.get_cmd('v1/api/operational/datacenters')
if not resp:
return None
- datacenters=resp['rw-launchpad:datacenters']
if not resp or 'rw-launchpad:datacenters' not in resp:
return None
if 'ro-accounts' not in resp['rw-launchpad:datacenters']:
return None
for roaccount in resp['rw-launchpad:datacenters']['ro-accounts']:
- if not 'datacenters' in roaccount:
+ if 'datacenters' not in roaccount:
continue
for datacenter in roaccount['datacenters']:
if datacenter['name'] == name:
return None
def get_resource_orchestrator(self):
- resp=self._http.get_cmd('v1/api/operational/resource-orchestrator')
+ resp = self._http.get_cmd('v1/api/operational/resource-orchestrator')
if not resp or 'rw-launchpad:resource-orchestrator' not in resp:
return None
return resp['rw-launchpad:resource-orchestrator']
OSM vnf API handling
"""
-from osmclient.common.exceptions import NotFound
+from osmclient.common.exceptions import NotFound
class Vnf(object):
- def __init__(self,http=None):
- self._http=http
+ def __init__(self, http=None):
+ self._http = http
def list(self):
resp = self._http.get_cmd('v1/api/operational/vnfr-catalog/vnfr')
if resp and 'vnfr:vnfr' in resp:
return resp['vnfr:vnfr']
- return list()
+ return list()
- def get(self,vnf_name):
- vnfs=self.list()
+ def get(self, vnf_name):
+ vnfs = self.list()
for vnf in vnfs:
if vnf_name == vnf['name']:
return vnf
return vnf
raise NotFound("vnf {} not found".format(vnf_name))
- def get_monitoring(self,vnf_name):
- vnf=self.get(vnf_name)
+ def get_monitoring(self, vnf_name):
+ vnf = self.get(vnf_name)
if vnf and 'monitoring-param' in vnf:
return vnf['monitoring-param']
return None
OSM vnfd API handling
"""
-from osmclient.common.exceptions import NotFound
+from osmclient.common.exceptions import NotFound
from osmclient.common.exceptions import ClientException
class Vnfd(object):
- def __init__(self,http=None):
- self._http=http
+ def __init__(self, http=None):
+ self._http = http
def list(self):
resp = self._http.get_cmd('api/running/vnfd-catalog/vnfd')
return resp['vnfd:vnfd']
return list()
- def get(self,name):
+ def get(self, name):
for vnfd in self.list():
- if name == vnfd['name']:
- return vnfd
+ if name == vnfd['name']:
+ return vnfd
raise NotFound("vnfd {} not found".format(name))
- def delete(self,vnfd_name):
- vnfd=self.get(vnfd_name)
- resp=self._http.delete_cmd('api/running/vnfd-catalog/vnfd/{}'.format(vnfd['id']))
+ def delete(self, vnfd_name):
+ vnfd = self.get(vnfd_name)
+ resp = self._http.delete_cmd('api/running/vnfd-catalog/vnfd/{}'
+ .format(vnfd['id']))
if 'success' not in resp:
raise ClientException("failed to delete vnfd {}".format(vnfd_name))
-from setuptools import setup,find_packages
+from setuptools import setup, find_packages
setup(
name='osmclient',
packages=find_packages(),
include_package_data=True,
install_requires=[
- 'Click','prettytable'
+ 'Click', 'prettytable', 'pyyaml', 'pycurl'
],
entry_points='''
[console_scripts]
[DEFAULT]
-Depends: python-setuptools, python-pycurl, python-click, python-prettytable
+Depends: python-setuptools, python-pycurl, python-click, python-prettytable, python-yaml
--- /dev/null
+flake8
+mock
+stdeb