Reformat common to standardized format
[osm/common.git] / osm_common / tests / test_dbbase.py
index 1abd1c7..117350e 100644 (file)
@@ -43,7 +43,9 @@ def test_constructor():
 def test_db_connect(db_base):
     with pytest.raises(DbException) as excinfo:
         db_base.db_connect(None)
-    assert str(excinfo.value).startswith(exception_message("Method 'db_connect' not implemented"))
+    assert str(excinfo.value).startswith(
+        exception_message("Method 'db_connect' not implemented")
+    )
 
 
 def test_db_disconnect(db_base):
@@ -53,42 +55,54 @@ def test_db_disconnect(db_base):
 def test_get_list(db_base):
     with pytest.raises(DbException) as excinfo:
         db_base.get_list(None, None)
-    assert str(excinfo.value).startswith(exception_message("Method 'get_list' not implemented"))
+    assert str(excinfo.value).startswith(
+        exception_message("Method 'get_list' not implemented")
+    )
     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
 
 
 def test_get_one(db_base):
     with pytest.raises(DbException) as excinfo:
         db_base.get_one(None, None, None, None)
-    assert str(excinfo.value).startswith(exception_message("Method 'get_one' not implemented"))
+    assert str(excinfo.value).startswith(
+        exception_message("Method 'get_one' not implemented")
+    )
     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
 
 
 def test_create(db_base):
     with pytest.raises(DbException) as excinfo:
         db_base.create(None, None)
-    assert str(excinfo.value).startswith(exception_message("Method 'create' not implemented"))
+    assert str(excinfo.value).startswith(
+        exception_message("Method 'create' not implemented")
+    )
     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
 
 
 def test_create_list(db_base):
     with pytest.raises(DbException) as excinfo:
         db_base.create_list(None, None)
-    assert str(excinfo.value).startswith(exception_message("Method 'create_list' not implemented"))
+    assert str(excinfo.value).startswith(
+        exception_message("Method 'create_list' not implemented")
+    )
     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
 
 
 def test_del_list(db_base):
     with pytest.raises(DbException) as excinfo:
         db_base.del_list(None, None)
-    assert str(excinfo.value).startswith(exception_message("Method 'del_list' not implemented"))
+    assert str(excinfo.value).startswith(
+        exception_message("Method 'del_list' not implemented")
+    )
     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
 
 
 def test_del_one(db_base):
     with pytest.raises(DbException) as excinfo:
         db_base.del_one(None, None, None)
-    assert str(excinfo.value).startswith(exception_message("Method 'del_one' not implemented"))
+    assert str(excinfo.value).startswith(
+        exception_message("Method 'del_one' not implemented")
+    )
     assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
 
 
@@ -118,17 +132,25 @@ class TestEncryption(unittest.TestCase):
         for db_base in self.db_bases:
             for value, salt in TEST:
                 # no encryption
-                encrypted = db_base.encrypt(value, schema_version='1.0', salt=salt)
-                self.assertEqual(encrypted, value, "value '{}' has been encrypted".format(value))
-                decrypted = db_base.decrypt(encrypted, schema_version='1.0', salt=salt)
-                self.assertEqual(decrypted, value, "value '{}' has been decrypted".format(value))
+                encrypted = db_base.encrypt(value, schema_version="1.0", salt=salt)
+                self.assertEqual(
+                    encrypted, value, "value '{}' has been encrypted".format(value)
+                )
+                decrypted = db_base.decrypt(encrypted, schema_version="1.0", salt=salt)
+                self.assertEqual(
+                    decrypted, value, "value '{}' has been decrypted".format(value)
+                )
 
                 # encrypt/decrypt
-                encrypted = db_base.encrypt(value, schema_version='1.1', salt=salt)
-                self.assertNotEqual(encrypted, value, "value '{}' has not been encrypted".format(value))
+                encrypted = db_base.encrypt(value, schema_version="1.1", salt=salt)
+                self.assertNotEqual(
+                    encrypted, value, "value '{}' has not been encrypted".format(value)
+                )
                 self.assertIsInstance(encrypted, str, "Encrypted is not ascii text")
-                decrypted = db_base.decrypt(encrypted, schema_version='1.1', salt=salt)
-                self.assertEqual(decrypted, value, "value is not equal after encryption/decryption")
+                decrypted = db_base.decrypt(encrypted, schema_version="1.1", salt=salt)
+                self.assertEqual(
+                    decrypted, value, "value is not equal after encryption/decryption"
+                )
 
     def test_encrypt_decrypt_salt(self):
         value = "value to be encrypted!"
@@ -136,22 +158,44 @@ class TestEncryption(unittest.TestCase):
         for db_base in self.db_bases:
             for salt in (None, "salt 1", "1afd5d1a-4a7e-4d9c-8c65-251290183106"):
                 # encrypt/decrypt
-                encrypted.append(db_base.encrypt(value, schema_version='1.1', salt=salt))
-                self.assertNotEqual(encrypted[-1], value, "value '{}' has not been encrypted".format(value))
+                encrypted.append(
+                    db_base.encrypt(value, schema_version="1.1", salt=salt)
+                )
+                self.assertNotEqual(
+                    encrypted[-1],
+                    value,
+                    "value '{}' has not been encrypted".format(value),
+                )
                 self.assertIsInstance(encrypted[-1], str, "Encrypted is not ascii text")
-                decrypted = db_base.decrypt(encrypted[-1], schema_version='1.1', salt=salt)
-                self.assertEqual(decrypted, value, "value is not equal after encryption/decryption")
+                decrypted = db_base.decrypt(
+                    encrypted[-1], schema_version="1.1", salt=salt
+                )
+                self.assertEqual(
+                    decrypted, value, "value is not equal after encryption/decryption"
+                )
         for i in range(0, len(encrypted)):
-            for j in range(i+1, len(encrypted)):
-                self.assertNotEqual(encrypted[i], encrypted[j],
-                                    "encryption with different salt must contain different result")
+            for j in range(i + 1, len(encrypted)):
+                self.assertNotEqual(
+                    encrypted[i],
+                    encrypted[j],
+                    "encryption with different salt must contain different result",
+                )
         # decrypt with a different master key
         try:
-            decrypted = self.db_bases[-1].decrypt(encrypted[0], schema_version='1.1', salt=None)
-            self.assertNotEqual(encrypted[0], decrypted, "Decryption with different KEY must generate different result")
+            decrypted = self.db_bases[-1].decrypt(
+                encrypted[0], schema_version="1.1", salt=None
+            )
+            self.assertNotEqual(
+                encrypted[0],
+                decrypted,
+                "Decryption with different KEY must generate different result",
+            )
         except DbException as e:
-            self.assertEqual(e.http_code, HTTPStatus.INTERNAL_SERVER_ERROR,
-                             "Decryption with different KEY does not provide expected http_code")
+            self.assertEqual(
+                e.http_code,
+                HTTPStatus.INTERNAL_SERVER_ERROR,
+                "Decryption with different KEY does not provide expected http_code",
+            )
 
 
 class TestDeepUpdate(unittest.TestCase):
@@ -193,30 +237,62 @@ class TestDeepUpdate(unittest.TestCase):
             ({"A": ["a", "b", "a"]}, {"A": {"$a": None}}, {"A": ["b"]}),
             ({"A": ["a", "b", "a"]}, {"A": {"$d": None}}, {"A": ["a", "b", "a"]}),
             # delete and insert at 0
-            ({"A": ["a", "b", "c"]}, {"A": {"$b": None, "$+[0]": "b"}}, {"A": ["b", "a", "c"]}),
+            (
+                {"A": ["a", "b", "c"]},
+                {"A": {"$b": None, "$+[0]": "b"}},
+                {"A": ["b", "a", "c"]},
+            ),
             # delete and edit
-            ({"A": ["a", "b", "a"]}, {"A": {"$a": None, "$[1]": {"c": "d"}}}, {"A": [{"c": "d"}]}),
+            (
+                {"A": ["a", "b", "a"]},
+                {"A": {"$a": None, "$[1]": {"c": "d"}}},
+                {"A": [{"c": "d"}]},
+            ),
             # insert if not exist
             ({"A": ["a", "b", "c"]}, {"A": {"$+b": "b"}}, {"A": ["a", "b", "c"]}),
             ({"A": ["a", "b", "c"]}, {"A": {"$+d": "f"}}, {"A": ["a", "b", "c", "f"]}),
             # edit by filter
-            ({"A": ["a", "b", "a"]}, {"A": {"$b": {"c": "d"}}}, {"A": ["a", {"c": "d"}, "a"]}),
-            ({"A": ["a", "b", "a"]}, {"A": {"$b": None, "$+[0]": "b", "$+": "c"}}, {"A": ["b", "a", "a", "c"]}),
+            (
+                {"A": ["a", "b", "a"]},
+                {"A": {"$b": {"c": "d"}}},
+                {"A": ["a", {"c": "d"}, "a"]},
+            ),
+            (
+                {"A": ["a", "b", "a"]},
+                {"A": {"$b": None, "$+[0]": "b", "$+": "c"}},
+                {"A": ["b", "a", "a", "c"]},
+            ),
             ({"A": ["a", "b", "a"]}, {"A": {"$c": None}}, {"A": ["a", "b", "a"]}),
             # index deletion out of range
             ({"A": ["a", "b", "a"]}, {"A": {"$[5]": None}}, {"A": ["a", "b", "a"]}),
             # nested array->dict
-            ({"A": ["a", "b", {"id": "1", "c": {"d": 2}}]}, {"A": {"$id: '1'": {"h": None, "c": {"d": "e", "f": "g"}}}},
-             {"A": ["a", "b", {"id": "1", "c": {"d": "e", "f": "g"}}]}),
-            ({"A": [{"id": 1, "c": {"d": 2}}, {"id": 1, "c": {"f": []}}]},
-             {"A": {"$id: 1": {"h": None, "c": {"d": "e", "f": "g"}}}},
-             {"A": [{"id": 1, "c": {"d": "e", "f": "g"}}, {"id": 1, "c": {"d": "e", "f": "g"}}]}),
+            (
+                {"A": ["a", "b", {"id": "1", "c": {"d": 2}}]},
+                {"A": {"$id: '1'": {"h": None, "c": {"d": "e", "f": "g"}}}},
+                {"A": ["a", "b", {"id": "1", "c": {"d": "e", "f": "g"}}]},
+            ),
+            (
+                {"A": [{"id": 1, "c": {"d": 2}}, {"id": 1, "c": {"f": []}}]},
+                {"A": {"$id: 1": {"h": None, "c": {"d": "e", "f": "g"}}}},
+                {
+                    "A": [
+                        {"id": 1, "c": {"d": "e", "f": "g"}},
+                        {"id": 1, "c": {"d": "e", "f": "g"}},
+                    ]
+                },
+            ),
             # nested array->array
-            ({"A": ["a", "b", ["a", "b"]]}, {"A": {"$b": None, "$[2]": {"$b": {}, "$+": "c"}}},
-             {"A": ["a", ["a", {}, "c"]]}),
+            (
+                {"A": ["a", "b", ["a", "b"]]},
+                {"A": {"$b": None, "$[2]": {"$b": {}, "$+": "c"}}},
+                {"A": ["a", ["a", {}, "c"]]},
+            ),
             # types str and int different, so not found
-            ({"A": ["a", {"id": "1", "c": "d"}]}, {"A": {"$id: 1": {"c": "e"}}}, {"A": ["a", {"id": "1", "c": "d"}]}),
-
+            (
+                {"A": ["a", {"id": "1", "c": "d"}]},
+                {"A": {"$id: 1": {"c": "e"}}},
+                {"A": ["a", {"id": "1", "c": "d"}]},
+            ),
         )
         for t in TEST:
             print(t)
@@ -244,7 +320,10 @@ class TestDeepUpdate(unittest.TestCase):
             # index edition out of range
             ({"A": ["a", "b", "a"]}, {"A": {"$[5]": 6}}),
             # conflict, two editions on index 2
-            ({"A": ["a", {"id": "1", "c": "d"}]}, {"A": {"$id: '1'": {"c": "e"}, "$c: d": {"c": "f"}}}),
+            (
+                {"A": ["a", {"id": "1", "c": "d"}]},
+                {"A": {"$id: '1'": {"c": "e"}, "$c: d": {"c": "f"}}},
+            ),
         )
         for t in TEST:
             print(t)
@@ -255,5 +334,5 @@ class TestDeepUpdate(unittest.TestCase):
                 print(e)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     unittest.main()