Feature 10950: Replace pycrypto with pycryptodome
[osm/N2VC.git] / n2vc / store.py
index cd6c6fb..e9586d7 100644 (file)
 
 import abc
 import asyncio
-from base64 import b64decode
-import re
 import typing
 
-from Crypto.Cipher import AES
 from motor.motor_asyncio import AsyncIOMotorClient
 from n2vc.config import EnvironConfig
 from n2vc.vca.connection_data import ConnectionData
 from osm_common.dbmongo import DbMongo, DbException
+from osm_common.dbbase import Encryption
+
 
 DB_NAME = "osm"
 
@@ -195,6 +194,13 @@ class MotorStore(Store):
         self.loop = loop or asyncio.get_event_loop()
         self._secret_key = None
         self._config = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"])
+        self.encryption = Encryption(
+            uri=uri,
+            config=self._config,
+            encoding_type="utf-8",
+            loop=self.loop,
+            logger_name="db",
+        )
 
     @property
     def _database(self):
@@ -223,7 +229,7 @@ class MotorStore(Store):
         data = await self._vca_collection.find_one({"_id": vca_id})
         if not data:
             raise Exception("vca with id {} not found".format(vca_id))
-        await self.decrypt_fields(
+        await self.encryption.decrypt_fields(
             data,
             ["secret", "cacert"],
             schema_version=data["schema_version"],
@@ -294,114 +300,3 @@ class MotorStore(Store):
     async def _get_juju_info(self):
         """Get Juju information (the default VCA) from the admin collection"""
         return await self._admin_collection.find_one({"_id": "juju"})
-
-    # DECRYPT METHODS
-    async def decrypt_fields(
-        self,
-        item: dict,
-        fields: typing.List[str],
-        schema_version: str = None,
-        salt: str = None,
-    ):
-        """
-        Decrypt fields
-
-        Decrypt fields from a dictionary. Follows the same logic as in osm_common.
-
-        :param: item: Dictionary with the keys to be decrypted
-        :param: fields: List of keys to decrypt
-        :param: schema version: Schema version. (i.e. 1.11)
-        :param: salt: Salt for the decryption
-        """
-        flags = re.I
-
-        async def process(_item):
-            if isinstance(_item, list):
-                for elem in _item:
-                    await process(elem)
-            elif isinstance(_item, dict):
-                for key, val in _item.items():
-                    if isinstance(val, str):
-                        if any(re.search(f, key, flags) for f in fields):
-                            _item[key] = await self.decrypt(val, schema_version, salt)
-                    else:
-                        await process(val)
-
-        await process(item)
-
-    async def decrypt(self, value, schema_version=None, salt=None):
-        """
-        Decrypt an encrypted value
-        :param value: value to be decrypted. It is a base64 string
-        :param schema_version: used for known encryption method used. If None or '1.0' no encryption has been done.
-               If '1.1' symmetric AES encryption has been done
-        :param salt: optional salt to be used
-        :return: Plain content of value
-        """
-        await self.get_secret_key()
-        if not self.secret_key or not schema_version or schema_version == "1.0":
-            return value
-        else:
-            secret_key = self._join_secret_key(salt)
-            encrypted_msg = b64decode(value)
-            cipher = AES.new(secret_key)
-            decrypted_msg = cipher.decrypt(encrypted_msg)
-            try:
-                unpadded_private_msg = decrypted_msg.decode().rstrip("\0")
-            except UnicodeDecodeError:
-                raise DbException(
-                    "Cannot decrypt information. Are you using same COMMONKEY in all OSM components?",
-                    http_code=500,
-                )
-            return unpadded_private_msg
-
-    def _join_secret_key(self, update_key: typing.Any) -> bytes:
-        """
-        Join key with secret key
-
-        :param: update_key: str or bytes with the to update
-
-        :return: Joined key
-        """
-        return self._join_keys(update_key, self.secret_key)
-
-    def _join_keys(self, key: typing.Any, secret_key: bytes) -> bytes:
-        """
-        Join key with secret_key
-
-        :param: key: str or bytesof the key to update
-        :param: secret_key: bytes of the secret key
-
-        :return: Joined key
-        """
-        if isinstance(key, str):
-            update_key_bytes = key.encode()
-        else:
-            update_key_bytes = key
-        new_secret_key = bytearray(secret_key) if secret_key else bytearray(32)
-        for i, b in enumerate(update_key_bytes):
-            new_secret_key[i % 32] ^= b
-        return bytes(new_secret_key)
-
-    @property
-    def secret_key(self):
-        return self._secret_key
-
-    async def get_secret_key(self):
-        """
-        Get secret key using the database key and the serial key in the DB
-        The key is populated in the property self.secret_key
-        """
-        if self.secret_key:
-            return
-        secret_key = None
-        if self.database_key:
-            secret_key = self._join_keys(self.database_key, None)
-        version_data = await self._admin_collection.find_one({"_id": "version"})
-        if version_data and version_data.get("serial"):
-            secret_key = self._join_keys(b64decode(version_data["serial"]), secret_key)
-        self._secret_key = secret_key
-
-    @property
-    def database_key(self):
-        return self._config["database_commonkey"]