update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / ra / pytest / ns / restapitest / test_project_restapi.py
1 # !/usr/bin/env python
2 """
3 #
4 # Copyright 2017 RIFT.io Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 @author Anoop Valluthadam (anoop.valluthadam@riftio.com), Vishnu Narayanan K.A
20 @brief Create/Delete/Other operations of Projects and User
21 """
22
23 import os
24
25 from utils.imports import * # noqa
26 from utils.traversal_engine import traverse_it
27 from utils.utils import parse_input_data
28 from utils.tbac_token_utils import * # noqa
29
30 headers = {'content-type': 'application/json'}
31
32
33 class TestRestAPI(object):
34 """TestRestAPI."""
35
36 def traverse_and_find_all_keys(self, it, key_dict):
37 """Find all keys and their data types present in the json schema.
38
39 Args:
40 it (dict): the json
41 key_dict (dict): will be populated with the keys & their datatypes
42 Returns:
43 key_dict (dict): will be populated with the keys & their datatypes
44 """
45 if (isinstance(it, list)):
46 for item in it:
47 self.traverse_and_find_all_keys(item, key_dict)
48 return key_dict
49
50 elif (isinstance(it, dict)):
51 for key in it.keys():
52 if key == 'name' and 'data-type' in it:
53 if isinstance(it['data-type'], dict):
54 dtype = next(iter(it['data-type']))
55 if ((it[key] in key_dict) and
56 (dtype not in key_dict[it[key]])):
57
58 key_dict[it[key]].append(dtype)
59
60 elif it[key] not in key_dict:
61 key_dict[it[key]] = [dtype]
62 else:
63 pass
64 else:
65 if ((it[key] in key_dict) and
66 (it['data-type'] not in key_dict[it[key]])):
67
68 key_dict[it[key]].append(it['data-type'])
69
70 elif it[key] not in key_dict:
71 key_dict[it[key]] = [it['data-type']]
72 else:
73 pass
74 self.traverse_and_find_all_keys(it[key], key_dict)
75 return key_dict
76 else:
77 return None
78
79 def create_post_call(
80 self, data, confd_host, url, logger, state, number_of_tests):
81 """Create the POST.
82
83 Args:
84 data (dict): JSON data
85 confd_host (string): IP addr of the Launchpad
86 url (string): the url for the post call
87 logger (logger Object): log object
88 state: for the tbac token
89 number_of_tests (list): test & error cases count
90 Returns:
91 number_of_tests (list): test & error cases count
92 Raises:
93 requests.exceptions.ConnectionError: in case we loose connection
94 from the Launchpad, mostly when Launchpad crashes
95
96 """
97 number_of_tests[0] += 1
98
99 key = next(iter(data))
100 if 'project' in url:
101 name = str(data[key][0]["name"])
102 new_url = url + name
103 elif 'user-config' in url:
104 name = str(data[key]['user'][0]['user-name'])
105 domain = str(data[key]['user'][0]['user-domain'])
106 data = data['rw-user:user-config']
107 new_url = url + '/user/' + name + ',' + domain
108 else:
109 raise Exception('Something wrong with the URL')
110
111 logger.debug(data)
112 headers['Authorization'] = 'Bearer ' + state.access_token
113 try:
114 create_result = state.session.post(
115 url, data=json.dumps(data),
116 headers=headers, verify=False)
117 get_result = state.session.get(
118 new_url,
119 headers=headers, verify=False)
120 delete_result = state.session.delete(
121 new_url,
122 headers=headers, verify=False)
123 except requests.exceptions.ConnectionError:
124 logger.error('Crashed for the data: \n{}'.format(data))
125 number_of_tests[1] += 1
126 exit(1)
127
128 logger.debug(
129 'create result:\n{}\n{}\n'.format(
130 create_result.status_code, create_result.text))
131 logger.debug(
132 'get result:\n{}\n{}\n'.format(
133 get_result.status_code, get_result.text))
134 logger.debug(
135 'delete result:\n{}\n{}\n'.format(
136 delete_result.status_code, delete_result.text))
137
138 return number_of_tests
139
140 def get_schema(self, confd_host, url, property_=None):
141 """Get schema.
142
143 Args:
144 confd_host (string): Launchpad IP
145 property_ (string): vnfd/nsd/user etc
146 Returns:
147 schema (JSON): Schema in JSON format
148 """
149 headers = {'content-type': 'application/json'}
150
151 result = requests.get(url, auth=HTTPBasicAuth('admin', 'admin'),
152 headers=headers, verify=False)
153
154 schema = json.loads(result.text)
155
156 return schema
157
158 def traverse_call(
159 self, test_input, data, k_dict, confd_host, logger,
160 number_of_tests, depth, url, state):
161 """Traversing through the values from the test IP JSON.
162
163 Args:
164 test_input (string): the data from the test IP JSON
165 data (json): schema data
166 k_dict (dict): dictionary of the JSON IP
167 confd_host (string): Launchpad IP
168 logger (logger obj): log object
169 number_of_tests (list): test & error cases count
170 depth (int): depth of the json
171 url (string): the url for the post call
172 state: for the tbac token
173 Returns:
174 number_of_tests (list): test & error cases count
175 """
176 for key, kdata_types in k_dict.items():
177 for kdata_type in kdata_types:
178 if kdata_type in test_input:
179 test_values = test_input[kdata_type]
180 for test_value in test_values:
181 test_data = {kdata_type: test_value}
182 # Actual traversal call which will generate data
183 json_data = traverse_it(
184 data, original=False,
185 test_value=test_data, test_key=key,
186 max_depth=depth)
187
188 number_of_tests = self.create_post_call(
189 json_data, confd_host, url,
190 logger, state, number_of_tests)
191
192 return number_of_tests
193
194 def test_get_token(
195 self, rw_user_proxy, rbac_user_passwd, user_domain,
196 rbac_platform_proxy, rw_rbac_int_proxy, mgmt_session, state):
197 """Setting the public key in config and get token."""
198 client_id = '1234'
199 rift.auto.mano.create_user(
200 rw_user_proxy, 'test', rbac_user_passwd, user_domain)
201 rift.auto.mano.assign_platform_role_to_user(
202 rbac_platform_proxy, 'rw-rbac-platform:super-admin', 'test',
203 user_domain, rw_rbac_int_proxy)
204 openidc_xpath = (
205 '/rw-openidc-provider:openidc-provider-config/' +
206 'rw-openidc-provider:openidc-client' +
207 '[rw-openidc-provider:client-id={}]'.format(quoted_key(client_id))
208 )
209 config_object = (
210 RwOpenidcProviderYang.
211 YangData_RwOpenidcProvider_OpenidcProviderConfig_OpenidcClient.
212 from_dict({
213 'client_id': client_id,
214 'client_name': 'test',
215 'user_name': 'test',
216 'user_domain': 'tbacdomain',
217 'public_key': PUBLIC_KEY}))
218 rw_open_idc_proxy = mgmt_session.proxy(RwOpenidcProviderYang)
219 rw_open_idc_proxy.create_config(openidc_xpath, config_object)
220
221 # Get the token
222 jwt = Jwt(private_key=PRIVATE_KEY, iss=client_id,
223 sub="test", aud="https://locahost:8009")
224 jws = jwt.sign_jwt()
225 body_tuple = (
226 ("grant_type", "urn:ietf:params:oauth:grant-type:jwt-bearer"),
227 ("assertion", jws),
228 )
229
230 req = tornado.httpclient.HTTPRequest(
231 url=TOKEN_URL,
232 method='POST',
233 headers={"Content-Type": "application/x-www-form-urlencoded"},
234 ca_certs=state.cert,
235 body=urllib.parse.urlencode(body_tuple)
236 )
237 client = tornado.httpclient.HTTPClient()
238 resp = client.fetch(req)
239 token_resp = json.loads(resp.body.decode('utf-8'))
240 assert "access_token" in token_resp
241 state.access_token = token_resp["access_token"]
242
243 auth_value = 'Bearer ' + state.access_token
244 state.session = requests.Session()
245 state.session.headers.update({
246 'content-type': 'application/json',
247 'Authorization': auth_value
248 })
249
250 def test_user_restapi(self, confd_host, logger, state):
251 """Test user creation restapi."""
252 rift_install = os.getenv('RIFT_INSTALL')
253 file_path = (
254 '{}/usr/rift/systemtest/pytest/'.format(rift_install) +
255 'system/ns/restapitest/test_inputs/test_inputs.json')
256 test_input = parse_input_data(file_path)
257 schema_url_for_user = (
258 "https://{}:8008/v2/api/schema/user-config/".format(confd_host)
259 )
260 url_for_user = (
261 "https://{}:8008/v2/api/config/user-config".format(confd_host)
262 )
263 data = self.get_schema(confd_host, schema_url_for_user)
264
265 key_dict = {}
266 k_dict = self.traverse_and_find_all_keys(data, key_dict)
267
268 number_of_tests = [0, 0] # [total no. of tests, no. of erros]
269 # Traverse with depth but with out any specific key
270 for depth in range(14, 15):
271 number_of_tests = self.traverse_call(
272 test_input, data["user-config"], k_dict, confd_host,
273 logger, number_of_tests, depth, url_for_user, state)
274 logger.debug(
275 'No of tests ran for userapi: {}'.format(number_of_tests[0]))
276 logger.debug(
277 'No of crashed tests for userapi:{}'.format(number_of_tests[1]))
278
279 def test_project_restapi(self, confd_host, logger, state):
280 """Test project creation restapi."""
281 rift_install = os.getenv('RIFT_INSTALL')
282 file_path = (
283 '{}/usr/rift/systemtest/pytest/'.format(rift_install) +
284 'system/ns/restapitest/test_inputs/test_inputs.json')
285 test_input = parse_input_data(file_path)
286
287 schema_url_for_project = (
288 "https://{}:8008/v2/api/schema/project/".format(confd_host)
289 )
290 url_for_project = (
291 "https://{}:8008/v2/api/config/project/".format(confd_host)
292 )
293 data = self.get_schema(confd_host, schema_url_for_project)
294
295 key_dict = {}
296 k_dict = self.traverse_and_find_all_keys(data, key_dict)
297
298 number_of_tests = [0, 0] # [total no. of tests, no. of erros]
299
300 # Traverse with depth but with out any specific key
301 for depth in range(5, 6):
302 number_of_tests = self.traverse_call(
303 test_input, data["project"], k_dict, confd_host,
304 logger, number_of_tests, depth, url_for_project, state)
305 logger.debug(
306 'No of tests ran for projectapi: {}'.format(number_of_tests[0]))
307 logger.debug(
308 'No of crashed tests for projectapi:{}'.format(number_of_tests[1]))