+# Copyright 2018 Whitestack, LLC
+# Copyright 2018 Telefonica S.A.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
+##
+
import http
import pytest
import unittest
from osm_common.dbbase import DbBase, DbException, deep_update
+from os import urandom
+from http import HTTPStatus
def exception_message(message):
def test_db_connect(db_base):
- db_base.db_connect(None)
+ with pytest.raises(DbException) as excinfo:
+ db_base.db_connect(None)
+ assert str(excinfo.value).startswith(exception_message("Method 'db_connect' not implemented"))
def test_db_disconnect(db_base):
assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
+class TestEncryption(unittest.TestCase):
+ def setUp(self):
+ master_key = "Setting a long master key with numbers 123 and capitals AGHBNHD and symbols %&8)!'"
+ db_base1 = DbBase()
+ db_base2 = DbBase()
+ db_base3 = DbBase()
+ # set self.secret_key obtained when connect
+ db_base1.set_secret_key(master_key, replace=True)
+ db_base1.set_secret_key(urandom(32))
+ db_base2.set_secret_key(None, replace=True)
+ db_base2.set_secret_key(urandom(30))
+ db_base3.set_secret_key(master_key)
+ self.db_bases = [db_base1, db_base2, db_base3]
+
+ def test_encrypt_decrypt(self):
+ TEST = (
+ ("plain text 1 ! ", None),
+ ("plain text 2 with salt ! ", "1afd5d1a-4a7e-4d9c-8c65-251290183106"),
+ ("plain text 3 with usalt ! ", u"1afd5d1a-4a7e-4d9c-8c65-251290183106"),
+ (u"plain unicode 4 ! ", None),
+ (u"plain unicode 5 with salt ! ", "1a000d1a-4a7e-4d9c-8c65-251290183106"),
+ (u"plain unicode 6 with usalt ! ", u"1abcdd1a-4a7e-4d9c-8c65-251290183106"),
+ )
+ for db_base in self.db_bases:
+ for value, salt in TEST:
+ # no encryption
+ encrypted = db_base.encrypt(value, schema_version='1.0', salt=salt)
+ self.assertEqual(encrypted, value, "value '{}' has been encrypted".format(value))
+ decrypted = db_base.decrypt(encrypted, schema_version='1.0', salt=salt)
+ self.assertEqual(decrypted, value, "value '{}' has been decrypted".format(value))
+
+ # encrypt/decrypt
+ encrypted = db_base.encrypt(value, schema_version='1.1', salt=salt)
+ self.assertNotEqual(encrypted, value, "value '{}' has not been encrypted".format(value))
+ self.assertIsInstance(encrypted, str, "Encrypted is not ascii text")
+ decrypted = db_base.decrypt(encrypted, schema_version='1.1', salt=salt)
+ self.assertEqual(decrypted, value, "value is not equal after encryption/decryption")
+
+ def test_encrypt_decrypt_salt(self):
+ value = "value to be encrypted!"
+ encrypted = []
+ for db_base in self.db_bases:
+ for salt in (None, "salt 1", "1afd5d1a-4a7e-4d9c-8c65-251290183106"):
+ # encrypt/decrypt
+ encrypted.append(db_base.encrypt(value, schema_version='1.1', salt=salt))
+ self.assertNotEqual(encrypted[-1], value, "value '{}' has not been encrypted".format(value))
+ self.assertIsInstance(encrypted[-1], str, "Encrypted is not ascii text")
+ decrypted = db_base.decrypt(encrypted[-1], schema_version='1.1', salt=salt)
+ self.assertEqual(decrypted, value, "value is not equal after encryption/decryption")
+ for i in range(0, len(encrypted)):
+ for j in range(i+1, len(encrypted)):
+ self.assertNotEqual(encrypted[i], encrypted[j],
+ "encryption with different salt must contain different result")
+ # decrypt with a different master key
+ try:
+ decrypted = self.db_bases[-1].decrypt(encrypted[0], schema_version='1.1', salt=None)
+ self.assertNotEqual(encrypted[0], decrypted, "Decryption with different KEY must generate different result")
+ except DbException as e:
+ self.assertEqual(e.http_code, HTTPStatus.INTERNAL_SERVER_ERROR,
+ "Decryption with different KEY does not provide expected http_code")
+
+
class TestDeepUpdate(unittest.TestCase):
def test_update_dict(self):
# Original, patch, expected result
deep_update(t[0], t[1])
except DbException as e:
print(e)
+
+
+if __name__ == '__main__':
+ unittest.main()