# See the License for the specific language governing permissions and
# limitations under the License.
-tox -e flake8
-tox -e unittest
+tox -e flake8 && \
+tox -e unittest && \
tox -e pytest
# See the License for the specific language governing permissions and
# limitations under the License.
-version = '0.1.10'
+version = '0.1.11'
# TODO add package version filling commit id with 0's; e.g.: '5.0.0.post11+00000000.dirty-1'
-date_version = '2018-10-22'
-
+date_version = '2018-10-23'
class DbBase(object):
- def __init__(self, logger_name='db', master_password=None):
+ def __init__(self, logger_name='db'):
"""
Constructor od dbBase
:param logger_name: logging name
- :param master_password: master password used for encrypt decrypt methods
"""
self.logger = logging.getLogger(logger_name)
- self.master_password = master_password
+ self.master_password = None
self.secret_key = None
def db_connect(self, config, target_version=None):
"""
Connect to database
- :param config: Configuration of database
+ :param config: Configuration of database. Contains among others:
+ host: database hosst (mandatory)
+ port: database port (mandatory)
+ name: database name (mandatory)
+ user: database username
+ password: database password
+ masterpassword: database password used for sensible information encryption
:param target_version: if provided it checks if database contains required version, raising exception otherwise.
:return: None or raises DbException on error
"""
class DbMemory(DbBase):
- def __init__(self, logger_name='db', master_password=None):
- super().__init__(logger_name, master_password)
+ def __init__(self, logger_name='db'):
+ super().__init__(logger_name)
self.db = {}
def db_connect(self, config):
"""
if "logger_name" in config:
self.logger = logging.getLogger(config["logger_name"])
+ self.master_password = config.get("masterpassword")
@staticmethod
def _format_filter(q_filter):
conn_initial_timout = 120
conn_timout = 10
- def __init__(self, logger_name='db', master_password=None):
- super().__init__(logger_name, master_password)
+ def __init__(self, logger_name='db'):
+ super().__init__(logger_name)
self.client = None
self.db = None
try:
if "logger_name" in config:
self.logger = logging.getLogger(config["logger_name"])
+ self.master_password = config.get("masterpassword")
self.client = MongoClient(config["host"], config["port"])
+ # TODO add as parameters also username=config.get("user"), password=config.get("password"))
+ # when all modules are ready
self.db = self.client[config["name"]]
if "loglevel" in config:
self.logger.setLevel(getattr(logging, config['loglevel']))
async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs):
raise MsgException("Method 'aioread' not implemented", http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
-
class TestEncryption(unittest.TestCase):
def setUp(self):
master_password = "Setting a long master password with numbers 123 and capitals AGHBNHD and symbols %&8)!'"
- db_base1 = DbBase(master_password=master_password)
+ db_base1 = DbBase()
db_base2 = DbBase()
# set self.secret_key obtained when connect
- db_base1.secret_key = DbBase._join_passwords(urandom(32), db_base1.master_password)
- db_base2.secret_key = DbBase._join_passwords(urandom(32), db_base2.master_password)
+ db_base1.secret_key = DbBase._join_passwords(urandom(32), master_password)
+ db_base2.secret_key = DbBase._join_passwords(urandom(32), None)
self.db_base = [db_base1, db_base2]
def test_encrypt_decrypt(self):