4 # Copyright 2017 RIFT.io Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
19 @author Anoop Valluthadam (anoop.valluthadam@riftio.com), Vishnu Narayanan K.A
20 @brief Create/Delete/Other operations of Projects and User
25 from utils
.imports
import * # noqa
26 from utils
.traversal_engine
import traverse_it
27 from utils
.utils
import parse_input_data
28 from utils
.tbac_token_utils
import * # noqa
30 headers
= {'content-type': 'application/json'}
33 class TestRestAPI(object):
36 def traverse_and_find_all_keys(self
, it
, key_dict
):
37 """Find all keys and their data types present in the json schema.
41 key_dict (dict): will be populated with the keys & their datatypes
43 key_dict (dict): will be populated with the keys & their datatypes
45 if (isinstance(it
, list)):
47 self
.traverse_and_find_all_keys(item
, key_dict
)
50 elif (isinstance(it
, dict)):
52 if key
== 'name' and 'data-type' in it
:
53 if isinstance(it
['data-type'], dict):
54 dtype
= next(iter(it
['data-type']))
55 if ((it
[key
] in key_dict
) and
56 (dtype
not in key_dict
[it
[key
]])):
58 key_dict
[it
[key
]].append(dtype
)
60 elif it
[key
] not in key_dict
:
61 key_dict
[it
[key
]] = [dtype
]
65 if ((it
[key
] in key_dict
) and
66 (it
['data-type'] not in key_dict
[it
[key
]])):
68 key_dict
[it
[key
]].append(it
['data-type'])
70 elif it
[key
] not in key_dict
:
71 key_dict
[it
[key
]] = [it
['data-type']]
74 self
.traverse_and_find_all_keys(it
[key
], key_dict
)
80 self
, data
, confd_host
, url
, logger
, state
, number_of_tests
):
84 data (dict): JSON data
85 confd_host (string): IP addr of the Launchpad
86 url (string): the url for the post call
87 logger (logger Object): log object
88 state: for the tbac token
89 number_of_tests (list): test & error cases count
91 number_of_tests (list): test & error cases count
93 requests.exceptions.ConnectionError: in case we loose connection
94 from the Launchpad, mostly when Launchpad crashes
97 number_of_tests
[0] += 1
99 key
= next(iter(data
))
101 name
= str(data
[key
][0]["name"])
103 elif 'user-config' in url
:
104 name
= str(data
[key
]['user'][0]['user-name'])
105 domain
= str(data
[key
]['user'][0]['user-domain'])
106 data
= data
['rw-user:user-config']
107 new_url
= url
+ '/user/' + name
+ ',' + domain
109 raise Exception('Something wrong with the URL')
112 headers
['Authorization'] = 'Bearer ' + state
.access_token
114 create_result
= state
.session
.post(
115 url
, data
=json
.dumps(data
),
116 headers
=headers
, verify
=False)
117 get_result
= state
.session
.get(
119 headers
=headers
, verify
=False)
120 delete_result
= state
.session
.delete(
122 headers
=headers
, verify
=False)
123 except requests
.exceptions
.ConnectionError
:
124 logger
.error('Crashed for the data: \n{}'.format(data
))
125 number_of_tests
[1] += 1
129 'create result:\n{}\n{}\n'.format(
130 create_result
.status_code
, create_result
.text
))
132 'get result:\n{}\n{}\n'.format(
133 get_result
.status_code
, get_result
.text
))
135 'delete result:\n{}\n{}\n'.format(
136 delete_result
.status_code
, delete_result
.text
))
138 return number_of_tests
140 def get_schema(self
, confd_host
, url
, property_
=None):
144 confd_host (string): Launchpad IP
145 property_ (string): vnfd/nsd/user etc
147 schema (JSON): Schema in JSON format
149 headers
= {'content-type': 'application/json'}
151 result
= requests
.get(url
, auth
=HTTPBasicAuth('admin', 'admin'),
152 headers
=headers
, verify
=False)
154 schema
= json
.loads(result
.text
)
159 self
, test_input
, data
, k_dict
, confd_host
, logger
,
160 number_of_tests
, depth
, url
, state
):
161 """Traversing through the values from the test IP JSON.
164 test_input (string): the data from the test IP JSON
165 data (json): schema data
166 k_dict (dict): dictionary of the JSON IP
167 confd_host (string): Launchpad IP
168 logger (logger obj): log object
169 number_of_tests (list): test & error cases count
170 depth (int): depth of the json
171 url (string): the url for the post call
172 state: for the tbac token
174 number_of_tests (list): test & error cases count
176 for key
, kdata_types
in k_dict
.items():
177 for kdata_type
in kdata_types
:
178 if kdata_type
in test_input
:
179 test_values
= test_input
[kdata_type
]
180 for test_value
in test_values
:
181 test_data
= {kdata_type
: test_value
}
182 # Actual traversal call which will generate data
183 json_data
= traverse_it(
184 data
, original
=False,
185 test_value
=test_data
, test_key
=key
,
188 number_of_tests
= self
.create_post_call(
189 json_data
, confd_host
, url
,
190 logger
, state
, number_of_tests
)
192 return number_of_tests
195 self
, rw_user_proxy
, rbac_user_passwd
, user_domain
,
196 rbac_platform_proxy
, rw_rbac_int_proxy
, mgmt_session
, state
):
197 """Setting the public key in config and get token."""
199 rift
.auto
.mano
.create_user(
200 rw_user_proxy
, 'test', rbac_user_passwd
, user_domain
)
201 rift
.auto
.mano
.assign_platform_role_to_user(
202 rbac_platform_proxy
, 'rw-rbac-platform:super-admin', 'test',
203 user_domain
, rw_rbac_int_proxy
)
205 '/rw-openidc-provider:openidc-provider-config/' +
206 'rw-openidc-provider:openidc-client' +
207 '[rw-openidc-provider:client-id={}]'.format(quoted_key(client_id
))
210 RwOpenidcProviderYang
.
211 YangData_RwOpenidcProvider_OpenidcProviderConfig_OpenidcClient
.
213 'client_id': client_id
,
214 'client_name': 'test',
216 'user_domain': 'tbacdomain',
217 'public_key': PUBLIC_KEY
}))
218 rw_open_idc_proxy
= mgmt_session
.proxy(RwOpenidcProviderYang
)
219 rw_open_idc_proxy
.create_config(openidc_xpath
, config_object
)
222 jwt
= Jwt(private_key
=PRIVATE_KEY
, iss
=client_id
,
223 sub
="test", aud
="https://locahost:8009")
226 ("grant_type", "urn:ietf:params:oauth:grant-type:jwt-bearer"),
230 req
= tornado
.httpclient
.HTTPRequest(
233 headers
={"Content-Type": "application/x-www-form-urlencoded"},
235 body
=urllib
.parse
.urlencode(body_tuple
)
237 client
= tornado
.httpclient
.HTTPClient()
238 resp
= client
.fetch(req
)
239 token_resp
= json
.loads(resp
.body
.decode('utf-8'))
240 assert "access_token" in token_resp
241 state
.access_token
= token_resp
["access_token"]
243 auth_value
= 'Bearer ' + state
.access_token
244 state
.session
= requests
.Session()
245 state
.session
.headers
.update({
246 'content-type': 'application/json',
247 'Authorization': auth_value
250 def test_user_restapi(self
, confd_host
, logger
, state
):
251 """Test user creation restapi."""
252 rift_install
= os
.getenv('RIFT_INSTALL')
254 '{}/usr/rift/systemtest/pytest/'.format(rift_install
) +
255 'system/ns/restapitest/test_inputs/test_inputs.json')
256 test_input
= parse_input_data(file_path
)
257 schema_url_for_user
= (
258 "https://{}:8008/v2/api/schema/user-config/".format(confd_host
)
261 "https://{}:8008/v2/api/config/user-config".format(confd_host
)
263 data
= self
.get_schema(confd_host
, schema_url_for_user
)
266 k_dict
= self
.traverse_and_find_all_keys(data
, key_dict
)
268 number_of_tests
= [0, 0] # [total no. of tests, no. of erros]
269 # Traverse with depth but with out any specific key
270 for depth
in range(14, 15):
271 number_of_tests
= self
.traverse_call(
272 test_input
, data
["user-config"], k_dict
, confd_host
,
273 logger
, number_of_tests
, depth
, url_for_user
, state
)
275 'No of tests ran for userapi: {}'.format(number_of_tests
[0]))
277 'No of crashed tests for userapi:{}'.format(number_of_tests
[1]))
279 def test_project_restapi(self
, confd_host
, logger
, state
):
280 """Test project creation restapi."""
281 rift_install
= os
.getenv('RIFT_INSTALL')
283 '{}/usr/rift/systemtest/pytest/'.format(rift_install
) +
284 'system/ns/restapitest/test_inputs/test_inputs.json')
285 test_input
= parse_input_data(file_path
)
287 schema_url_for_project
= (
288 "https://{}:8008/v2/api/schema/project/".format(confd_host
)
291 "https://{}:8008/v2/api/config/project/".format(confd_host
)
293 data
= self
.get_schema(confd_host
, schema_url_for_project
)
296 k_dict
= self
.traverse_and_find_all_keys(data
, key_dict
)
298 number_of_tests
= [0, 0] # [total no. of tests, no. of erros]
300 # Traverse with depth but with out any specific key
301 for depth
in range(5, 6):
302 number_of_tests
= self
.traverse_call(
303 test_input
, data
["project"], k_dict
, confd_host
,
304 logger
, number_of_tests
, depth
, url_for_project
, state
)
306 'No of tests ran for projectapi: {}'.format(number_of_tests
[0]))
308 'No of crashed tests for projectapi:{}'.format(number_of_tests
[1]))