Adding flake8 tests over some files
[osm/RO.git] / osm_ro / wim / tests / test_http_handler.py
1 # -*- coding: utf-8 -*-
2 ##
3 # Copyright 2018 University of Bristol - High Performance Networks Research
4 # Group
5 # All Rights Reserved.
6 #
7 # Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique
8 # Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou
9 #
10 # Licensed under the Apache License, Version 2.0 (the "License"); you may
11 # not use this file except in compliance with the License. You may obtain
12 # a copy of the License at
13 #
14 # http://www.apache.org/licenses/LICENSE-2.0
15 #
16 # Unless required by applicable law or agreed to in writing, software
17 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
18 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
19 # License for the specific language governing permissions and limitations
20 # under the License.
21 #
22 # For those usages not covered by the Apache License, Version 2.0 please
23 # contact with: <highperformance-networks@bristol.ac.uk>
24 #
25 # Neither the name of the University of Bristol nor the names of its
26 # contributors may be used to endorse or promote products derived from
27 # this software without specific prior written permission.
28 #
29 # This work has been performed in the context of DCMS UK 5G Testbeds
30 # & Trials Programme and in the framework of the Metro-Haul project -
31 # funded by the European Commission under Grant number 761727 through the
32 # Horizon 2020 and 5G-PPP programmes.
33 ##
34
35 from __future__ import unicode_literals
36
37 import unittest
38
39 import bottle
40 from mock import MagicMock, patch
41 from webtest import TestApp
42
43 from . import fixtures as eg # "examples"
44 from ...http_tools.errors import Conflict, Not_Found
45 from ...tests.db_helpers import TestCaseWithDatabasePerTest, uuid
46 from ...utils import merge_dicts
47 from ..http_handler import WimHandler
48
49 OK = 200
50
51
52 @patch('osm_ro.wim.wim_thread.CONNECTORS', MagicMock()) # Avoid external calls
53 @patch('osm_ro.wim.wim_thread.WimThread.start', MagicMock()) # Avoid running
54 class TestHttpHandler(TestCaseWithDatabasePerTest):
55 def setUp(self):
56 super(TestHttpHandler, self).setUp()
57 bottle.debug(True)
58 handler = WimHandler(db=self.db)
59 self.engine = handler.engine
60 self.addCleanup(self.engine.stop_threads)
61 self.app = TestApp(handler.wsgi_app)
62
63 def populate(self, seeds=None):
64 super(TestHttpHandler, self).populate(seeds or eg.consistent_set())
65
66 def test_list_wims(self):
67 # Given some wims are registered in the database
68 self.populate()
69 # when a GET /<tenant_id>/wims request arrives
70 tenant_id = uuid('tenant0')
71 response = self.app.get('/{}/wims'.format(tenant_id))
72
73 # then the request should be well succeeded
74 self.assertEqual(response.status_code, OK)
75 # and all the registered wims should be present
76 retrieved_wims = {v['name']: v for v in response.json['wims']}
77 for name in retrieved_wims:
78 identifier = int(name.replace('wim', ''))
79 self.assertDictContainsSubset(
80 eg.wim(identifier), retrieved_wims[name])
81
82 def test_show_wim(self):
83 # Given some wims are registered in the database
84 self.populate()
85 # when a GET /<tenant_id>/wims/<wim_id> request arrives
86 tenant_id = uuid('tenant0')
87 wim_id = uuid('wim1')
88 response = self.app.get('/{}/wims/{}'.format(tenant_id, wim_id))
89
90 # then the request should be well succeeded
91 self.assertEqual(response.status_code, OK)
92 # and the registered wim (wim1) should be present
93 self.assertDictContainsSubset(eg.wim(1), response.json['wim'])
94 # Moreover, it also works with tenant_id = all
95 response = self.app.get('/any/wims/{}'.format(wim_id))
96 self.assertEqual(response.status_code, OK)
97 self.assertDictContainsSubset(eg.wim(1), response.json['wim'])
98
99 def test_show_wim__wim_doesnt_exists(self):
100 # Given wim_id does not refer to any already registered wim
101 self.populate()
102 # when a GET /<tenant_id>/wims/<wim_id> request arrives
103 tenant_id = uuid('tenant0')
104 wim_id = uuid('wim999')
105 response = self.app.get(
106 '/{}/wims/{}'.format(tenant_id, wim_id),
107 expect_errors=True)
108
109 # then the result should not be well succeeded
110 self.assertEqual(response.status_code, Not_Found)
111
112 def test_show_wim__tenant_doesnt_exists(self):
113 # Given wim_id does not refer to any already registered wim
114 self.populate()
115 # when a GET /<tenant_id>/wims/<wim_id> request arrives
116 tenant_id = uuid('tenant999')
117 wim_id = uuid('wim0')
118 response = self.app.get(
119 '/{}/wims/{}'.format(tenant_id, wim_id),
120 expect_errors=True)
121
122 # then the result should not be well succeeded
123 self.assertEqual(response.status_code, Not_Found)
124
125 def test_edit_wim(self):
126 # Given a WIM exists in the database
127 self.populate()
128 # when a PUT /wims/<wim_id> request arrives
129 wim_id = uuid('wim1')
130 response = self.app.put_json('/wims/{}'.format(wim_id), {
131 'wim': {'name': 'My-New-Name'}})
132
133 # then the request should be well succeeded
134 self.assertEqual(response.status_code, OK)
135 # and the registered wim (wim1) should be present
136 self.assertDictContainsSubset(
137 merge_dicts(eg.wim(1), name='My-New-Name'),
138 response.json['wim'])
139
140 def test_edit_wim__port_mappings(self):
141 # Given a WIM exists in the database
142 self.populate()
143 # when a PUT /wims/<wim_id> request arrives
144 wim_id = uuid('wim1')
145 response = self.app.put_json(
146 '/wims/{}'.format(wim_id), {
147 'wim': dict(
148 name='My-New-Name',
149 config={'wim_port_mapping': [{
150 'datacenter_name': 'dc0',
151 'pop_wan_mappings': [{
152 'pop_switch_dpid': '00:AA:11:BB:22:CC:33:DD',
153 'pop_switch_port': 1,
154 'wan_service_mapping_info': {
155 'mapping_type': 'dpid-port',
156 'wan_switch_dpid': 'BB:BB:BB:BB:BB:BB:BB:0A',
157 'wan_switch_port': 1
158 }
159 }]}]
160 }
161 )
162 }
163 )
164
165 # then the request should be well succeeded
166 self.assertEqual(response.status_code, OK)
167 # and the registered wim (wim1) should be present
168 self.assertDictContainsSubset(
169 merge_dicts(eg.wim(1), name='My-New-Name'),
170 response.json['wim'])
171 # and the port mappings hould be updated
172 mappings = response.json['wim']['config']['wim_port_mapping']
173 self.assertEqual(len(mappings), 1)
174 self.assertEqual(
175 mappings[0]['pop_wan_mappings'][0]['pop_switch_dpid'],
176 '00:AA:11:BB:22:CC:33:DD')
177
178 def test_delete_wim(self):
179 # Given a WIM exists in the database
180 self.populate()
181 num_accounts = self.count('wim_accounts')
182 num_associations = self.count('wim_nfvo_tenants')
183 num_mappings = self.count('wim_port_mappings')
184
185 with self.engine.threads_running():
186 num_threads = len(self.engine.threads)
187 # when a DELETE /wims/<wim_id> request arrives
188 wim_id = uuid('wim1')
189 response = self.app.delete('/wims/{}'.format(wim_id))
190 num_threads_after = len(self.engine.threads)
191
192 # then the request should be well succeeded
193 self.assertEqual(response.status_code, OK)
194 self.assertIn('deleted', response.json['result'])
195 # and the registered wim1 should be deleted
196 response = self.app.get(
197 '/any/wims/{}'.format(wim_id),
198 expect_errors=True)
199 self.assertEqual(response.status_code, Not_Found)
200 # and all the dependent records in other tables should be deleted:
201 # wim_accounts, wim_nfvo_tenants, wim_port_mappings
202 self.assertEqual(self.count('wim_nfvo_tenants'),
203 num_associations - eg.NUM_TENANTS)
204 self.assertLess(self.count('wim_port_mappings'), num_mappings)
205 self.assertEqual(self.count('wim_accounts'),
206 num_accounts - eg.NUM_TENANTS)
207 # And the threads associated with the wim accounts should be stopped
208 self.assertEqual(num_threads_after, num_threads - eg.NUM_TENANTS)
209
210 def test_create_wim(self):
211 # Given no WIM exists yet
212 # when a POST /wims request arrives with the right payload
213 response = self.app.post_json('/wims', {'wim': eg.wim(999)})
214
215 # then the request should be well succeeded
216 self.assertEqual(response.status_code, OK)
217 self.assertEqual(response.json['wim']['name'], 'wim999')
218
219 def test_create_wim__port_mappings(self):
220 self.populate()
221 # when a POST /wims request arrives with the right payload
222 response = self.app.post_json(
223 '/wims', {
224 'wim': merge_dicts(
225 eg.wim(999),
226 config={'wim_port_mapping': [{
227 'datacenter_name': 'dc0',
228 'pop_wan_mappings': [{
229 'pop_switch_dpid': 'AA:AA:AA:AA:AA:AA:AA:01',
230 'pop_switch_port': 1,
231 'wan_service_mapping_info': {
232 'mapping_type': 'dpid-port',
233 'wan_switch_dpid': 'BB:BB:BB:BB:BB:BB:BB:01',
234 'wan_switch_port': 1
235 }
236 }]}]
237 }
238 )
239 }
240 )
241
242 # then the request should be well succeeded
243 self.assertEqual(response.status_code, OK)
244 self.assertEqual(response.json['wim']['name'], 'wim999')
245 self.assertEqual(
246 len(response.json['wim']['config']['wim_port_mapping']), 1)
247
248 def test_create_wim_account(self):
249 # Given a WIM and a NFVO tenant exist but are not associated
250 self.populate([{'wims': [eg.wim(0)]},
251 {'nfvo_tenants': [eg.tenant(0)]}])
252
253 with self.engine.threads_running():
254 num_threads = len(self.engine.threads)
255 # when a POST /<tenant_id>/wims/<wim_id> arrives
256 response = self.app.post_json(
257 '/{}/wims/{}'.format(uuid('tenant0'), uuid('wim0')),
258 {'wim_account': eg.wim_account(0, 0)})
259
260 num_threads_after = len(self.engine.threads)
261
262 # then a new thread should be created
263 self.assertEqual(num_threads_after, num_threads + 1)
264
265 # and the request should be well succeeded
266 self.assertEqual(response.status_code, OK)
267 self.assertEqual(response.json['wim_account']['name'], 'wim-account00')
268
269 # and a new association record should be created
270 association = self.db.get_rows(FROM='wim_nfvo_tenants')
271 assert association
272 self.assertEqual(len(association), 1)
273 self.assertEqual(association[0]['wim_id'], uuid('wim0'))
274 self.assertEqual(association[0]['nfvo_tenant_id'], uuid('tenant0'))
275 self.assertEqual(association[0]['wim_account_id'],
276 response.json['wim_account']['uuid'])
277
278 def test_create_wim_account__existing_account(self):
279 # Given a WIM, a WIM account and a NFVO tenants exist
280 # But the NFVO and the WIM are not associated
281 self.populate([
282 {'wims': [eg.wim(0)]},
283 {'nfvo_tenants': [eg.tenant(0)]},
284 {'wim_accounts': [eg.wim_account(0, 0)]}])
285
286 # when a POST /<tenant_id>/wims/<wim_id> arrives
287 # and it refers to an existing wim account
288 response = self.app.post_json(
289 '/{}/wims/{}'.format(uuid('tenant0'), uuid('wim0')),
290 {'wim_account': {'name': 'wim-account00'}})
291
292 # then the request should be well succeeded
293 self.assertEqual(response.status_code, OK)
294 # and the association should be created
295 association = self.db.get_rows(
296 FROM='wim_nfvo_tenants',
297 WHERE={'wim_id': uuid('wim0'),
298 'nfvo_tenant_id': uuid('tenant0')})
299 assert association
300 self.assertEqual(len(association), 1)
301 # but no new wim_account should be created
302 wim_accounts = self.db.get_rows(FROM='wim_accounts')
303 self.assertEqual(len(wim_accounts), 1)
304 self.assertEqual(wim_accounts[0]['name'], 'wim-account00')
305
306 def test_create_wim_account__existing_account__differing(self):
307 # Given a WIM, a WIM account and a NFVO tenants exist
308 # But the NFVO and the WIM are not associated
309 self.populate([
310 {'wims': [eg.wim(0)]},
311 {'nfvo_tenants': [eg.tenant(0)]},
312 {'wim_accounts': [eg.wim_account(0, 0)]}])
313
314 # when a POST /<tenant_id>/wims/<wim_id> arrives
315 # and it refers to an existing wim account,
316 # but with different fields
317 response = self.app.post_json(
318 '/{}/wims/{}'.format(uuid('tenant0'), uuid('wim0')), {
319 'wim_account': {
320 'name': 'wim-account00',
321 'user': 'john',
322 'password': 'abc123'}},
323 expect_errors=True)
324
325 # then the request should not be well succeeded
326 self.assertEqual(response.status_code, Conflict)
327 # some useful message should be displayed
328 response.mustcontain('attempt to overwrite', 'user', 'password')
329 # and the association should not be created
330 association = self.db.get_rows(
331 FROM='wim_nfvo_tenants',
332 WHERE={'wim_id': uuid('wim0'),
333 'nfvo_tenant_id': uuid('tenant0')})
334 assert not association
335
336 def test_create_wim_account__association_already_exists(self):
337 # Given a WIM, a WIM account and a NFVO tenants exist
338 # and are correctly associated
339 self.populate()
340 num_assoc_before = self.count('wim_nfvo_tenants')
341
342 # when a POST /<tenant_id>/wims/<wim_id> arrives trying to connect a
343 # WIM and a tenant for the second time
344 response = self.app.post_json(
345 '/{}/wims/{}'.format(uuid('tenant0'), uuid('wim0')), {
346 'wim_account': {
347 'user': 'user999',
348 'password': 'password999'}},
349 expect_errors=True)
350
351 # then the request should not be well succeeded
352 self.assertEqual(response.status_code, Conflict)
353 # the message should be useful
354 response.mustcontain('There is already', uuid('wim0'), uuid('tenant0'))
355
356 num_assoc_after = self.count('wim_nfvo_tenants')
357
358 # and the number of association record should not be increased
359 self.assertEqual(num_assoc_before, num_assoc_after)
360
361 def test_create_wim__tenant_doesnt_exist(self):
362 # Given a tenant not exists
363 self.populate()
364
365 # But the user tries to create a wim_account anyway
366 response = self.app.post_json(
367 '/{}/wims/{}'.format(uuid('tenant999'), uuid('wim0')), {
368 'wim_account': {
369 'user': 'user999',
370 'password': 'password999'}},
371 expect_errors=True)
372
373 # then the request should not be well succeeded
374 self.assertEqual(response.status_code, Not_Found)
375 # the message should be useful
376 response.mustcontain('No record was found', uuid('tenant999'))
377
378 def test_create_wim__wim_doesnt_exist(self):
379 # Given a tenant not exists
380 self.populate()
381
382 # But the user tries to create a wim_account anyway
383 response = self.app.post_json(
384 '/{}/wims/{}'.format(uuid('tenant0'), uuid('wim999')), {
385 'wim_account': {
386 'user': 'user999',
387 'password': 'password999'}},
388 expect_errors=True)
389
390 # then the request should not be well succeeded
391 self.assertEqual(response.status_code, Not_Found)
392 # the message should be useful
393 response.mustcontain('No record was found', uuid('wim999'))
394
395 def test_update_wim_account(self):
396 # Given a WIM account connecting a tenant and a WIM exists
397 self.populate()
398
399 with self.engine.threads_running():
400 num_threads = len(self.engine.threads)
401
402 thread = self.engine.threads[uuid('wim-account00')]
403 reload = MagicMock(wraps=thread.reload)
404
405 with patch.object(thread, 'reload', reload):
406 # when a PUT /<tenant_id>/wims/<wim_id> arrives
407 response = self.app.put_json(
408 '/{}/wims/{}'.format(uuid('tenant0'), uuid('wim0')), {
409 'wim_account': {
410 'name': 'account888',
411 'user': 'user888'}})
412
413 num_threads_after = len(self.engine.threads)
414
415 # then the wim thread should be restarted
416 reload.assert_called_once()
417 # and no thread should be added or removed
418 self.assertEqual(num_threads_after, num_threads)
419
420 # and the request should be well succeeded
421 self.assertEqual(response.status_code, OK)
422 self.assertEqual(response.json['wim_account']['name'], 'account888')
423 self.assertEqual(response.json['wim_account']['user'], 'user888')
424
425 def test_update_wim_account__multiple(self):
426 # Given a WIM account connected to several tenants
427 self.populate()
428
429 with self.engine.threads_running():
430 # when a PUT /any/wims/<wim_id> arrives
431 response = self.app.put_json(
432 '/any/wims/{}'.format(uuid('wim0')), {
433 'wim_account': {
434 'user': 'user888',
435 'config': {'x': 888}}})
436
437 # then the request should be well succeeded
438 self.assertEqual(response.status_code, OK)
439 self.assertEqual(len(response.json['wim_accounts']), eg.NUM_TENANTS)
440
441 for account in response.json['wim_accounts']:
442 self.assertEqual(account['user'], 'user888')
443 self.assertEqual(account['config']['x'], 888)
444
445 def test_delete_wim_account(self):
446 # Given a WIM account exists and it is connected to a tenant
447 self.populate()
448
449 num_accounts_before = self.count('wim_accounts')
450
451 with self.engine.threads_running():
452 thread = self.engine.threads[uuid('wim-account00')]
453 exit = MagicMock(wraps=thread.exit)
454 num_threads = len(self.engine.threads)
455
456 with patch.object(thread, 'exit', exit):
457 # when a PUT /<tenant_id>/wims/<wim_id> arrives
458 response = self.app.delete_json(
459 '/{}/wims/{}'.format(uuid('tenant0'), uuid('wim0')))
460
461 num_threads_after = len(self.engine.threads)
462
463 # then the wim thread should exit
464 self.assertEqual(num_threads_after, num_threads - 1)
465 exit.assert_called_once()
466
467 # and the request should be well succeeded
468 self.assertEqual(response.status_code, OK)
469 response.mustcontain('account `wim-account00` deleted')
470
471 # and the number of wim_accounts should decrease
472 num_accounts_after = self.count('wim_accounts')
473 self.assertEqual(num_accounts_after, num_accounts_before - 1)
474
475 def test_delete_wim_account__multiple(self):
476 # Given a WIM account exists and it is connected to several tenants
477 self.populate()
478
479 num_accounts_before = self.count('wim_accounts')
480
481 with self.engine.threads_running():
482 # when a PUT /<tenant_id>/wims/<wim_id> arrives
483 response = self.app.delete_json(
484 '/any/wims/{}'.format(uuid('wim0')))
485
486 # then the request should be well succeeded
487 self.assertEqual(response.status_code, OK)
488 response.mustcontain('account `wim-account00` deleted')
489 response.mustcontain('account `wim-account10` deleted')
490
491 # and the number of wim_accounts should decrease
492 num_accounts_after = self.count('wim_accounts')
493 self.assertEqual(num_accounts_after,
494 num_accounts_before - eg.NUM_TENANTS)
495
496 def test_delete_wim_account__doesnt_exist(self):
497 # Given we have a tenant that is not connected to a WIM
498 self.populate()
499 tenant = {'uuid': uuid('tenant888'), 'name': 'tenant888'}
500 self.populate([{'nfvo_tenants': [tenant]}])
501
502 num_accounts_before = self.count('wim_accounts')
503
504 # when a PUT /<tenant_id>/wims/<wim_id> arrives
505 response = self.app.delete(
506 '/{}/wims/{}'.format(uuid('tenant888'), uuid('wim0')),
507 expect_errors=True)
508
509 # then the request should not succeed
510 self.assertEqual(response.status_code, Not_Found)
511
512 # and the number of wim_accounts should not decrease
513 num_accounts_after = self.count('wim_accounts')
514 self.assertEqual(num_accounts_after, num_accounts_before)
515
516 def test_create_port_mappings(self):
517 # Given we have a wim and datacenter without any port mappings
518 self.populate([{'nfvo_tenants': eg.tenant(0)}] +
519 eg.datacenter_set(888, 0) +
520 eg.wim_set(999, 0))
521
522 # when a POST /<tenant_id>/wims/<wim_id>/port_mapping arrives
523 response = self.app.post_json(
524 '/{}/wims/{}/port_mapping'.format(uuid('tenant0'), uuid('wim999')),
525 {'wim_port_mapping': [{
526 'datacenter_name': 'dc888',
527 'pop_wan_mappings': [
528 {'pop_switch_dpid': 'AA:AA:AA:AA:AA:AA:AA:AA',
529 'pop_switch_port': 1,
530 'wan_service_mapping_info': {
531 'mapping_type': 'dpid-port',
532 'wan_switch_dpid': 'BB:BB:BB:BB:BB:BB:BB:BB',
533 'wan_switch_port': 1
534 }}
535 ]}
536 ]})
537
538 # the request should be well succeeded
539 self.assertEqual(response.status_code, OK)
540 # and port mappings should be stored in the database
541 port_mapping = self.db.get_rows(FROM='wim_port_mappings')
542 self.assertEqual(len(port_mapping), 1)
543
544 def test_get_port_mappings(self):
545 # Given WIMS and datacenters exist with port mappings between them
546 self.populate()
547 # when a GET /<tenant_id>/wims/<wim_id>/port_mapping arrives
548 response = self.app.get(
549 '/{}/wims/{}/port_mapping'.format(uuid('tenant0'), uuid('wim0')))
550 # the request should be well succeeded
551 self.assertEqual(response.status_code, OK)
552 # and we should see port mappings for each WIM, datacenter pair
553 mappings = response.json['wim_port_mapping']
554 self.assertEqual(len(mappings), eg.NUM_DATACENTERS)
555 # ^ In the fixture set all the datacenters are connected to all wims
556
557 def test_delete_port_mappings(self):
558 # Given WIMS and datacenters exist with port mappings between them
559 self.populate()
560 num_mappings_before = self.count('wim_port_mappings')
561
562 # when a DELETE /<tenant_id>/wims/<wim_id>/port_mapping arrives
563 response = self.app.delete(
564 '/{}/wims/{}/port_mapping'.format(uuid('tenant0'), uuid('wim0')))
565 # the request should be well succeeded
566 self.assertEqual(response.status_code, OK)
567 # and the number of port mappings should decrease
568 num_mappings_after = self.count('wim_port_mappings')
569 self.assertEqual(num_mappings_after,
570 num_mappings_before - eg.NUM_DATACENTERS)
571 # ^ In the fixture set all the datacenters are connected to all wims
572
573
574 if __name__ == '__main__':
575 unittest.main()