echo "POST INSTALL OSMCLIENT"
#Install pyangbind, required for python3-osm-im
-python3 -m pip install pyangbind
+python3 -m pip install pyangbind verboselogs
#configure autocomplete for osmclient
[ -z "$SUDO_USER" ] && SUDO_USER="$USER"
su $SUDO_USER -c 'mkdir -p $HOME/.bash_completion.d'
from osmclient.v1 import client as client
from osmclient.sol005 import client as sol005client
+import logging
+import verboselogs
+verboselogs.install()
def Client(version=1, host=None, sol005=True, *args, **kwargs):
+ log_format_simple = "%(levelname)s %(message)s"
+ log_format_complete = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(funcName)s(): %(message)s"
+ log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
+ handler = logging.StreamHandler()
+ handler.setFormatter(log_formatter_simple)
+ logger = logging.getLogger('osmclient')
+ logger.setLevel(level=logging.WARNING)
+ logger.addHandler(handler)
+ verbose = kwargs.get('verbose',0)
+ if verbose>0:
+ log_formatter = logging.Formatter(log_format_complete, datefmt='%Y-%m-%dT%H:%M:%S')
+ #handler = logging.StreamHandler()
+ handler.setFormatter(log_formatter)
+ #logger.addHandler(handler)
+ if verbose==1:
+ logger.setLevel(level=logging.INFO)
+ elif verbose==2:
+ logger.setLevel(level=logging.VERBOSE)
+ elif verbose>2:
+ logger.setLevel(level=logging.DEBUG)
if not sol005:
if version == 1:
return client.Client(host, *args, **kwargs)
curl_cmd = self._get_curl_cmd(endpoint)
curl_cmd.setopt(pycurl.HTTPGET, 1)
curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ self._logger.info("Request METHOD: {} URL: {}".format("GET",self._url + endpoint))
curl_cmd.perform()
+ http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ self._logger.info("Response HTTPCODE: {}".format(http_code))
curl_cmd.close()
if data.getvalue():
+ self._logger.debug("Response DATA: {}".format(json.loads(data.getvalue().decode())))
return json.loads(data.getvalue().decode())
return None
curl_cmd = self._get_curl_cmd(endpoint)
curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ self._logger.info("Request METHOD: {} URL: {}".format("DELETE",self._url + endpoint))
curl_cmd.perform()
+ http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ self._logger.info("Response HTTPCODE: {}".format(http_code))
curl_cmd.close()
if data.getvalue():
+ self._logger.debug("Response DATA: {}".format(json.loads(data.getvalue().decode())))
return json.loads(data.getvalue().decode())
return None
(pycurl.FORM_FILE,
formfile[1])))])
+ self._logger.info("Request METHOD: {} URL: {}".format("POST",self._url + endpoint))
curl_cmd.perform()
+ http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ self._logger.info("Response HTTPCODE: {}".format(http_code))
curl_cmd.close()
if data.getvalue():
+ self._logger.debug("Response DATA: {}".format(json.loads(data.getvalue().decode())))
return json.loads(data.getvalue().decode())
return None
import os
import textwrap
import pkg_resources
+import logging
+# Global variables
+
+CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'], max_content_width=160)
+
def wrap_text(text, width):
wrapper = textwrap.TextWrapper(width=width)
lines = text.splitlines()
:return: -
:raises ClientError: if the specified version does not match the client version
"""
+ logger.debug("")
fullclassname = obj.__module__ + "." + obj.__class__.__name__
message = 'The following commands or options are only supported with the option "--sol005": {}'.format(what)
if version == 'v1':
return
-CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'], max_content_width=160)
-
-@click.group(context_settings=CONTEXT_SETTINGS)
+@click.group(context_settings=dict(help_option_names=['-h', '--help'], max_content_width=160))
@click.option('--hostname',
default="127.0.0.1",
envvar='OSM_HOSTNAME',
envvar='OSM_PROJECT',
help='project (defaults to admin). ' +
'Also can set OSM_PROJECT in environment')
+@click.option('-v', '--verbose', count=True,
+ help='increase verbosity (-v INFO, -vv VERBOSE, -vvv DEBUG)')
#@click.option('--so-port',
# default=None,
# envvar='OSM_SO_PORT',
# help='hostname of RO server. ' +
# 'Also can set OSM_RO_PORT in environment')
@click.pass_context
-def cli_osm(ctx, hostname, user, password, project):
+def cli_osm(ctx, hostname, user, password, project, verbose):
+ global logger
if hostname is None:
print((
"either hostname option or OSM_HOSTNAME " +
"environment variable needs to be specified"))
exit(1)
- kwargs={}
+ kwargs = {'verbose': verbose}
# if so_port is not None:
# kwargs['so_port']=so_port
# if so_project is not None:
if project is not None:
kwargs['project']=project
ctx.obj = client.Client(host=hostname, sol005=sol005, **kwargs)
+ logger = logging.getLogger('osmclient')
####################
--filter nsd.vendor=<VENDOR>&nsd-ref=<NSD_NAME>
--filter nsd.constituent-vnfd.vnfd-id-ref=<VNFD_NAME>
"""
+ logger.debug("")
if filter:
check_client_version(ctx.obj, '--filter')
resp = ctx.obj.ns.list(filter)
def nsd_list(ctx, filter):
+ logger.debug("")
if filter:
check_client_version(ctx.obj, '--filter')
resp = ctx.obj.nsd.list(filter)
@click.pass_context
def nsd_list1(ctx, filter):
"""list all NSD/NS pkg in the system"""
+ logger.debug("")
nsd_list(ctx, filter)
@click.pass_context
def nsd_list2(ctx, filter):
"""list all NS packages"""
+ logger.debug("")
nsd_list(ctx, filter)
def vnfd_list(ctx, nf_type, filter):
+ logger.debug("")
if nf_type:
check_client_version(ctx.obj, '--nf_type')
elif filter:
@click.pass_context
def vnfd_list1(ctx, nf_type, filter):
"""list all xNF packages (VNF, HNF, PNF)"""
+ logger.debug("")
vnfd_list(ctx, nf_type, filter)
@click.pass_context
def vnfd_list2(ctx, nf_type, filter):
"""list all xNF packages (VNF, HNF, PNF)"""
+ logger.debug("")
vnfd_list(ctx, nf_type, filter)
@click.pass_context
def nfpkg_list(ctx, nf_type, filter):
"""list all xNF packages (VNF, HNF, PNF)"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
vnfd_list(ctx, nf_type, filter)
@click.pass_context
def vnf_list1(ctx, ns, filter):
"""list all NF instances"""
+ logger.debug("")
vnf_list(ctx, ns, filter)
--filter vdur.ip-address=<IP_ADDRESS>
--filter vnfd-ref=<VNFD_NAME>,vdur.ip-address=<IP_ADDRESS>
"""
+ logger.debug("")
vnf_list(ctx, ns, filter)
NAME: name or ID of the NS instance
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
resp = ctx.obj.ns.list_op(name)
def nsi_list(ctx, filter):
"""list all Network Slice Instances"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
resp = ctx.obj.nsi.list(filter)
@click.pass_context
def nsi_list1(ctx, filter):
"""list all Network Slice Instances (NSI)"""
+ logger.debug("")
nsi_list(ctx, filter)
@click.pass_context
def nsi_list2(ctx, filter):
"""list all Network Slice Instances (NSI)"""
+ logger.debug("")
nsi_list(ctx, filter)
def nst_list(ctx, filter):
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
resp = ctx.obj.nst.list(filter)
@click.pass_context
def nst_list1(ctx, filter):
"""list all Network Slice Templates (NST) in the system"""
+ logger.debug("")
nst_list(ctx, filter)
@click.pass_context
def nst_list2(ctx, filter):
"""list all Network Slice Templates (NST) in the system"""
+ logger.debug("")
nst_list(ctx, filter)
def nsi_op_list(ctx, name):
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
resp = ctx.obj.nsi.list_op(name)
NAME: name or ID of the Network Slice Instance
"""
+ logger.debug("")
nsi_op_list(ctx, name)
NAME: name or ID of the Network Slice Instance
"""
+ logger.debug("")
nsi_op_list(ctx, name)
@click.pass_context
def pdu_list(ctx, filter):
"""list all Physical Deployment Units (PDU)"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
resp = ctx.obj.pdu.list(filter)
####################
def nsd_show(ctx, name, literal):
+ logger.debug("")
# try:
resp = ctx.obj.nsd.get(name)
# resp = ctx.obj.nsd.get_individual(name)
NAME: name or ID of the NSD/NSpkg
"""
+ logger.debug("")
nsd_show(ctx, name, literal)
NAME: name or ID of the NSD/NSpkg
"""
+ logger.debug("")
nsd_show(ctx, name, literal)
def vnfd_show(ctx, name, literal):
+ logger.debug("")
# try:
resp = ctx.obj.vnfd.get(name)
# resp = ctx.obj.vnfd.get_individual(name)
NAME: name or ID of the VNFD/VNFpkg
"""
+ logger.debug("")
vnfd_show(ctx, name, literal)
NAME: name or ID of the VNFD/VNFpkg
"""
+ logger.debug("")
vnfd_show(ctx, name, literal)
NAME: name or ID of the NFpkg
"""
+ logger.debug("")
vnfd_show(ctx, name, literal)
NAME: name or ID of the NS instance
"""
+ logger.debug("")
# try:
ns = ctx.obj.ns.get(name)
# except ClientException as e:
NAME: name or ID of the VNF instance
"""
+ logger.debug("")
if kdu:
if literal:
raise ClientException('"--literal" option is incompatible with "--kdu" option')
ID: operation identifier
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
op_info = ctx.obj.ns.get_op(id)
def nst_show(ctx, name, literal):
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
resp = ctx.obj.nst.get(name)
NAME: name or ID of the NST
"""
+ logger.debug("")
nst_show(ctx, name, literal)
NAME: name or ID of the NST
"""
+ logger.debug("")
nst_show(ctx, name, literal)
def nsi_show(ctx, name, literal, filter):
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
nsi = ctx.obj.nsi.get(name)
NAME: name or ID of the Network Slice Instance
"""
+ logger.debug("")
nsi_show(ctx, name, literal, filter)
NAME: name or ID of the Network Slice Instance
"""
+ logger.debug("")
nsi_show(ctx, name, literal, filter)
def nsi_op_show(ctx, id, filter):
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
op_info = ctx.obj.nsi.get_op(id)
ID: operation identifier
"""
+ logger.debug("")
nsi_op_show(ctx, id, filter)
ID: operation identifier
"""
+ logger.debug("")
nsi_op_show(ctx, id, filter)
NAME: name or ID of the PDU
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
pdu = ctx.obj.pdu.get(name)
####################
def nsd_create(ctx, filename, overwrite):
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
ctx.obj.nsd.create(filename, overwrite)
FILENAME: NSD yaml file or NSpkg tar.gz file
"""
+ logger.debug("")
nsd_create(ctx, filename, overwrite)
FILENAME: NSD yaml file or NSpkg tar.gz file
"""
+ logger.debug("")
nsd_create(ctx, filename, overwrite)
def vnfd_create(ctx, filename, overwrite):
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
ctx.obj.vnfd.create(filename, overwrite)
FILENAME: VNFD yaml file or VNFpkg tar.gz file
"""
+ logger.debug("")
vnfd_create(ctx, filename, overwrite)
FILENAME: VNFD yaml file or VNFpkg tar.gz file
"""
+ logger.debug("")
vnfd_create(ctx, filename, overwrite)
FILENAME: NF Descriptor yaml file or NFpkg tar.gz file
"""
+ logger.debug("")
vnfd_create(ctx, filename, overwrite)
config_file,
wait):
"""creates a new NS instance"""
+ logger.debug("")
# try:
if config_file:
check_client_version(ctx.obj, '--config_file')
def nst_create(ctx, filename, overwrite):
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
ctx.obj.nst.create(filename, overwrite)
FILENAME: NST yaml file or NSTpkg tar.gz file
"""
+ logger.debug("")
nst_create(ctx, filename, overwrite)
FILENAME: NST yaml file or NSTpkg tar.gz file
"""
+ logger.debug("")
nst_create(ctx, filename, overwrite)
def nsi_create(ctx, nst_name, nsi_name, vim_account, ssh_keys, config, config_file, wait):
"""creates a new Network Slice Instance (NSI)"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
if config_file:
@click.pass_context
def nsi_create1(ctx, nst_name, nsi_name, vim_account, ssh_keys, config, config_file, wait):
"""creates a new Network Slice Instance (NSI)"""
+ logger.debug("")
nsi_create(ctx, nst_name, nsi_name, vim_account, ssh_keys, config, config_file, wait=wait)
@click.pass_context
def nsi_create2(ctx, nst_name, nsi_name, vim_account, ssh_keys, config, config_file, wait):
"""creates a new Network Slice Instance (NSI)"""
+ logger.debug("")
nsi_create(ctx, nst_name, nsi_name, vim_account, ssh_keys, config, config_file, wait=wait)
@click.pass_context
def pdu_create(ctx, name, pdu_type, interface, description, vim_account, descriptor_file):
"""creates a new Physical Deployment Unit (PDU)"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
pdu = {}
# print(str(e))
# exit(1)
+
####################
# UPDATE operations
####################
def nsd_update(ctx, name, content):
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
ctx.obj.nsd.update(name, content)
NAME: name or ID of the NSD/NSpkg
"""
+ logger.debug("")
nsd_update(ctx, name, content)
NAME: name or ID of the NSD/NSpkg
"""
+ logger.debug("")
nsd_update(ctx, name, content)
def vnfd_update(ctx, name, content):
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
ctx.obj.vnfd.update(name, content)
NAME: name or ID of the VNFD/VNFpkg
"""
+ logger.debug("")
vnfd_update(ctx, name, content)
NAME: VNFD yaml file or VNFpkg tar.gz file
"""
+ logger.debug("")
vnfd_update(ctx, name, content)
NAME: NF Descriptor yaml file or NFpkg tar.gz file
"""
+ logger.debug("")
vnfd_update(ctx, name, content)
def nst_update(ctx, name, content):
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
ctx.obj.nst.update(name, content)
NAME: name or ID of the NSD/NSpkg
"""
+ logger.debug("")
nst_update(ctx, name, content)
NAME: name or ID of the NSD/NSpkg
"""
+ logger.debug("")
nst_update(ctx, name, content)
####################
def nsd_delete(ctx, name, force):
+ logger.debug("")
# try:
if not force:
ctx.obj.nsd.delete(name)
NAME: name or ID of the NSD/NSpkg to be deleted
"""
+ logger.debug("")
nsd_delete(ctx, name, force)
NAME: name or ID of the NSD/NSpkg to be deleted
"""
+ logger.debug("")
nsd_delete(ctx, name, force)
def vnfd_delete(ctx, name, force):
+ logger.debug("")
# try:
if not force:
ctx.obj.vnfd.delete(name)
NAME: name or ID of the VNFD/VNFpkg to be deleted
"""
+ logger.debug("")
vnfd_delete(ctx, name, force)
NAME: name or ID of the VNFD/VNFpkg to be deleted
"""
+ logger.debug("")
vnfd_delete(ctx, name, force)
NAME: name or ID of the NFpkg to be deleted
"""
+ logger.debug("")
vnfd_delete(ctx, name, force)
NAME: name or ID of the NS instance to be deleted
"""
+ logger.debug("")
# try:
if not force:
ctx.obj.ns.delete(name, wait=wait)
def nst_delete(ctx, name, force):
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
ctx.obj.nst.delete(name, force)
NAME: name or ID of the NST/NSTpkg to be deleted
"""
+ logger.debug("")
nst_delete(ctx, name, force)
NAME: name or ID of the NST/NSTpkg to be deleted
"""
+ logger.debug("")
nst_delete(ctx, name, force)
def nsi_delete(ctx, name, force, wait):
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
ctx.obj.nsi.delete(name, force, wait=wait)
NAME: name or ID of the Network Slice instance to be deleted
"""
+ logger.debug("")
nsi_delete(ctx, name, force, wait=wait)
NAME: name or ID of the Network Slice instance to be deleted
"""
+ logger.debug("")
nsi_delete(ctx, name, force, wait=wait)
NAME: name or ID of the PDU to be deleted
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
ctx.obj.pdu.delete(name, force)
sdn_port_mapping,
wait):
"""creates a new VIM account"""
+ logger.debug("")
# try:
if sdn_controller:
check_client_version(ctx.obj, '--sdn_controller')
NAME: name or ID of the VIM account
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
vim = {}
NAME: name or ID of the VIM account to be deleted
"""
+ logger.debug("")
# try:
if not force:
ctx.obj.vim.delete(name, wait=wait)
@click.pass_context
def vim_list(ctx, filter):
"""list all VIM accounts"""
+ logger.debug("")
if filter:
check_client_version(ctx.obj, '--filter')
# if ro_update:
NAME: name or ID of the VIM account
"""
+ logger.debug("")
# try:
resp = ctx.obj.vim.get(name)
if 'vim_password' in resp:
wim_port_mapping,
wait):
"""creates a new WIM account"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
# if sdn_controller:
NAME: name or ID of the WIM account
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
wim = {}
NAME: name or ID of the WIM account to be deleted
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
ctx.obj.wim.delete(name, force, wait=wait)
@click.pass_context
def wim_list(ctx, filter):
"""list all WIM accounts"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
resp = ctx.obj.wim.list(filter)
NAME: name or ID of the WIM account
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
resp = ctx.obj.wim.get(name)
@click.pass_context
def sdnc_create(ctx, **kwargs):
"""creates a new SDN controller"""
+ logger.debug("")
sdncontroller = {x: kwargs[x] for x in kwargs if kwargs[x] and
x not in ("wait", "ip_address", "port", "switch_dpid")}
if kwargs.get("port"):
NAME: name or ID of the SDN controller
"""
+ logger.debug("")
sdncontroller = {x: kwargs[x] for x in kwargs if kwargs[x] and
x not in ("wait", "ip_address", "port", "switch_dpid", "new_name")}
if kwargs.get("newname"):
NAME: name or ID of the SDN controller to be deleted
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
ctx.obj.sdnc.delete(name, force, wait=wait)
@click.pass_context
def sdnc_list(ctx, filter):
"""list all SDN controllers"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
resp = ctx.obj.sdnc.list(filter)
NAME: name or ID of the SDN controller
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
resp = ctx.obj.sdnc.get(name)
NAME: name of the project
"""
+ logger.debug("")
project = {}
project['name'] = name
# try:
NAME: name or ID of the project to be deleted
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
ctx.obj.project.delete(name)
@click.pass_context
def project_list(ctx, filter):
"""list all projects"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
resp = ctx.obj.project.list(filter)
NAME: name or ID of the project
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
resp = ctx.obj.project.get(name)
:param name: new name for the project
:return:
"""
-
+ logger.debug("")
project_changes = {}
project_changes['name'] = name
PROJECTS: projects assigned to user (internal only)
PROJECT_ROLE_MAPPING: roles in projects assigned to user (keystone)
"""
+ logger.debug("")
user = {}
user['username'] = username
user['password'] = password
ADD_PROJECT_ROLE: adding mappings for project/role(s)
REMOVE_PROJECT_ROLE: removing mappings for project/role(s)
"""
+ logger.debug("")
user = {}
user['password'] = password
user['username'] = set_username
\b
NAME: name or ID of the user to be deleted
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
ctx.obj.user.delete(name)
NAME: name or ID of the user
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
resp = ctx.obj.user.get(name)
"""creates a new alarm for a NS instance"""
# TODO: Check how to validate threshold_value.
# Should it be an integer (1-100), percentage, or decimal (0.01-1.00)?
+ logger.debug("")
# try:
ns_instance = ctx.obj.ns.get(ns)
alarm = {}
"""exports a metric to the internal OSM bus, which can be read by other apps"""
# TODO: Check how to validate interval.
# Should it be an integer (seconds), or should a suffix (s,m,h,d,w) also be permitted?
+ logger.debug("")
# try:
ns_instance = ctx.obj.ns.get(ns)
metric_data = {}
FILENAME: VNF or NS package file (tar.gz)
"""
+ logger.debug("")
# try:
ctx.obj.package.upload(filename)
fullclassname = ctx.obj.__module__ + "." + ctx.obj.__class__.__name__
NS_NAME: name or ID of the NS instance
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
op_data = {}
NS_NAME: name or ID of the NS instance.
VNF_NAME: member-vnf-index in the NS to be scaled.
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
if not scale_in and not scale_out:
NAME: Name or ID of the role.
DEFINITION: Definition of grant/denial of access to resources.
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
ctx.obj.role.create(name, permissions)
ADD: Grant/denial of access to resource to add.
REMOVE: Grant/denial of access to resource to remove.
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
ctx.obj.role.update(name, set_name, None, add, remove)
\b
NAME: Name or ID of the role.
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
ctx.obj.role.delete(name)
"""
List all roles.
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
resp = ctx.obj.role.list(filter)
\b
NAME: Name or ID of the role.
"""
+ logger.debug("")
# try:
check_client_version(ctx.obj, ctx.command.name)
resp = ctx.obj.role.get(name)
if __name__ == '__main__':
cli()
+
from osmclient.common.exceptions import ClientException
from osmclient.common import package_tool
import json
+import logging
class Client(object):
self._user = user
self._password = password
self._project = project
+ self._logger = logging.getLogger('osmclient')
self._auth_endpoint = '/admin/v1/tokens'
self._headers = {}
self._token = None
'''
def get_token(self):
+ self._logger.debug("")
if self._token is None:
postfields_dict = {'username': self._user,
'password': self._password,
from io import BytesIO
import pycurl
import json
+import logging
+import copy
from osmclient.common import http
+
class Http(http.Http):
def __init__(self, url, user='admin', password='admin'):
self._user = user
self._password = password
self._http_header = None
+ self._logger = logging.getLogger('osmclient')
def _get_curl_cmd(self, endpoint):
+ self._logger.debug("")
curl_cmd = pycurl.Curl()
- #print(self._url + endpoint)
+ if self._logger.getEffectiveLevel() == logging.DEBUG:
+ curl_cmd.setopt(pycurl.VERBOSE, True)
curl_cmd.setopt(pycurl.URL, self._url + endpoint)
curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
return curl_cmd
def delete_cmd(self, endpoint):
+ self._logger.debug("")
data = BytesIO()
curl_cmd = self._get_curl_cmd(endpoint)
curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ self._logger.info("Request METHOD: {} URL: {}".format("DELETE",self._url + endpoint))
curl_cmd.perform()
http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
- #print('HTTP_CODE: {}'.format(http_code))
+ self._logger.info("Response HTTPCODE: {}".format(http_code))
curl_cmd.close()
# TODO 202 accepted should be returned somehow
if data.getvalue():
+ self._logger.verbose("Response DATA: {}".format(json.loads(data.getvalue().decode())))
return http_code, data.getvalue().decode()
else:
return http_code, None
def send_cmd(self, endpoint='', postfields_dict=None,
formfile=None, filename=None,
put_method=False, patch_method=False):
+ self._logger.debug("")
data = BytesIO()
curl_cmd = self._get_curl_cmd(endpoint)
if put_method:
if postfields_dict is not None:
jsondata = json.dumps(postfields_dict)
+ if 'password' in postfields_dict:
+ postfields_dict_copy = copy.deepcopy(postfields_dict)
+ postfields_dict_copy['password']='******'
+ jsondata_log = json.dumps(postfields_dict_copy)
+ else:
+ jsondata_log = jsondata
+ self._logger.verbose("Request POSTFIELDS: {}".format(jsondata_log))
curl_cmd.setopt(pycurl.POSTFIELDS, jsondata)
elif formfile is not None:
curl_cmd.setopt(
elif filename is not None:
with open(filename, 'rb') as stream:
postdata=stream.read()
+ self._logger.verbose("Request POSTFIELDS: Binary content")
curl_cmd.setopt(pycurl.POSTFIELDS, postdata)
+ if put_method:
+ self._logger.info("Request METHOD: {} URL: {}".format("PUT",self._url + endpoint))
+ elif patch_method:
+ self._logger.info("Request METHOD: {} URL: {}".format("PATCH",self._url + endpoint))
+ else:
+ self._logger.info("Request METHOD: {} URL: {}".format("POST",self._url + endpoint))
curl_cmd.perform()
http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ self._logger.info("Response HTTPCODE: {}".format(http_code))
curl_cmd.close()
if data.getvalue():
+ self._logger.verbose("Response DATA: {}".format(json.loads(data.getvalue().decode())))
return http_code, data.getvalue().decode()
else:
return http_code, None
def post_cmd(self, endpoint='', postfields_dict=None,
formfile=None, filename=None):
+ self._logger.debug("")
return self.send_cmd(endpoint=endpoint,
postfields_dict=postfields_dict,
formfile=formfile,
def put_cmd(self, endpoint='', postfields_dict=None,
formfile=None, filename=None):
+ self._logger.debug("")
return self.send_cmd(endpoint=endpoint,
postfields_dict=postfields_dict,
formfile=formfile,
def patch_cmd(self, endpoint='', postfields_dict=None,
formfile=None, filename=None):
+ self._logger.debug("")
return self.send_cmd(endpoint=endpoint,
postfields_dict=postfields_dict,
formfile=formfile,
put_method=False, patch_method=True)
def get2_cmd(self, endpoint):
+ self._logger.debug("")
data = BytesIO()
curl_cmd = self._get_curl_cmd(endpoint)
curl_cmd.setopt(pycurl.HTTPGET, 1)
curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
+ self._logger.info("Request METHOD: {} URL: {}".format("GET",self._url + endpoint))
curl_cmd.perform()
http_code = curl_cmd.getinfo(pycurl.HTTP_CODE)
+ self._logger.info("Response HTTPCODE: {}".format(http_code))
curl_cmd.close()
if data.getvalue():
+ self._logger.debug("Response DATA: {}".format(json.loads(data.getvalue().decode())))
return http_code, data.getvalue().decode()
return http_code, None
from osmclient.common.exceptions import NotFound
import yaml
import json
+import logging
class Ns(object):
def __init__(self, http=None, client=None):
self._http = http
self._client = client
+ self._logger = logging.getLogger('osmclient')
self._apiName = '/nslcm'
self._apiVersion = '/v1'
self._apiResource = '/ns_instances_content'
# NS '--wait' option
def _wait(self, id, deleteFlag=False):
+ self._logger.debug("")
# Endpoint to get operation status
apiUrlStatus = '{}{}{}'.format(self._apiName, self._apiVersion, '/ns_lcm_op_occs')
# Wait for status for NS instance creation/update/deletion
def list(self, filter=None):
"""Returns a list of NS
"""
+ self._logger.debug("")
self._client.get_token()
filter_string = ''
if filter:
def get(self, name):
"""Returns an NS based on name or id
"""
+ self._logger.debug("")
self._client.get_token()
if utils.validate_uuid4(name):
for ns in self.list():
raise NotFound("ns {} not found".format(name))
def get_individual(self, name):
+ self._logger.debug("")
self._client.get_token()
ns_id = name
if not utils.validate_uuid4(name):
raise NotFound("ns {} not found".format(name))
def delete(self, name, force=False, wait=False):
+ self._logger.debug("")
ns = self.get(name)
querystring = ''
if force:
def create(self, nsd_name, nsr_name, account, config=None,
ssh_keys=None, description='default description',
admin_status='ENABLED', wait=False):
+ self._logger.debug("")
self._client.get_token()
nsd = self._client.nsd.get(nsd_name)
wim_account_id = {}
def get_vim_account_id(vim_account):
+ self._logger.debug("")
if vim_account_id.get(vim_account):
return vim_account_id[vim_account]
return vim['_id']
def get_wim_account_id(wim_account):
+ self._logger.debug("")
if not isinstance(wim_account, str):
return wim_account
if wim_account_id.get(wim_account):
def list_op(self, name, filter=None):
"""Returns the list of operations of a NS
"""
+ self._logger.debug("")
ns = self.get(name)
try:
self._apiResource = '/ns_lcm_op_occs'
def get_op(self, operationId):
"""Returns the status of an operation
"""
+ self._logger.debug("")
self._client.get_token()
try:
self._apiResource = '/ns_lcm_op_occs'
def exec_op(self, name, op_name, op_data=None, wait=False, ):
"""Executes an operation on a NS
"""
+ self._logger.debug("")
+ ns = self.get(name)
try:
ns = self.get(name)
self._apiResource = '/ns_instances'
def scale_vnf(self, ns_name, vnf_name, scaling_group, scale_in, scale_out, wait=False):
"""Scales a VNF by adding/removing VDUs
"""
+ self._logger.debug("")
self._client.get_token()
try:
op_data={}
raise ClientException(message)
def create_alarm(self, alarm):
+ self._logger.debug("")
self._client.get_token()
data = {}
data["create_alarm_request"] = {}
raise ClientException(message)
def delete_alarm(self, name):
+ self._logger.debug("")
self._client.get_token()
data = {}
data["delete_alarm_request"] = {}
raise ClientException(message)
def export_metric(self, metric):
+ self._logger.debug("")
self._client.get_token()
data = {}
data["read_metric_data_request"] = metric
raise ClientException(message)
def get_field(self, ns_name, field):
+ self._logger.debug("")
nsr = self.get(ns_name)
print(yaml.safe_dump(nsr))
if nsr is None:
return nsr[field]
raise NotFound("failed to find {} in ns {}".format(field, ns_name))
+
import json
import magic
from os.path import basename
+import logging
#from os import stat
def __init__(self, http=None, client=None):
self._http = http
self._client = client
+ self._logger = logging.getLogger('osmclient')
self._apiName = '/nsd'
self._apiVersion = '/v1'
self._apiResource = '/ns_descriptors'
#self._apiBase='/nsds'
def list(self, filter=None):
+ self._logger.debug("")
self._client.get_token()
filter_string = ''
if filter:
return list()
def get(self, name):
+ self._logger.debug("")
self._client.get_token()
if utils.validate_uuid4(name):
for nsd in self.list():
raise NotFound("nsd {} not found".format(name))
def get_individual(self, name):
- # Called to get_token not required, because will be implicitly called by get.
+ self._logger.debug("")
+ # Call to get_token not required, because will be implicitly called by get.
nsd = self.get(name)
# It is redundant, since the previous one already gets the whole nsdinfo
# The only difference is that a different primitive is exercised
raise NotFound("nsd {} not found".format(name))
def get_thing(self, name, thing, filename):
+ self._logger.debug("")
+ # Call to get_token not required, because will be implicitly called by get.
nsd = self.get(name)
headers = self._client._headers
headers['Accept'] = 'application/binary'
raise ClientException("failed to get {} from {} - {}".format(thing, name, msg))
def get_descriptor(self, name, filename):
+ self._logger.debug("")
self.get_thing(name, 'nsd', filename)
def get_package(self, name, filename):
+ self._logger.debug("")
self.get_thing(name, 'package_content', filename)
def get_artifact(self, name, artifact, filename):
+ self._logger.debug("")
self.get_thing(name, 'artifacts/{}'.format(artifact), filename)
def delete(self, name, force=False):
+ self._logger.debug("")
nsd = self.get(name)
querystring = ''
if force:
raise ClientException("failed to delete nsd {} - {}".format(name, msg))
def create(self, filename, overwrite=None, update_endpoint=None):
+ self._logger.debug("")
self._client.get_token()
mime_type = magic.from_file(filename, mime=True)
if mime_type is None:
raise ClientException("failed to create/update nsd - {}".format(msg))
def update(self, name, filename):
+ self._logger.debug("")
nsd = self.get(name)
endpoint = '{}/{}/nsd_content'.format(self._apiBase, nsd['_id'])
self.create(filename=filename, update_endpoint=endpoint)
from osmclient.common.exceptions import NotFound
import yaml
import json
+import logging
class Nsi(object):
def __init__(self, http=None, client=None):
self._http = http
self._client = client
+ self._logger = logging.getLogger('osmclient')
self._apiName = '/nsilcm'
self._apiVersion = '/v1'
self._apiResource = '/netslice_instances_content'
# NSI '--wait' option
def _wait(self, id, deleteFlag=False):
+ self._logger.debug("")
self._client.get_token()
# Endpoint to get operation status
apiUrlStatus = '{}{}{}'.format(self._apiName, self._apiVersion, '/nsi_lcm_op_occs')
def list(self, filter=None):
"""Returns a list of NSI
"""
+ self._logger.debug("")
self._client.get_token()
filter_string = ''
if filter:
def get(self, name):
"""Returns an NSI based on name or id
"""
+ self._logger.debug("")
self._client.get_token()
if utils.validate_uuid4(name):
for nsi in self.list():
raise NotFound("nsi {} not found".format(name))
def get_individual(self, name):
+ self._logger.debug("")
nsi_id = name
self._client.get_token()
if not utils.validate_uuid4(name):
raise NotFound("nsi {} not found".format(name))
def delete(self, name, force=False, wait=False):
+ self._logger.debug("")
nsi = self.get(name)
querystring = ''
if force:
ssh_keys=None, description='default description',
admin_status='ENABLED', wait=False):
+ self._logger.debug("")
self._client.get_token()
nst = self._client.nst.get(nst_name)
vim_account_id = {}
def get_vim_account_id(vim_account):
+ self._logger.debug("")
if vim_account_id.get(vim_account):
return vim_account_id[vim_account]
def list_op(self, name, filter=None):
"""Returns the list of operations of a NSI
"""
+ self._logger.debug("")
nsi = self.get(name)
try:
self._apiResource = '/nsi_lcm_op_occs'
def get_op(self, operationId):
"""Returns the status of an operation
"""
+ self._logger.debug("")
self._client.get_token()
try:
self._apiResource = '/nsi_lcm_op_occs'
def exec_op(self, name, op_name, op_data=None):
"""Executes an operation on a NSI
"""
+ self._logger.debug("")
nsi = self.get(name)
try:
self._apiResource = '/netslice_instances'
from osmclient.common import utils
import json
import magic
+import logging
#from os import stat
#from os.path import basename
def __init__(self, http=None, client=None):
self._http = http
self._client = client
+ self._logger = logging.getLogger('osmclient')
self._apiName = '/nst'
self._apiVersion = '/v1'
self._apiResource = '/netslice_templates'
self._apiVersion, self._apiResource)
def list(self, filter=None):
+ self._logger.debug("")
self._client.get_token()
filter_string = ''
if filter:
return list()
def get(self, name):
+ self._logger.debug("")
self._client.get_token()
if utils.validate_uuid4(name):
for nst in self.list():
raise NotFound("nst {} not found".format(name))
def get_individual(self, name):
+ self._logger.debug("")
nst = self.get(name)
# It is redundant, since the previous one already gets the whole nstinfo
# The only difference is that a different primitive is exercised
raise NotFound("nst {} not found".format(name))
def get_thing(self, name, thing, filename):
+ self._logger.debug("")
nst = self.get(name)
headers = self._client._headers
headers['Accept'] = 'application/binary'
raise ClientException("failed to get {} from {} - {}".format(thing, name, msg))
def get_descriptor(self, name, filename):
+ self._logger.debug("")
self.get_thing(name, 'nst', filename)
def get_package(self, name, filename):
+ self._logger.debug("")
self.get_thing(name, 'nst_content', filename)
def get_artifact(self, name, artifact, filename):
+ self._logger.debug("")
self.get_thing(name, 'artifacts/{}'.format(artifact), filename)
def delete(self, name, force=False):
+ self._logger.debug("")
nst = self.get(name)
querystring = ''
if force:
raise ClientException("failed to delete nst {} - {}".format(name, msg))
def create(self, filename, overwrite=None, update_endpoint=None):
+ self._logger.debug("")
self._client.get_token()
mime_type = magic.from_file(filename, mime=True)
if mime_type is None:
raise ClientException("failed to create/update nst - {}".format(msg))
def update(self, name, filename):
+ self._logger.debug("")
nst = self.get(name)
endpoint = '{}/{}/nst_content'.format(self._apiBase, nst['_id'])
self.create(filename=filename, update_endpoint=endpoint)
from osmclient.common.exceptions import NotFound
from osmclient.common import utils
import json
+import logging
class Package(object):
def __init__(self, http=None, client=None):
self._client = client
self._http = http
+ self._logger = logging.getLogger('osmclient')
def get_key_val_from_pkg(self, descriptor_file):
+ self._logger.debug("")
return utils.get_key_val_from_pkg(descriptor_file)
def _wait_for_package(self, pkg_type):
+ self._logger.debug("")
if 'vnfd' in pkg_type['type']:
get_method = self._client.vnfd.get
elif 'nsd' in pkg_type['type']:
# helper method to check if pkg exists
def check_exists(func):
+ self._logger.debug("")
try:
func()
except NotFound:
"""wait(block) for an upload to succeed.
The filename passed is assumed to be a descriptor tarball.
"""
+ self._logger.debug("")
self._client.get_token()
pkg_type = utils.get_key_val_from_pkg(filename)
.format(filename))
def upload(self, filename):
+ self._logger.debug("")
self._client.get_token()
pkg_type = utils.get_key_val_from_pkg(filename)
if pkg_type is None:
from osmclient.common.exceptions import ClientException
from osmclient.common import utils
import json
+import logging
class Pdu(object):
def __init__(self, http=None, client=None):
self._http = http
self._client = client
+ self._logger = logging.getLogger('osmclient')
self._apiName = '/pdu'
self._apiVersion = '/v1'
self._apiResource = '/pdu_descriptors'
self._apiVersion, self._apiResource)
def list(self, filter=None):
+ self._logger.debug("")
self._client.get_token()
filter_string = ''
if filter:
return list()
def get(self, name):
+ self._logger.debug("")
self._client.get_token()
if utils.validate_uuid4(name):
for pdud in self.list():
raise NotFound("pdud {} not found".format(name))
def get_individual(self, name):
+ self._logger.debug("")
pdud = self.get(name)
# It is redundant, since the previous one already gets the whole pdudInfo
# The only difference is that a different primitive is exercised
raise NotFound("pdu {} not found".format(name))
def delete(self, name, force=False):
+ self._logger.debug("")
pdud = self.get(name)
querystring = ''
if force:
raise ClientException("failed to delete pdu {} - {}".format(name, msg))
def create(self, pdu, update_endpoint=None):
+ self._logger.debug("")
self._client.get_token()
headers= self._client._headers
headers['Content-Type'] = 'application/yaml'
raise ClientException("failed to create/update pdu - {}".format(msg))
def update(self, name, filename):
+ self._logger.debug("")
pdud = self.get(name)
endpoint = '{}/{}'.format(self._apiBase, pdud['_id'])
self.create(filename=filename, update_endpoint=endpoint)
from osmclient.common.exceptions import ClientException
from osmclient.common.exceptions import NotFound
import json
+import logging
class Project(object):
def __init__(self, http=None, client=None):
self._http = http
self._client = client
+ self._logger = logging.getLogger('osmclient')
self._apiName = '/admin'
self._apiVersion = '/v1'
self._apiResource = '/projects'
def create(self, name, project):
"""Creates a new OSM project
"""
+ self._logger.debug("")
self._client.get_token()
http_code, resp = self._http.post_cmd(endpoint=self._apiBase,
postfields_dict=project)
def update(self, project, project_changes):
"""Updates an OSM project identified by name
"""
+ self._logger.debug("")
self._client.get_token()
proj = self.get(project)
http_code, resp = self._http.patch_cmd(endpoint='{}/{}'.format(self._apiBase, proj['_id']),
def delete(self, name, force=False):
"""Deletes an OSM project identified by name
"""
+ self._logger.debug("")
self._client.get_token()
project = self.get(name)
querystring = ''
def list(self, filter=None):
"""Returns the list of OSM projects
"""
+ self._logger.debug("")
self._client.get_token()
filter_string = ''
if filter:
def get(self, name):
"""Returns a specific OSM project based on name or id
"""
+ self._logger.debug("")
self._client.get_token()
if utils.validate_uuid4(name):
for proj in self.list():
return proj
raise NotFound("Project {} not found".format(name))
-
from osmclient.common.exceptions import NotFound
import json
import yaml
+import logging
class Role(object):
def __init__(self, http=None, client=None):
self._http = http
self._client = client
+ self._logger = logging.getLogger('osmclient')
self._apiName = '/admin'
self._apiVersion = '/v1'
self._apiResource = '/roles'
:raises ClientException: when receives an unexpected from the server.
:raises ClientException: when fails creating a role.
"""
+ self._logger.debug("")
self._client.get_token()
role = {"name": name}
:raises ClientException: when receives an unexpected response from the server.
:raises ClientException: when fails updating a role.
"""
+ self._logger.debug("")
self._client.get_token()
if new_name is None and permissions is None and add is None and remove is None:
raise ClientException('At least one option should be provided')
:param force:
:raises ClientException: when fails to delete a role.
"""
+ self._logger.debug("")
self._client.get_token()
role = self.get(name)
querystring = ''
:param filter:
:returns:
"""
+ self._logger.debug("")
self._client.get_token()
filter_string = ''
if filter:
:raises NotFound: when the role is not found.
:returns: the specified role.
"""
+ self._logger.debug("")
self._client.get_token()
if utils.validate_uuid4(name):
for role in self.list():
if name == role['name']:
return role
raise NotFound("Role {} not found".format(name))
+
from osmclient.common.exceptions import ClientException
from osmclient.common.exceptions import NotFound
import json
+import logging
class SdnController(object):
def __init__(self, http=None, client=None):
self._http = http
self._client = client
+ self._logger = logging.getLogger('osmclient')
self._apiName = '/admin'
self._apiVersion = '/v1'
self._apiResource = '/sdns'
# SDNC '--wait' option
def _wait(self, id, deleteFlag=False):
+ self._logger.debug("")
self._client.get_token()
# Endpoint to get operation status
apiUrlStatus = '{}{}{}'.format(self._apiName, self._apiVersion, '/sdns')
deleteFlag=deleteFlag)
def _get_id_for_wait(self, name):
- # Returns id of name, or the id itself if given as argument
+ """Returns id of name, or the id itself if given as argument
+ """
+ self._logger.debug("")
for sdnc in self.list():
if name == sdnc['_id']:
return sdnc['_id']
return ''
def create(self, name, sdn_controller, wait=False):
+ self._logger.debug("")
self._client.get_token()
http_code, resp = self._http.post_cmd(endpoint=self._apiBase, postfields_dict=sdn_controller)
# print('HTTP CODE: {}'.format(http_code))
raise ClientException("failed to create SDN controller {} - {}".format(name, msg))
def update(self, name, sdn_controller, wait=False):
+ self._logger.debug("")
self._client.get_token()
sdnc = self.get(name)
sdnc_id_for_wait = self._get_id_for_wait(name)
raise ClientException("failed to update SDN controller {} - {}".format(name, msg))
def delete(self, name, force=False, wait=False):
+ self._logger.debug("")
self._client.get_token()
sdn_controller = self.get(name)
sdnc_id_for_wait = self._get_id_for_wait(name)
def list(self, filter=None):
"""Returns a list of SDN controllers
"""
+ self._logger.debug("")
self._client.get_token()
filter_string = ''
if filter:
def get(self, name):
"""Returns an SDN controller based on name or id
"""
+ self._logger.debug("")
self._client.get_token()
if utils.validate_uuid4(name):
for sdnc in self.list():
return sdnc
raise NotFound("SDN controller {} not found".format(name))
-
from osmclient.common.exceptions import ClientException
from osmclient.common.exceptions import NotFound
import json
+import logging
class User(object):
def __init__(self, http=None, client=None):
self._http = http
self._client = client
+ self._logger = logging.getLogger('osmclient')
self._apiName = '/admin'
self._apiVersion = '/v1'
self._apiResource = '/users'
def create(self, name, user):
"""Creates a new OSM user
"""
+ self._logger.debug("")
self._client.get_token()
if not user["projects"] or (len(user["projects"]) == 1 and not user["projects"][0]):
del user["projects"]
def update(self, name, user):
"""Updates an existing OSM user identified by name
"""
+ self._logger.debug("")
self._client.get_token()
# print(user)
myuser = self.get(name)
def delete(self, name, force=False):
"""Deletes an existing OSM user identified by name
"""
+ self._logger.debug("")
self._client.get_token()
user = self.get(name)
querystring = ''
def list(self, filter=None):
"""Returns the list of OSM users
"""
+ self._logger.debug("")
self._client.get_token()
filter_string = ''
if filter:
def get(self, name):
"""Returns an OSM user based on name or id
"""
+ self._logger.debug("")
self._client.get_token()
if utils.validate_uuid4(name):
for user in self.list():
return user
raise NotFound("User {} not found".format(name))
-
from osmclient.common.exceptions import NotFound
import yaml
import json
+import logging
class Vim(object):
def __init__(self, http=None, client=None):
self._http = http
self._client = client
+ self._logger = logging.getLogger('osmclient')
self._apiName = '/admin'
self._apiVersion = '/v1'
self._apiResource = '/vim_accounts'
# VIM '--wait' option
def _wait(self, id, deleteFlag=False):
+ self._logger.debug("")
self._client.get_token()
# Endpoint to get operation status
apiUrlStatus = '{}{}{}'.format(self._apiName, self._apiVersion, '/vim_accounts')
deleteFlag=deleteFlag)
def _get_id_for_wait(self, name):
- # Returns id of name, or the id itself if given as argument
+ """ Returns id of name, or the id itself if given as argument
+ """
+ self._logger.debug("")
+ self._client.get_token()
for vim in self.list():
if name == vim['uuid']:
return vim['uuid']
return ''
def create(self, name, vim_access, sdn_controller=None, sdn_port_mapping=None, wait=False):
+ self._logger.debug("")
self._client.get_token()
if 'vim-type' not in vim_access:
#'openstack' not in vim_access['vim-type']):
raise ClientException("failed to create vim {} - {}".format(name, msg))
def update(self, vim_name, vim_account, sdn_controller, sdn_port_mapping, wait=False):
+ self._logger.debug("")
self._client.get_token()
vim = self.get(vim_name)
vim_id_for_wait = self._get_id_for_wait(vim_name)
raise ClientException("failed to update vim {} - {}".format(vim_name, msg))
def update_vim_account_dict(self, vim_account, vim_access):
+ self._logger.debug("")
vim_account['vim_type'] = vim_access['vim-type']
vim_account['description'] = vim_access['description']
vim_account['vim_url'] = vim_access['vim-url']
def get_id(self, name):
"""Returns a VIM id from a VIM name
"""
+ self._logger.debug("")
for vim in self.list():
if name == vim['name']:
return vim['uuid']
raise NotFound("vim {} not found".format(name))
def delete(self, vim_name, force=False, wait=False):
+ self._logger.debug("")
self._client.get_token()
vim_id = vim_name
if not utils.validate_uuid4(vim_name):
def list(self, filter=None):
"""Returns a list of VIM accounts
"""
+ self._logger.debug("")
self._client.get_token()
filter_string = ''
if filter:
def get(self, name):
"""Returns a VIM account based on name or id
"""
+ self._logger.debug("")
self._client.get_token()
vim_id = name
if not utils.validate_uuid4(name):
from osmclient.common import utils
from osmclient.common.exceptions import NotFound
+import logging
class Vnf(object):
def __init__(self, http=None, client=None):
self._http = http
self._client = client
+ self._logger = logging.getLogger('osmclient')
self._apiName = '/nslcm'
self._apiVersion = '/v1'
self._apiResource = '/vnfrs'
def list(self, ns=None, filter=None):
"""Returns a list of VNF instances
"""
+ self._logger.debug("")
self._client.get_token()
filter_string = ''
if filter:
def get(self, name):
"""Returns a VNF instance based on name or id
"""
+ self._logger.debug("")
self._client.get_token()
if utils.validate_uuid4(name):
for vnf in self.list():
raise NotFound("vnf {} not found".format(name))
def get_individual(self, name):
+ self._logger.debug("")
self._client.get_token()
vnf_id = name
if not utils.validate_uuid4(name):
import json
import magic
from os.path import basename
+import logging
#from os import stat
def __init__(self, http=None, client=None):
self._http = http
self._client = client
+ self._logger = logging.getLogger('osmclient')
self._apiName = '/vnfpkgm'
self._apiVersion = '/v1'
self._apiResource = '/vnf_packages'
#self._apiBase='/vnfds'
def list(self, filter=None):
+ self._logger.debug("")
self._client.get_token()
filter_string = ''
if filter:
return list()
def get(self, name):
+ self._logger.debug("")
self._client.get_token()
if utils.validate_uuid4(name):
for vnfd in self.list():
raise NotFound("vnfd {} not found".format(name))
def get_individual(self, name):
+ self._logger.debug("")
vnfd = self.get(name)
# It is redundant, since the previous one already gets the whole vnfpkginfo
# The only difference is that a different primitive is exercised
raise NotFound("vnfd {} not found".format(name))
def get_thing(self, name, thing, filename):
+ self._logger.debug("")
vnfd = self.get(name)
headers = self._client._headers
headers['Accept'] = 'application/binary'
raise ClientException("failed to get {} from {} - {}".format(thing, name, msg))
def get_descriptor(self, name, filename):
+ self._logger.debug("")
self.get_thing(name, 'vnfd', filename)
def get_package(self, name, filename):
+ self._logger.debug("")
self.get_thing(name, 'package_content', filename)
def get_artifact(self, name, artifact, filename):
+ self._logger.debug("")
self.get_thing(name, 'artifacts/{}'.format(artifact), filename)
def delete(self, name, force=False):
+ self._logger.debug("")
self._client.get_token()
vnfd = self.get(name)
querystring = ''
raise ClientException("failed to delete vnfd {} - {}".format(name, msg))
def create(self, filename, overwrite=None, update_endpoint=None):
+ self._logger.debug("")
self._client.get_token()
mime_type = magic.from_file(filename, mime=True)
if mime_type is None:
raise ClientException("failed to create/update vnfd - {}".format(msg))
def update(self, name, filename):
+ self._logger.debug("")
self._client.get_token()
vnfd = self.get(name)
endpoint = '{}/{}/package_content'.format(self._apiBase, vnfd['_id'])
from osmclient.common.exceptions import NotFound
import yaml
import json
+import logging
class Wim(object):
def __init__(self, http=None, client=None):
self._http = http
self._client = client
+ self._logger = logging.getLogger('osmclient')
self._apiName = '/admin'
self._apiVersion = '/v1'
self._apiResource = '/wim_accounts'
# WIM '--wait' option
def _wait(self, id, deleteFlag=False):
+ self._logger.debug("")
self._client.get_token()
# Endpoint to get operation status
apiUrlStatus = '{}{}{}'.format(self._apiName, self._apiVersion, '/wim_accounts')
deleteFlag=deleteFlag)
def _get_id_for_wait(self, name):
- # Returns id of name, or the id itself if given as argument
+ """Returns id of name, or the id itself if given as argument
+ """
+ self._logger.debug("")
for wim in self.list():
if name == wim['uuid']:
return wim['uuid']
return ''
def create(self, name, wim_input, wim_port_mapping=None, wait=False):
+ self._logger.debug("")
self._client.get_token()
if 'wim_type' not in wim_input:
raise Exception("wim type not provided")
raise ClientException("failed to create wim {} - {}".format(name, msg))
def update(self, wim_name, wim_account, wim_port_mapping=None, wait=False):
+ self._logger.debug("")
self._client.get_token()
wim = self.get(wim_name)
wim_id_for_wait = self._get_id_for_wait(wim_name)
raise ClientException("failed to update wim {} - {}".format(wim_name, msg))
def update_wim_account_dict(self, wim_account, wim_input):
- print(wim_input)
+ self._logger.debug("")
+ self._logger.debug(str(wim_input))
wim_account['wim_type'] = wim_input['wim_type']
wim_account['description'] = wim_input['description']
wim_account['wim_url'] = wim_input['url']
def get_id(self, name):
"""Returns a WIM id from a WIM name
"""
+ self._logger.debug("")
for wim in self.list():
if name == wim['name']:
return wim['uuid']
raise NotFound("wim {} not found".format(name))
def delete(self, wim_name, force=False, wait=False):
+ self._logger.debug("")
self._client.get_token()
wim_id = wim_name
wim_id_for_wait = self._get_id_for_wait(wim_name)
def list(self, filter=None):
"""Returns a list of VIM accounts
"""
+ self._logger.debug("")
self._client.get_token()
filter_string = ''
if filter:
def get(self, name):
"""Returns a VIM account based on name or id
"""
+ self._logger.debug("")
self._client.get_token()
wim_id = name
if not utils.validate_uuid4(name):
else:
return resp
raise NotFound("wim {} not found".format(name))
+
pycurl
python-magic
jinja2
+verboselogs
git+https://osm.etsi.org/gerrit/osm/IM.git#egg=osm-im
description=_description,
license='Apache 2',
install_requires=[
- 'Click', 'prettytable', 'pyyaml', 'PycURL', 'python-magic',
- 'jinja2', 'osm-im'
+ 'Click', 'prettytable', 'pyyaml', 'pycurl', 'python-magic',
+ 'jinja2', 'osm-im', 'verboselogs'
],
dependency_links=[
'git+https://osm.etsi.org/gerrit/osm/IM.git#egg=osm-im',