X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FRO.git;a=blobdiff_plain;f=RO%2Fosm_ro%2Fwim%2Ftests%2Ftest_persistence.py;fp=RO%2Fosm_ro%2Fwim%2Ftests%2Ftest_persistence.py;h=ecca4fac24e6b847d95614b3abac1807fa63001b;hp=0000000000000000000000000000000000000000;hb=51cc9c4c78bb54c84a5d2399207d206eb30ea247;hpb=9f40121f66e644ddf700720d8d4bdf464f6dd414 diff --git a/RO/osm_ro/wim/tests/test_persistence.py b/RO/osm_ro/wim/tests/test_persistence.py new file mode 100644 index 00000000..ecca4fac --- /dev/null +++ b/RO/osm_ro/wim/tests/test_persistence.py @@ -0,0 +1,260 @@ +# -*- coding: utf-8 -*- +## +# Copyright 2018 University of Bristol - High Performance Networks Research +# Group +# All Rights Reserved. +# +# Contributors: Anderson Bravalheri, Dimitrios Gkounis, Abubakar Siddique +# Muqaddas, Navdeep Uniyal, Reza Nejabati and Dimitra Simeonidou +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: +# +# Neither the name of the University of Bristol nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# This work has been performed in the context of DCMS UK 5G Testbeds +# & Trials Programme and in the framework of the Metro-Haul project - +# funded by the European Commission under Grant number 761727 through the +# Horizon 2020 and 5G-PPP programmes. +## + +import unittest +from itertools import chain + +from . import fixtures as eg +from ...tests.db_helpers import ( + TestCaseWithDatabasePerTest, + disable_foreign_keys, + uuid +) +from ..persistence import ( + WimPersistence, + hide_confidential_fields, + serialize_fields, + unserialize_fields +) + + +class TestPersistenceUtils(unittest.TestCase): + def test_hide_confidential_fields(self): + example = { + 'password': '123456', + 'nested.password': '123456', + 'nested.secret': None, + } + result = hide_confidential_fields(example, + fields=('password', 'secret')) + for field in 'password', 'nested.password': + assert result[field].startswith('***') + self.assertIs(result['nested.secret'], None) + + def test_serialize_fields(self): + example = { + 'config': dict(x=1), + 'nested.info': [1, 2, 3], + 'nested.config': None + } + result = serialize_fields(example, fields=('config', 'info')) + for field in 'config', 'nested.info': + self.assertIsInstance(result[field], str) + self.assertIs(result['nested.config'], None) + + def test_unserialize_fields(self): + example = { + 'config': '{"x": 1}', + 'nested.info': '[1,2,3]', + 'nested.config': None, + 'confidential.info': '{"password": "abcdef"}' + } + result = unserialize_fields(example, fields=('config', 'info')) + self.assertEqual(result['config'], dict(x=1)) + self.assertEqual(result['nested.info'], [1, 2, 3]) + self.assertIs(result['nested.config'], None) + self.assertNotEqual(result['confidential.info']['password'], 'abcdef') + assert result['confidential.info']['password'].startswith('***') + + +class TestWimPersistence(TestCaseWithDatabasePerTest): + def setUp(self): + super(TestWimPersistence, self).setUp() + self.persist = WimPersistence(self.db) + + def populate(self, seeds=None): + super(TestWimPersistence, self).populate(seeds or eg.consistent_set()) + + def test_query_offset(self): + # Given a database contains 4 records + self.populate([{'wims': [eg.wim(i) for i in range(4)]}]) + + # When we query using a limit of 2 and a offset of 1 + results = self.persist.query('wims', + ORDER_BY='name', LIMIT=2, OFFSET=1) + # Then we should have 2 results, skipping the first record + names = [r['name'] for r in results] + self.assertItemsEqual(names, ['wim1', 'wim2']) + + def test_get_wim_account_by_wim_tenant(self): + # Given a database contains WIM accounts associated to Tenants + self.populate() + + # when we retrieve the account using wim and tenant + wim_account = self.persist.get_wim_account_by( + uuid('wim0'), uuid('tenant0')) + + # then the right record should be returned + self.assertEqual(wim_account['uuid'], uuid('wim-account00')) + self.assertEqual(wim_account['name'], 'wim-account00') + self.assertEqual(wim_account['user'], 'user00') + + def test_get_wim_account_by_wim_tenant__names(self): + # Given a database contains WIM accounts associated to Tenants + self.populate() + + # when we retrieve the account using wim and tenant + wim_account = self.persist.get_wim_account_by( + 'wim0', 'tenant0') + + # then the right record should be returned + self.assertEqual(wim_account['uuid'], uuid('wim-account00')) + self.assertEqual(wim_account['name'], 'wim-account00') + self.assertEqual(wim_account['user'], 'user00') + + def test_get_wim_accounts_by_wim(self): + # Given a database contains WIM accounts associated to Tenants + self.populate() + + # when we retrieve the accounts using wim + wim_accounts = self.persist.get_wim_accounts_by(uuid('wim0')) + + # then the right records should be returned + self.assertEqual(len(wim_accounts), eg.NUM_TENANTS) + for account in wim_accounts: + self.assertEqual(account['wim_id'], uuid('wim0')) + + def test_get_wim_port_mappings(self): + # Given a database with WIMs, datacenters and port-mappings + self.populate() + + # when we retrieve the port mappings for a list of datacenters + # using either names or uuids + for criteria in ([uuid('dc0'), uuid('dc1')], ['dc0', 'dc1']): + mappings = self.persist.get_wim_port_mappings(datacenter=criteria) + + # then each result should have a datacenter_id + datacenters = [m['datacenter_id'] for m in mappings] + for datacenter in datacenters: + self.assertIn(datacenter, [uuid('dc0'), uuid('dc1')]) + + # a wim_id + wims = [m['wim_id'] for m in mappings] + for wim in wims: + self.assertIsNot(wim, None) + + # and a array of pairs 'wan' <> 'pop' connections + pairs = chain(*(m['pop_wan_mappings'] for m in mappings)) + self.assertEqual(len(list(pairs)), 2 * eg.NUM_WIMS) + + def test_get_wim_port_mappings_multiple(self): + # Given we have more then one connection in a datacenter managed by the + # WIM + self.populate() + self.populate([{ + 'wim_port_mappings': [ + eg.wim_port_mapping( + 0, 0, + pop_dpid='CC:CC:CC:CC:CC:CC:CC:CC', + wan_dpid='DD:DD:DD:DD:DD:DD:DD:DD'), + eg.wim_port_mapping( + 0, 0, + pop_dpid='EE:EE:EE:EE:EE:EE:EE:EE', + wan_dpid='FF:FF:FF:FF:FF:FF:FF:FF')]}]) + + # when we retrieve the port mappings for the wim and datacenter: + mappings = ( + self.persist.get_wim_port_mappings(wim='wim0', datacenter='dc0')) + + # then it should return just a single result, grouped by wim and + # datacenter + self.assertEqual(len(mappings), 1) + self.assertEqual(mappings[0]['wim_id'], uuid('wim0')) + self.assertEqual(mappings[0]['datacenter_id'], uuid('dc0')) + + self.assertEqual(len(mappings[0]['pop_wan_mappings']), 3) + + # when we retreive the mappings for more then one wim/datacenter + # the grouping should still work properly + mappings = self.persist.get_wim_port_mappings( + wim=['wim0', 'wim1'], datacenter=['dc0', 'dc1']) + self.assertEqual(len(mappings), 4) + pairs = chain(*(m['pop_wan_mappings'] for m in mappings)) + self.assertEqual(len(list(pairs)), 6) + + def test_get_actions_in_group(self): + # Given a good number of wim actions exist in the database + kwargs = {'action_id': uuid('action0')} + actions = (eg.wim_actions('CREATE', num_links=8, **kwargs) + + eg.wim_actions('FIND', num_links=8, **kwargs) + + eg.wim_actions('START', num_links=8, **kwargs)) + for i, action in enumerate(actions): + action['task_index'] = i + + self.populate([ + {'nfvo_tenants': eg.tenant()} + ] + eg.wim_set() + [ + {'instance_actions': eg.instance_action(**kwargs)}, + {'vim_wim_actions': actions} + ]) + + # When we retrieve them in groups + limit = 5 + results = self.persist.get_actions_in_groups( + uuid('wim-account00'), ['instance_wim_nets'], group_limit=limit) + + # Then we should have N groups where N == limit + self.assertEqual(len(results), limit) + for _, task_list in results: + # And since for each link we have create 3 actions (create, find, + # start), we should find them in each group + self.assertEqual(len(task_list), 3) + + @disable_foreign_keys + def test_update_instance_action_counters(self): + # Given we have one instance action in the database with 2 incomplete + # tasks + action = eg.instance_action(num_tasks=2) + self.populate([{'instance_actions': action}]) + # When we update the done counter by 0, nothing should happen + self.persist.update_instance_action_counters(action['uuid'], done=0) + result = self.persist.get_by_uuid('instance_actions', action['uuid']) + self.assertEqual(result['number_done'], 0) + self.assertEqual(result['number_failed'], 0) + # When we update the done counter by 2, number_done should be 2 + self.persist.update_instance_action_counters(action['uuid'], done=2) + result = self.persist.get_by_uuid('instance_actions', action['uuid']) + self.assertEqual(result['number_done'], 2) + self.assertEqual(result['number_failed'], 0) + # When we update the done counter by -1, and the failed counter by 1 + self.persist.update_instance_action_counters( + action['uuid'], done=-1, failed=1) + # Then we should see 1 and 1 + result = self.persist.get_by_uuid('instance_actions', action['uuid']) + self.assertEqual(result['number_done'], 1) + self.assertEqual(result['number_failed'], 1) + + +if __name__ == '__main__': + unittest.main()