inject_user_key routine fixes
[osm/RO.git] / osm_ro / db_base.py
index a4cdef3..e6e1134 100644 (file)
@@ -35,9 +35,85 @@ import json
 import time
 import logging
 import datetime
+from contextlib import contextmanager
+from functools import wraps, partial
+from threading import Lock
 from jsonschema import validate as js_v, exceptions as js_e
 
 from .http_tools import errors as httperrors
+from .utils import Attempt, get_arg, inject_args
+
+
+RECOVERY_TIME = 3
+
+_ATTEMPT = Attempt()
+
+
+def with_transaction(fn=None, cursor=None):
+    """Decorator that can be used together with instances of the ``db_base``
+    class, to perform database actions wrapped in a commit/rollback fashion
+
+    This decorator basically executes the function inside the context object
+    given by the ``transaction`` method in ``db_base``
+
+    Arguments:
+        cursor: [Optional] cursor class
+    """
+    if fn is None:  # Allows calling the decorator directly or with parameters
+        return partial(with_transaction, cursor=cursor)
+
+    @wraps(fn)
+    def _wrapper(self, *args, **kwargs):
+        cursor_type = None
+        if cursor == 'dict':
+            # MySQLdB define the "cursors" module attribute lazily,
+            # so we have to defer references to mdb.cursors.DictCursor
+            cursor_type = mdb.cursors.DictCursor
+
+        with self.transaction(cursor_type):
+            return fn(self, *args, **kwargs)
+
+    return _wrapper
+
+
+def retry(fn=None, max_attempts=Attempt.MAX, **info):
+    """Decorator that can be used together with instances of the ``db_base``
+    class, to replay a method again after a unexpected error.
+
+    The function being decorated needs to either be a method of ``db_base``
+    subclasses or accept an ``db_base`` instance as the first parameter.
+
+    All the extra keyword arguments will be passed to the ``_format_error``
+    method
+    """
+    if fn is None:  # Allows calling the decorator directly or with parameters
+        return partial(retry, max_attempts=max_attempts, **info)
+
+    @wraps(fn)
+    def _wrapper(*args, **kwargs):
+        self = args[0]
+        info.setdefault('table', get_arg('table', fn, args, kwargs))
+        attempt = Attempt(max_attempts=max_attempts, info=info)
+        while attempt.countdown >= 0:
+            try:
+                return inject_args(fn, attempt=attempt)(*args, **kwargs)
+            except (mdb.Error, AttributeError) as ex:
+                self.logger.debug("Attempt #%d", attempt.number)
+                try:
+                    # The format error will throw exceptions, however it can
+                    # tolerate a certain amount of retries if it judges that
+                    # the error can be solved with retrying
+                    self._format_error(ex, attempt.countdown, **attempt.info)
+                    # Anyway, unexpected/unknown errors can still be retried
+                except db_base_Exception as db_ex:
+                    if (attempt.countdown < 0 or db_ex.http_code !=
+                            httperrors.Internal_Server_Error):
+                        raise
+
+            attempt.count += 1
+
+    return _wrapper
+
 
 def _check_valid_uuid(uuid):
     id_schema = {"type" : "string", "pattern": "^[a-fA-F0-9]{8}(-[a-fA-F0-9]{4}){3}-[a-fA-F0-9]{12}$"}
@@ -137,7 +213,8 @@ class db_base_Exception(httperrors.HttpMappedError):
 class db_base():
     tables_with_created_field=()
 
-    def __init__(self, host=None, user=None, passwd=None, database=None, log_name='db', log_level=None):
+    def __init__(self, host=None, user=None, passwd=None, database=None,
+                 log_name='db', log_level=None, lock=None):
         self.host = host
         self.user = user
         self.passwd = passwd
@@ -147,6 +224,7 @@ class db_base():
         self.logger = logging.getLogger(log_name)
         if self.log_level:
             self.logger.setLevel( getattr(logging, log_level) )
+        self.lock = lock or Lock()
 
     def connect(self, host=None, user=None, passwd=None, database=None):
         '''Connect to specific data base.
@@ -174,31 +252,23 @@ class db_base():
             value = value.encode("utf8")
         return self.con.escape_string(value)
 
+    @retry
+    @with_transaction
     def get_db_version(self):
         ''' Obtain the database schema version.
         Return: (negative, text) if error or version 0.0 where schema_version table is missing
                 (version_int, version_text) if ok
         '''
         cmd = "SELECT version_int,version FROM schema_version"
-        tries = 2
-        while tries:
-            try:
-                with self.con:
-                    self.cur = self.con.cursor()
-                    self.logger.debug(cmd)
-                    self.cur.execute(cmd)
-                    rows = self.cur.fetchall()
-                    highest_version_int=0
-                    highest_version=""
-                    for row in rows: #look for the latest version
-                        if row[0]>highest_version_int:
-                            highest_version_int, highest_version = row[0:2]
-                    return highest_version_int, highest_version
-            except (mdb.Error, AttributeError) as e:
-                self.logger.error("Exception '{}' with command '{}'".format(e, cmd))
-                #self.logger.error("get_db_version DB Exception %d: %s. Command %s",e.args[0], e.args[1], cmd)
-                self._format_error(e, tries)
-            tries -= 1
+        self.logger.debug(cmd)
+        self.cur.execute(cmd)
+        rows = self.cur.fetchall()
+        highest_version_int=0
+        highest_version=""
+        for row in rows: #look for the latest version
+            if row[0]>highest_version_int:
+                highest_version_int, highest_version = row[0:2]
+        return highest_version_int, highest_version
 
     def disconnect(self):
         '''disconnect from specific data base'''
@@ -215,7 +285,73 @@ class db_base():
             else:
                 raise
 
-    def _format_error(self, e, tries=1, command=None, extra=None, table=None):
+    def reconnect(self):
+        """Try to gracefully to the database in case of error"""
+        try:
+            self.con.ping(True)  # auto-reconnect if the server is available
+        except:
+            # The server is probably not available...
+            # Let's wait a bit
+            time.sleep(RECOVERY_TIME)
+            self.con = None
+            self.connect()
+
+    def fork_connection(self):
+        """Return a new database object, with a separated connection to the
+        database (and lock), so it can act independently
+        """
+        obj =  self.__class__(
+            host=self.host,
+            user=self.user,
+            passwd=self.passwd,
+            database=self.database,
+            log_name=self.logger.name,
+            log_level=self.log_level,
+            lock=Lock()
+        )
+
+        obj.connect()
+
+        return obj
+
+    @contextmanager
+    def transaction(self, cursor_type=None):
+        """DB changes that are executed inside this context will be
+        automatically rolled back in case of error.
+
+        This implementation also adds a lock, so threads sharing the same
+        connection object are synchronized.
+
+        Arguments:
+            cursor_type: default: MySQLdb.cursors.DictCursor
+
+        Yields:
+            Cursor object
+
+        References:
+            https://www.oreilly.com/library/view/mysql-cookbook-2nd/059652708X/ch15s08.html
+            https://github.com/PyMySQL/mysqlclient-python/commit/c64915b1e5c705f4fb10e86db5dcfed0b58552cc
+        """
+        # Previously MySQLdb had built-in support for that using the context
+        # API for the connection object.
+        # This support was removed in version 1.40
+        # https://github.com/PyMySQL/mysqlclient-python/blob/master/HISTORY.rst#whats-new-in-140
+        with self.lock:
+            try:
+                if self.con.get_autocommit():
+                    self.con.query("BEGIN")
+
+                self.cur = self.con.cursor(cursor_type)
+                yield self.cur
+            except:  # noqa
+                self.con.rollback()
+                raise
+            else:
+                self.con.commit()
+
+
+    def _format_error(self, e, tries=1, command=None,
+                      extra=None, table=None, cmd=None, **_):
         '''Creates a text error base on the produced exception
             Params:
                 e: mdb exception
@@ -225,15 +361,22 @@ class db_base():
                 extra: extra information to add to some commands
             Return
                 HTTP error in negative, formatted error text
-        '''
+        '''  # the **_ ignores extra kwargs
+        table_info = ' (table `{}`)'.format(table) if table else ''
+        if cmd:
+            self.logger.debug("Exception '%s' with command '%s'%s",
+                              e, cmd, table_info)
+
         if isinstance(e,AttributeError ):
             self.logger.debug(str(e), exc_info=True)
             raise db_base_Exception("DB Exception " + str(e), httperrors.Internal_Server_Error)
         if e.args[0]==2006 or e.args[0]==2013 : #MySQL server has gone away (((or)))    Exception 2013: Lost connection to MySQL server during query
-            if tries>1:
+            # Let's aways reconnect if the connection is lost
+            # so future calls are not affected.
+            self.reconnect()
+
+            if tries > 1:
                 self.logger.warn("DB Exception '%s'. Retry", str(e))
-                #reconnect
-                self.connect()
                 return
             else:
                 raise db_base_Exception("Database connection timeout Try Again", httperrors.Request_Timeout)
@@ -250,7 +393,6 @@ class db_base():
         wc = e.args[1].find("in 'where clause'")
         fl = e.args[1].find("in 'field list'")
         #print de, fk, uk, wc,fl
-        table_info = ' (table `{}`)'.format(table) if table else ''
         if de>=0:
             if fk>=0: #error 1062
                 raise db_base_Exception(
@@ -420,7 +562,7 @@ class db_base():
             INSERT: dictionary with the key:value to insert
             table: table where to insert
             add_uuid: if True, it will create an uuid key entry at INSERT if not provided
-            created_time: time to add to the created_time column
+            created_time: time to add to the created_at column
         It checks presence of uuid and add one automatically otherwise
         Return: uuid
         '''
@@ -449,7 +591,7 @@ class db_base():
         cmd= "INSERT INTO " + table +" SET " + \
             ",".join(map(self.__tuple2db_format_set, INSERT.iteritems() ))
         if created_time:
-            cmd += ",created_at=%f" % created_time
+            cmd += ",created_at={time:.9f},modified_at={time:.9f}".format(time=created_time)
         if confidential_data:
             index = cmd.find("SET")
             subcmd = cmd[:index] + 'SET...'
@@ -467,6 +609,8 @@ class db_base():
         rows = self.cur.fetchall()
         return rows
 
+    @retry
+    @with_transaction
     def new_row(self, table, INSERT, add_uuid=False, created_time=0, confidential_data=False):
         ''' Add one row into a table.
         Attribute
@@ -479,18 +623,11 @@ class db_base():
         '''
         if table in self.tables_with_created_field and created_time==0:
             created_time=time.time()
-        tries = 2
-        while tries:
-            try:
-                with self.con:
-                    self.cur = self.con.cursor()
-                    return self._new_row_internal(table, INSERT, add_uuid, None, created_time, confidential_data)
-
-            except (mdb.Error, AttributeError) as e:
-                self._format_error(e, tries, table=table)
-            tries -= 1
+        return self._new_row_internal(table, INSERT, add_uuid, None, created_time, confidential_data)
 
-    def update_rows(self, table, UPDATE, WHERE, modified_time=0):
+    @retry
+    @with_transaction
+    def update_rows(self, table, UPDATE, WHERE, modified_time=None, attempt=_ATTEMPT):
         """ Update one or several rows of a table.
         :param UPDATE: dictionary with the changes. dict keys are database columns that will be set with the dict values
         :param table: database table to update
@@ -501,21 +638,14 @@ class db_base():
                 keys can be suffixed by >,<,<>,>=,<= so that this is used to compare key and value instead of "="
                 The special keys "OR", "AND" with a dict value is used to create a nested WHERE
             If a list, each item will be a dictionary that will be concatenated with OR
-        :param modified_time: Can contain the time to be set to the table row
+        :param modified_time: Can contain the time to be set to the table row.
+            None to set automatically, 0 to do not modify it
         :return: the number of updated rows, raises exception upon error
         """
-        if table in self.tables_with_created_field and modified_time==0:
-            modified_time=time.time()
-        tries = 2
-        while tries:
-            try:
-                with self.con:
-                    self.cur = self.con.cursor()
-                    return self._update_rows(
-                        table, UPDATE, WHERE, modified_time)
-            except (mdb.Error, AttributeError) as e:
-                self._format_error(e, tries, table=table)
-            tries -= 1
+        if table in self.tables_with_created_field and modified_time is None:
+            modified_time = time.time()
+
+        return self._update_rows(table, UPDATE, WHERE, modified_time)
 
     def _delete_row_by_id_internal(self, table, uuid):
         cmd = "DELETE FROM {} WHERE uuid = '{}'".format(table, uuid)
@@ -529,19 +659,13 @@ class db_base():
         self.cur.execute(cmd)
         return deleted
 
+    @retry(command='delete', extra='dependencies')
+    @with_transaction
     def delete_row_by_id(self, table, uuid):
-        tries = 2
-        while tries:
-            try:
-                with self.con:
-                    self.cur = self.con.cursor()
-                    return self._delete_row_by_id_internal(table, uuid)
-            except (mdb.Error, AttributeError) as e:
-                self._format_error(
-                    e, tries, "delete", "dependencies", table=table)
-            tries -= 1
-
-    def delete_row(self, **sql_dict):
+        return self._delete_row_by_id_internal(table, uuid)
+
+    @retry
+    def delete_row(self, attempt=_ATTEMPT, **sql_dict):
         """ Deletes rows from a table.
         :param UPDATE: dictionary with the changes. dict keys are database columns that will be set with the dict values
         :param FROM: string with table name (Mandatory)
@@ -560,36 +684,28 @@ class db_base():
             cmd += " WHERE " + self.__create_where(sql_dict['WHERE'])
         if sql_dict.get('LIMIT'):
             cmd += " LIMIT " + str(sql_dict['LIMIT'])
-        tries = 2
-        while tries:
-            try:
-                with self.con:
-                    self.cur = self.con.cursor()
-                    self.logger.debug(cmd)
-                    self.cur.execute(cmd)
-                    deleted = self.cur.rowcount
-                return deleted
-            except (mdb.Error, AttributeError) as e:
-                self._format_error(e, tries)
-            tries -= 1
-
-    def get_rows_by_id(self, table, uuid):
+
+        attempt.info['cmd'] = cmd
+
+        with self.transaction():
+            self.logger.debug(cmd)
+            self.cur.execute(cmd)
+            deleted = self.cur.rowcount
+        return deleted
+
+    @retry
+    @with_transaction(cursor='dict')
+    def get_rows_by_id(self, table, uuid, attempt=_ATTEMPT):
         '''get row from a table based on uuid'''
-        tries = 2
-        while tries:
-            try:
-                with self.con:
-                    self.cur = self.con.cursor(mdb.cursors.DictCursor)
-                    cmd="SELECT * FROM {} where uuid='{}'".format(str(table), str(uuid))
-                    self.logger.debug(cmd)
-                    self.cur.execute(cmd)
-                    rows = self.cur.fetchall()
-                    return rows
-            except (mdb.Error, AttributeError) as e:
-                self._format_error(e, tries, table=table)
-            tries -= 1
-
-    def get_rows(self, **sql_dict):
+        cmd="SELECT * FROM {} where uuid='{}'".format(str(table), str(uuid))
+        attempt.info['cmd'] = cmd
+        self.logger.debug(cmd)
+        self.cur.execute(cmd)
+        rows = self.cur.fetchall()
+        return rows
+
+    @retry
+    def get_rows(self, attempt=_ATTEMPT, **sql_dict):
         """ Obtain rows from a table.
         :param SELECT: list or tuple of fields to retrieve) (by default all)
         :param FROM: string with table name (Mandatory)
@@ -628,26 +744,21 @@ class db_base():
         if 'LIMIT' in sql_dict:
             cmd += " LIMIT " + str(sql_dict['LIMIT'])
 
-        tries = 2
-        while tries:
-            try:
-                with self.con:
-                    self.cur = self.con.cursor(mdb.cursors.DictCursor)
-                    self.logger.debug(cmd)
-                    self.cur.execute(cmd)
-                    rows = self.cur.fetchall()
-                    return rows
-            except (mdb.Error, AttributeError) as e:
-                self.logger.error("Exception '{}' with command '{}'".format(e, cmd))
-                self._format_error(e, tries)
-            tries -= 1
-
-    def get_table_by_uuid_name(self, table, uuid_name, error_item_text=None, allow_serveral=False, WHERE_OR={}, WHERE_AND_OR="OR"):
+        attempt.info['cmd'] = cmd
+
+        with self.transaction(mdb.cursors.DictCursor):
+            self.logger.debug(cmd)
+            self.cur.execute(cmd)
+            rows = self.cur.fetchall()
+            return rows
+
+    @retry
+    def get_table_by_uuid_name(self, table, uuid_name, error_item_text=None, allow_several=False, WHERE_OR={}, WHERE_AND_OR="OR", attempt=_ATTEMPT):
         ''' Obtain One row from a table based on name or uuid.
         Attribute:
             table: string of table name
             uuid_name: name or uuid. If not uuid format is found, it is considered a name
-            allow_severeral: if False return ERROR if more than one row are founded
+            allow_several: if False return ERROR if more than one row are found
             error_item_text: in case of error it identifies the 'item' name for a proper output text
             'WHERE_OR': dict of key:values, translated to key=value OR ... (Optional)
             'WHERE_AND_OR: str 'AND' or 'OR'(by default) mark the priority to 'WHERE AND (WHERE_OR)' or (WHERE) OR WHERE_OR' (Optional
@@ -666,58 +777,40 @@ class db_base():
             else:
                 cmd += " OR " + where_or
 
-        tries = 2
-        while tries:
-            try:
-                with self.con:
-                    self.cur = self.con.cursor(mdb.cursors.DictCursor)
-                    self.logger.debug(cmd)
-                    self.cur.execute(cmd)
-                    number = self.cur.rowcount
-                    if number == 0:
-                        raise db_base_Exception("No {} found with {} '{}'".format(error_item_text, what, uuid_name), http_code=httperrors.Not_Found)
-                    elif number > 1 and not allow_serveral:
-                        raise db_base_Exception("More than one {} found with {} '{}'".format(error_item_text, what, uuid_name), http_code=httperrors.Conflict)
-                    if allow_serveral:
-                        rows = self.cur.fetchall()
-                    else:
-                        rows = self.cur.fetchone()
-                    return rows
-            except (mdb.Error, AttributeError) as e:
-                self._format_error(e, tries, table=table)
-            tries -= 1
+        attempt.info['cmd'] = cmd
+
+        with self.transaction(mdb.cursors.DictCursor):
+            self.logger.debug(cmd)
+            self.cur.execute(cmd)
+            number = self.cur.rowcount
+            if number == 0:
+                raise db_base_Exception("No {} found with {} '{}'".format(error_item_text, what, uuid_name), http_code=httperrors.Not_Found)
+            elif number > 1 and not allow_several:
+                raise db_base_Exception("More than one {} found with {} '{}'".format(error_item_text, what, uuid_name), http_code=httperrors.Conflict)
+            if allow_several:
+                rows = self.cur.fetchall()
+            else:
+                rows = self.cur.fetchone()
+            return rows
 
+    @retry(table='uuids')
+    @with_transaction(cursor='dict')
     def get_uuid(self, uuid):
         '''check in the database if this uuid is already present'''
-        tries = 2
-        while tries:
-            try:
-                with self.con:
-                    self.cur = self.con.cursor(mdb.cursors.DictCursor)
-                    self.cur.execute("SELECT * FROM uuids where uuid='" + str(uuid) + "'")
-                    rows = self.cur.fetchall()
-                    return self.cur.rowcount, rows
-            except (mdb.Error, AttributeError) as e:
-                self._format_error(e, tries)
-            tries -= 1
+        self.cur.execute("SELECT * FROM uuids where uuid='" + str(uuid) + "'")
+        rows = self.cur.fetchall()
+        return self.cur.rowcount, rows
 
+    @retry
+    @with_transaction(cursor='dict')
     def get_uuid_from_name(self, table, name):
         '''Searchs in table the name and returns the uuid
         '''
-        tries = 2
-        while tries:
-            try:
-                with self.con:
-                    self.cur = self.con.cursor(mdb.cursors.DictCursor)
-                    where_text = "name='" + name +"'"
-                    self.cur.execute("SELECT * FROM " + table + " WHERE "+ where_text)
-                    rows = self.cur.fetchall()
-                    if self.cur.rowcount==0:
-                        return 0, "Name %s not found in table %s" %(name, table)
-                    elif self.cur.rowcount>1:
-                        return self.cur.rowcount, "More than one VNF with name %s found in table %s" %(name, table)
-                    return self.cur.rowcount, rows[0]["uuid"]
-            except (mdb.Error, AttributeError) as e:
-                self._format_error(e, tries, table=table)
-            tries -= 1
-
+        where_text = "name='" + name +"'"
+        self.cur.execute("SELECT * FROM " + table + " WHERE "+ where_text)
+        rows = self.cur.fetchall()
+        if self.cur.rowcount==0:
+            return 0, "Name %s not found in table %s" %(name, table)
+        elif self.cur.rowcount>1:
+            return self.cur.rowcount, "More than one VNF with name %s found in table %s" %(name, table)
+        return self.cur.rowcount, rows[0]["uuid"]