'Also can set OSM_PROJECT in environment')
@click.option('-v', '--verbose', count=True,
help='increase verbosity (-v INFO, -vv VERBOSE, -vvv DEBUG)')
+@click.option('--all-projects',
+ default=False,
+ is_flag=True,
+ help='include all projects')
+@click.option('--public/--no-public', default=None,
+ help='flag for public items (packages, instances, VIM accounts, etc.)')
#@click.option('--so-port',
# default=None,
# envvar='OSM_SO_PORT',
# help='hostname of RO server. ' +
# 'Also can set OSM_RO_PORT in environment')
@click.pass_context
-def cli_osm(ctx, hostname, user, password, project, verbose):
+def cli_osm(ctx, hostname, user, password, project, verbose, all_projects, public):
global logger
if hostname is None:
print((
kwargs['password']=password
if project is not None:
kwargs['project']=project
+ if all_projects:
+ kwargs['all_projects']=all_projects
+ if public is not None:
+ kwargs['public']=public
ctx.obj = client.Client(host=hostname, sol005=sol005, **kwargs)
logger = logging.getLogger('osmclient')
self._auth_endpoint = '/admin/v1/tokens'
self._headers = {}
self._token = None
-
if len(host.split(':')) > 1:
# backwards compatible, port provided as part of host
self._host = host.split(':')[0]
self._so_port = so_port
self._http_client = http.Http(
- 'https://{}:{}/osm'.format(self._host,self._so_port))
+ 'https://{}:{}/osm'.format(self._host,self._so_port), **kwargs)
self._headers['Accept'] = 'application/json'
self._headers['Content-Type'] = 'application/yaml'
http_header = ['{}: {}'.format(key, val)
'password': self._password,
'project_id': self._project}
http_code, resp = self._http_client.post_cmd(endpoint=self._auth_endpoint,
- postfields_dict=postfields_dict)
+ postfields_dict=postfields_dict,
+ skip_query_admin=True)
# if http_code not in (200, 201, 202, 204):
# message ='Authentication error: not possible to get auth token\nresp:\n{}'.format(resp)
# raise ClientException(message)
self._http_client.set_http_header(http_header)
def get_version(self):
- _, resp = self._http_client.get2_cmd(endpoint="/version")
+ _, resp = self._http_client.get2_cmd(endpoint="/version", skip_query_admin=True)
resp = json.loads(resp)
return "{} {}".format(resp.get("version"), resp.get("date"))
+
+ def set_default_params(self, **kwargs):
+ host = kwargs.pop('host', None)
+ if host != None:
+ self._host=host
+ port = kwargs.pop('port', None)
+ if port != None:
+ self._so_port=port
+ self._http_client.set_query_admin(**kwargs)
class Http(http.Http):
- def __init__(self, url, user='admin', password='admin'):
+ def __init__(self, url, user='admin', password='admin', **kwargs):
self._url = url
self._user = user
self._password = password
self._http_header = None
self._logger = logging.getLogger('osmclient')
+ self._default_query_admin = None
+ self._all_projects = None;
+ self._public = None;
+ if 'all_projects' in kwargs:
+ self._all_projects=kwargs['all_projects']
+ if 'public' in kwargs:
+ self._public=kwargs['public']
+ self._default_query_admin = self._complete_default_query_admin()
- def _get_curl_cmd(self, endpoint):
+ def _complete_default_query_admin(self):
+ query_string_list=[]
+ if self._all_projects:
+ query_string_list.append("ADMIN")
+ if self._public is not None:
+ query_string_list.append("PUBLIC={}".format(self._public))
+ return "&".join(query_string_list)
+
+ def _complete_endpoint(self, endpoint):
+ if self._default_query_admin:
+ if '?' in endpoint:
+ endpoint = '&'.join([endpoint, self._default_query_admin])
+ else:
+ endpoint = '?'.join([endpoint, self._default_query_admin])
+ return endpoint
+
+ def _get_curl_cmd(self, endpoint, skip_query_admin=False):
self._logger.debug("")
curl_cmd = pycurl.Curl()
if self._logger.getEffectiveLevel() == logging.DEBUG:
curl_cmd.setopt(pycurl.VERBOSE, True)
+ if not skip_query_admin:
+ endpoint = self._complete_endpoint(endpoint)
curl_cmd.setopt(pycurl.URL, self._url + endpoint)
curl_cmd.setopt(pycurl.SSL_VERIFYPEER, 0)
curl_cmd.setopt(pycurl.SSL_VERIFYHOST, 0)
curl_cmd.setopt(pycurl.HTTPHEADER, self._http_header)
return curl_cmd
- def delete_cmd(self, endpoint):
+ def delete_cmd(self, endpoint, skip_query_admin=False):
self._logger.debug("")
data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint)
+ curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
curl_cmd.setopt(pycurl.CUSTOMREQUEST, "DELETE")
curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
self._logger.info("Request METHOD: {} URL: {}".format("DELETE",self._url + endpoint))
def send_cmd(self, endpoint='', postfields_dict=None,
formfile=None, filename=None,
- put_method=False, patch_method=False):
+ put_method=False, patch_method=False,
+ skip_query_admin=False):
self._logger.debug("")
data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint)
+ curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
if put_method:
curl_cmd.setopt(pycurl.CUSTOMREQUEST, "PUT")
elif patch_method:
return http_code, None
def post_cmd(self, endpoint='', postfields_dict=None,
- formfile=None, filename=None):
+ formfile=None, filename=None,
+ skip_query_admin=False):
self._logger.debug("")
return self.send_cmd(endpoint=endpoint,
postfields_dict=postfields_dict,
- formfile=formfile,
- filename=filename,
- put_method=False, patch_method=False)
+ formfile=formfile, filename=filename,
+ put_method=False, patch_method=False,
+ skip_query_admin=skip_query_admin)
def put_cmd(self, endpoint='', postfields_dict=None,
- formfile=None, filename=None):
+ formfile=None, filename=None,
+ skip_query_admin=False):
self._logger.debug("")
return self.send_cmd(endpoint=endpoint,
postfields_dict=postfields_dict,
- formfile=formfile,
- filename=filename,
- put_method=True, patch_method=False)
+ formfile=formfile, filename=filename,
+ put_method=True, patch_method=False,
+ skip_query_admin=skip_query_admin)
def patch_cmd(self, endpoint='', postfields_dict=None,
- formfile=None, filename=None):
+ formfile=None, filename=None,
+ skip_query_admin=False):
self._logger.debug("")
return self.send_cmd(endpoint=endpoint,
postfields_dict=postfields_dict,
- formfile=formfile,
- filename=filename,
- put_method=False, patch_method=True)
+ formfile=formfile, filename=filename,
+ put_method=False, patch_method=True,
+ skip_query_admin=skip_query_admin)
- def get2_cmd(self, endpoint):
+ def get2_cmd(self, endpoint, skip_query_admin=False):
self._logger.debug("")
data = BytesIO()
- curl_cmd = self._get_curl_cmd(endpoint)
+ curl_cmd = self._get_curl_cmd(endpoint, skip_query_admin)
curl_cmd.setopt(pycurl.HTTPGET, 1)
curl_cmd.setopt(pycurl.WRITEFUNCTION, data.write)
self._logger.info("Request METHOD: {} URL: {}".format("GET",self._url + endpoint))
if http_code == 404:
raise NotFound("Error {}{}".format(http_code, resp))
raise OsmHttpException("Error {}{}".format(http_code, resp))
+
+ def set_query_admin(self, **kwargs):
+ if 'all_projects' in kwargs:
+ self._all_projects=kwargs['all_projects']
+ if 'public' in kwargs:
+ self._public=kwargs['public']
+ self._default_query_admin = self._complete_default_query_admin()
self._logger.debug("")
self._client.get_token()
http_code, resp = self._http.post_cmd(endpoint=self._apiBase,
- postfields_dict=project)
+ postfields_dict=project,
+ skip_query_admin=True)
#print('HTTP CODE: {}'.format(http_code))
#print('RESP: {}'.format(resp))
#if http_code in (200, 201, 202, 204):
self._client.get_token()
proj = self.get(project)
http_code, resp = self._http.patch_cmd(endpoint='{}/{}'.format(self._apiBase, proj['_id']),
- postfields_dict=project_changes)
+ postfields_dict=project_changes,
+ skip_query_admin=True)
# print('HTTP CODE: {}'.format(http_code))
# print('RESP: {}'.format(resp))
if http_code in (200, 201, 202):
if force:
querystring = '?FORCE=True'
http_code, resp = self._http.delete_cmd('{}/{}{}'.format(self._apiBase,
- project['_id'], querystring))
+ project['_id'], querystring),
+ skip_query_admin=True)
#print('HTTP CODE: {}'.format(http_code))
#print('RESP: {}'.format(resp))
if http_code == 202:
filter_string = ''
if filter:
filter_string = '?{}'.format(filter)
- _, resp = self._http.get2_cmd('{}{}'.format(self._apiBase,filter_string))
+ _, resp = self._http.get2_cmd('{}{}'.format(self._apiBase,filter_string),
+ skip_query_admin=True)
#print('RESP: {}'.format(resp))
if resp:
return json.loads(resp)
role["permissions"] = role_permissions
http_code, resp = self._http.post_cmd(endpoint=self._apiBase,
- postfields_dict=role)
+ postfields_dict=role,
+ skip_query_admin=True)
# print('HTTP CODE: {}'.format(http_code))
# print('RESP: {}'.format(resp))
#if http_code in (200, 201, 202, 204):
del new_role_obj["permissions"]
http_code, resp = self._http.patch_cmd(endpoint='{}/{}'.format(self._apiBase, role_obj['_id']),
- postfields_dict=new_role_obj)
+ postfields_dict=new_role_obj,
+ skip_query_admin=True)
# print('HTTP CODE: {}'.format(http_code))
# print('RESP: {}'.format(resp))
if http_code in (200, 201, 202):
if force:
querystring = '?FORCE=True'
http_code, resp = self._http.delete_cmd('{}/{}{}'.format(self._apiBase,
- role['_id'], querystring))
+ role['_id'], querystring),
+ skip_query_admin=True)
# print('HTTP CODE: {}'.format(http_code))
# print('RESP: {}'.format(resp))
if http_code == 202:
filter_string = ''
if filter:
filter_string = '?{}'.format(filter)
- _, resp = self._http.get2_cmd('{}{}'.format(self._apiBase, filter_string))
+ _, resp = self._http.get2_cmd('{}{}'.format(self._apiBase, filter_string),skip_query_admin=True)
# print('RESP: {}'.format(resp))
if resp:
return json.loads(resp)
for role in roles:
mapping = {"project": project, "role": role}
- if mapping not in project_role_mappings:
+ if mapping not in project_role_mappings:
project_role_mappings.append(mapping)
-
user["project_role_mappings"] = project_role_mappings
else:
del user["project_role_mappings"]
http_code, resp = self._http.post_cmd(endpoint=self._apiBase,
- postfields_dict=user)
+ postfields_dict=user,
+ skip_query_admin=True)
#print('HTTP CODE: {}'.format(http_code))
#print('RESP: {}'.format(resp))
#if http_code in (200, 201, 202, 204):
raise ClientException("At least something should be changed.")
http_code, resp = self._http.patch_cmd(endpoint='{}/{}'.format(self._apiBase, myuser['_id']),
- postfields_dict=update_user)
+ postfields_dict=update_user, skip_query_admin=True)
# print('HTTP CODE: {}'.format(http_code))
# print('RESP: {}'.format(resp))
if http_code in (200, 201, 202):
if force:
querystring = '?FORCE=True'
http_code, resp = self._http.delete_cmd('{}/{}{}'.format(self._apiBase,
- user['_id'], querystring))
+ user['_id'], querystring), skip_query_admin=True)
#print('HTTP CODE: {}'.format(http_code))
#print('RESP: {}'.format(resp))
if http_code == 202:
filter_string = ''
if filter:
filter_string = '?{}'.format(filter)
- _, resp = self._http.get2_cmd('{}{}'.format(self._apiBase,filter_string))
+ _, resp = self._http.get2_cmd('{}{}'.format(self._apiBase,filter_string,skip_query_admin=True))
#print('RESP: {}'.format(resp))
if resp:
return json.loads(resp)