X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_ro%2Fdb_base.py;h=e6e1134d854f21383114c35aa19a93f9eda85931;hb=1fa49b16e92ff2e4f512ccc466fdc3dff31559e4;hp=e7a4fb327bb43753c890c6dacba70ba13fb8a56d;hpb=a3ebc3637f8b1a2b7e4f6dc8ce5a33fefcc2b9eb;p=osm%2FRO.git diff --git a/osm_ro/db_base.py b/osm_ro/db_base.py index e7a4fb32..e6e1134d 100644 --- a/osm_ro/db_base.py +++ b/osm_ro/db_base.py @@ -35,9 +35,85 @@ import json import time import logging import datetime +from contextlib import contextmanager +from functools import wraps, partial +from threading import Lock from jsonschema import validate as js_v, exceptions as js_e from .http_tools import errors as httperrors +from .utils import Attempt, get_arg, inject_args + + +RECOVERY_TIME = 3 + +_ATTEMPT = Attempt() + + +def with_transaction(fn=None, cursor=None): + """Decorator that can be used together with instances of the ``db_base`` + class, to perform database actions wrapped in a commit/rollback fashion + + This decorator basically executes the function inside the context object + given by the ``transaction`` method in ``db_base`` + + Arguments: + cursor: [Optional] cursor class + """ + if fn is None: # Allows calling the decorator directly or with parameters + return partial(with_transaction, cursor=cursor) + + @wraps(fn) + def _wrapper(self, *args, **kwargs): + cursor_type = None + if cursor == 'dict': + # MySQLdB define the "cursors" module attribute lazily, + # so we have to defer references to mdb.cursors.DictCursor + cursor_type = mdb.cursors.DictCursor + + with self.transaction(cursor_type): + return fn(self, *args, **kwargs) + + return _wrapper + + +def retry(fn=None, max_attempts=Attempt.MAX, **info): + """Decorator that can be used together with instances of the ``db_base`` + class, to replay a method again after a unexpected error. + + The function being decorated needs to either be a method of ``db_base`` + subclasses or accept an ``db_base`` instance as the first parameter. + + All the extra keyword arguments will be passed to the ``_format_error`` + method + """ + if fn is None: # Allows calling the decorator directly or with parameters + return partial(retry, max_attempts=max_attempts, **info) + + @wraps(fn) + def _wrapper(*args, **kwargs): + self = args[0] + info.setdefault('table', get_arg('table', fn, args, kwargs)) + attempt = Attempt(max_attempts=max_attempts, info=info) + while attempt.countdown >= 0: + try: + return inject_args(fn, attempt=attempt)(*args, **kwargs) + except (mdb.Error, AttributeError) as ex: + self.logger.debug("Attempt #%d", attempt.number) + try: + # The format error will throw exceptions, however it can + # tolerate a certain amount of retries if it judges that + # the error can be solved with retrying + self._format_error(ex, attempt.countdown, **attempt.info) + # Anyway, unexpected/unknown errors can still be retried + except db_base_Exception as db_ex: + if (attempt.countdown < 0 or db_ex.http_code != + httperrors.Internal_Server_Error): + raise + + attempt.count += 1 + + return _wrapper + def _check_valid_uuid(uuid): id_schema = {"type" : "string", "pattern": "^[a-fA-F0-9]{8}(-[a-fA-F0-9]{4}){3}-[a-fA-F0-9]{12}$"} @@ -137,7 +213,8 @@ class db_base_Exception(httperrors.HttpMappedError): class db_base(): tables_with_created_field=() - def __init__(self, host=None, user=None, passwd=None, database=None, log_name='db', log_level=None): + def __init__(self, host=None, user=None, passwd=None, database=None, + log_name='db', log_level=None, lock=None): self.host = host self.user = user self.passwd = passwd @@ -147,6 +224,7 @@ class db_base(): self.logger = logging.getLogger(log_name) if self.log_level: self.logger.setLevel( getattr(logging, log_level) ) + self.lock = lock or Lock() def connect(self, host=None, user=None, passwd=None, database=None): '''Connect to specific data base. @@ -170,35 +248,27 @@ class db_base(): return self.con.escape(value) def escape_string(self, value): - if isinstance(value, "unicode"): + if isinstance(value, unicode): value = value.encode("utf8") return self.con.escape_string(value) + @retry + @with_transaction def get_db_version(self): ''' Obtain the database schema version. Return: (negative, text) if error or version 0.0 where schema_version table is missing (version_int, version_text) if ok ''' cmd = "SELECT version_int,version FROM schema_version" - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor() - self.logger.debug(cmd) - self.cur.execute(cmd) - rows = self.cur.fetchall() - highest_version_int=0 - highest_version="" - for row in rows: #look for the latest version - if row[0]>highest_version_int: - highest_version_int, highest_version = row[0:2] - return highest_version_int, highest_version - except (mdb.Error, AttributeError) as e: - self.logger.error("Exception '{}' with command '{}'".format(e, cmd)) - #self.logger.error("get_db_version DB Exception %d: %s. Command %s",e.args[0], e.args[1], cmd) - self._format_error(e, tries) - tries -= 1 + self.logger.debug(cmd) + self.cur.execute(cmd) + rows = self.cur.fetchall() + highest_version_int=0 + highest_version="" + for row in rows: #look for the latest version + if row[0]>highest_version_int: + highest_version_int, highest_version = row[0:2] + return highest_version_int, highest_version def disconnect(self): '''disconnect from specific data base''' @@ -215,7 +285,73 @@ class db_base(): else: raise - def _format_error(self, e, tries=1, command=None, extra=None, table=None): + def reconnect(self): + """Try to gracefully to the database in case of error""" + try: + self.con.ping(True) # auto-reconnect if the server is available + except: + # The server is probably not available... + # Let's wait a bit + time.sleep(RECOVERY_TIME) + self.con = None + self.connect() + + def fork_connection(self): + """Return a new database object, with a separated connection to the + database (and lock), so it can act independently + """ + obj = self.__class__( + host=self.host, + user=self.user, + passwd=self.passwd, + database=self.database, + log_name=self.logger.name, + log_level=self.log_level, + lock=Lock() + ) + + obj.connect() + + return obj + + @contextmanager + def transaction(self, cursor_type=None): + """DB changes that are executed inside this context will be + automatically rolled back in case of error. + + This implementation also adds a lock, so threads sharing the same + connection object are synchronized. + + Arguments: + cursor_type: default: MySQLdb.cursors.DictCursor + + Yields: + Cursor object + + References: + https://www.oreilly.com/library/view/mysql-cookbook-2nd/059652708X/ch15s08.html + https://github.com/PyMySQL/mysqlclient-python/commit/c64915b1e5c705f4fb10e86db5dcfed0b58552cc + """ + # Previously MySQLdb had built-in support for that using the context + # API for the connection object. + # This support was removed in version 1.40 + # https://github.com/PyMySQL/mysqlclient-python/blob/master/HISTORY.rst#whats-new-in-140 + with self.lock: + try: + if self.con.get_autocommit(): + self.con.query("BEGIN") + + self.cur = self.con.cursor(cursor_type) + yield self.cur + except: # noqa + self.con.rollback() + raise + else: + self.con.commit() + + + def _format_error(self, e, tries=1, command=None, + extra=None, table=None, cmd=None, **_): '''Creates a text error base on the produced exception Params: e: mdb exception @@ -225,15 +361,22 @@ class db_base(): extra: extra information to add to some commands Return HTTP error in negative, formatted error text - ''' + ''' # the **_ ignores extra kwargs + table_info = ' (table `{}`)'.format(table) if table else '' + if cmd: + self.logger.debug("Exception '%s' with command '%s'%s", + e, cmd, table_info) + if isinstance(e,AttributeError ): self.logger.debug(str(e), exc_info=True) raise db_base_Exception("DB Exception " + str(e), httperrors.Internal_Server_Error) if e.args[0]==2006 or e.args[0]==2013 : #MySQL server has gone away (((or))) Exception 2013: Lost connection to MySQL server during query - if tries>1: + # Let's aways reconnect if the connection is lost + # so future calls are not affected. + self.reconnect() + + if tries > 1: self.logger.warn("DB Exception '%s'. Retry", str(e)) - #reconnect - self.connect() return else: raise db_base_Exception("Database connection timeout Try Again", httperrors.Request_Timeout) @@ -250,7 +393,6 @@ class db_base(): wc = e.args[1].find("in 'where clause'") fl = e.args[1].find("in 'field list'") #print de, fk, uk, wc,fl - table_info = ' (table `{}`)'.format(table) if table else '' if de>=0: if fk>=0: #error 1062 raise db_base_Exception( @@ -420,7 +562,7 @@ class db_base(): INSERT: dictionary with the key:value to insert table: table where to insert add_uuid: if True, it will create an uuid key entry at INSERT if not provided - created_time: time to add to the created_time column + created_time: time to add to the created_at column It checks presence of uuid and add one automatically otherwise Return: uuid ''' @@ -449,7 +591,7 @@ class db_base(): cmd= "INSERT INTO " + table +" SET " + \ ",".join(map(self.__tuple2db_format_set, INSERT.iteritems() )) if created_time: - cmd += ",created_at=%f" % created_time + cmd += ",created_at={time:.9f},modified_at={time:.9f}".format(time=created_time) if confidential_data: index = cmd.find("SET") subcmd = cmd[:index] + 'SET...' @@ -467,6 +609,8 @@ class db_base(): rows = self.cur.fetchall() return rows + @retry + @with_transaction def new_row(self, table, INSERT, add_uuid=False, created_time=0, confidential_data=False): ''' Add one row into a table. Attribute @@ -479,18 +623,11 @@ class db_base(): ''' if table in self.tables_with_created_field and created_time==0: created_time=time.time() - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor() - return self._new_row_internal(table, INSERT, add_uuid, None, created_time, confidential_data) - - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries, table=table) - tries -= 1 + return self._new_row_internal(table, INSERT, add_uuid, None, created_time, confidential_data) - def update_rows(self, table, UPDATE, WHERE, modified_time=0): + @retry + @with_transaction + def update_rows(self, table, UPDATE, WHERE, modified_time=None, attempt=_ATTEMPT): """ Update one or several rows of a table. :param UPDATE: dictionary with the changes. dict keys are database columns that will be set with the dict values :param table: database table to update @@ -501,21 +638,14 @@ class db_base(): keys can be suffixed by >,<,<>,>=,<= so that this is used to compare key and value instead of "=" The special keys "OR", "AND" with a dict value is used to create a nested WHERE If a list, each item will be a dictionary that will be concatenated with OR - :param modified_time: Can contain the time to be set to the table row + :param modified_time: Can contain the time to be set to the table row. + None to set automatically, 0 to do not modify it :return: the number of updated rows, raises exception upon error """ - if table in self.tables_with_created_field and modified_time==0: - modified_time=time.time() - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor() - return self._update_rows( - table, UPDATE, WHERE, modified_time) - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries, table=table) - tries -= 1 + if table in self.tables_with_created_field and modified_time is None: + modified_time = time.time() + + return self._update_rows(table, UPDATE, WHERE, modified_time) def _delete_row_by_id_internal(self, table, uuid): cmd = "DELETE FROM {} WHERE uuid = '{}'".format(table, uuid) @@ -529,19 +659,13 @@ class db_base(): self.cur.execute(cmd) return deleted + @retry(command='delete', extra='dependencies') + @with_transaction def delete_row_by_id(self, table, uuid): - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor() - return self._delete_row_by_id_internal(table, uuid) - except (mdb.Error, AttributeError) as e: - self._format_error( - e, tries, "delete", "dependencies", table=table) - tries -= 1 - - def delete_row(self, **sql_dict): + return self._delete_row_by_id_internal(table, uuid) + + @retry + def delete_row(self, attempt=_ATTEMPT, **sql_dict): """ Deletes rows from a table. :param UPDATE: dictionary with the changes. dict keys are database columns that will be set with the dict values :param FROM: string with table name (Mandatory) @@ -560,36 +684,28 @@ class db_base(): cmd += " WHERE " + self.__create_where(sql_dict['WHERE']) if sql_dict.get('LIMIT'): cmd += " LIMIT " + str(sql_dict['LIMIT']) - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor() - self.logger.debug(cmd) - self.cur.execute(cmd) - deleted = self.cur.rowcount - return deleted - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries) - tries -= 1 - - def get_rows_by_id(self, table, uuid): + + attempt.info['cmd'] = cmd + + with self.transaction(): + self.logger.debug(cmd) + self.cur.execute(cmd) + deleted = self.cur.rowcount + return deleted + + @retry + @with_transaction(cursor='dict') + def get_rows_by_id(self, table, uuid, attempt=_ATTEMPT): '''get row from a table based on uuid''' - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor(mdb.cursors.DictCursor) - cmd="SELECT * FROM {} where uuid='{}'".format(str(table), str(uuid)) - self.logger.debug(cmd) - self.cur.execute(cmd) - rows = self.cur.fetchall() - return rows - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries, table=table) - tries -= 1 - - def get_rows(self, **sql_dict): + cmd="SELECT * FROM {} where uuid='{}'".format(str(table), str(uuid)) + attempt.info['cmd'] = cmd + self.logger.debug(cmd) + self.cur.execute(cmd) + rows = self.cur.fetchall() + return rows + + @retry + def get_rows(self, attempt=_ATTEMPT, **sql_dict): """ Obtain rows from a table. :param SELECT: list or tuple of fields to retrieve) (by default all) :param FROM: string with table name (Mandatory) @@ -628,26 +744,21 @@ class db_base(): if 'LIMIT' in sql_dict: cmd += " LIMIT " + str(sql_dict['LIMIT']) - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor(mdb.cursors.DictCursor) - self.logger.debug(cmd) - self.cur.execute(cmd) - rows = self.cur.fetchall() - return rows - except (mdb.Error, AttributeError) as e: - self.logger.error("Exception '{}' with command '{}'".format(e, cmd)) - self._format_error(e, tries) - tries -= 1 - - def get_table_by_uuid_name(self, table, uuid_name, error_item_text=None, allow_serveral=False, WHERE_OR={}, WHERE_AND_OR="OR"): + attempt.info['cmd'] = cmd + + with self.transaction(mdb.cursors.DictCursor): + self.logger.debug(cmd) + self.cur.execute(cmd) + rows = self.cur.fetchall() + return rows + + @retry + def get_table_by_uuid_name(self, table, uuid_name, error_item_text=None, allow_several=False, WHERE_OR={}, WHERE_AND_OR="OR", attempt=_ATTEMPT): ''' Obtain One row from a table based on name or uuid. Attribute: table: string of table name uuid_name: name or uuid. If not uuid format is found, it is considered a name - allow_severeral: if False return ERROR if more than one row are founded + allow_several: if False return ERROR if more than one row are found error_item_text: in case of error it identifies the 'item' name for a proper output text 'WHERE_OR': dict of key:values, translated to key=value OR ... (Optional) 'WHERE_AND_OR: str 'AND' or 'OR'(by default) mark the priority to 'WHERE AND (WHERE_OR)' or (WHERE) OR WHERE_OR' (Optional @@ -666,58 +777,40 @@ class db_base(): else: cmd += " OR " + where_or - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor(mdb.cursors.DictCursor) - self.logger.debug(cmd) - self.cur.execute(cmd) - number = self.cur.rowcount - if number == 0: - raise db_base_Exception("No {} found with {} '{}'".format(error_item_text, what, uuid_name), http_code=httperrors.Not_Found) - elif number > 1 and not allow_serveral: - raise db_base_Exception("More than one {} found with {} '{}'".format(error_item_text, what, uuid_name), http_code=httperrors.Conflict) - if allow_serveral: - rows = self.cur.fetchall() - else: - rows = self.cur.fetchone() - return rows - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries, table=table) - tries -= 1 + attempt.info['cmd'] = cmd + + with self.transaction(mdb.cursors.DictCursor): + self.logger.debug(cmd) + self.cur.execute(cmd) + number = self.cur.rowcount + if number == 0: + raise db_base_Exception("No {} found with {} '{}'".format(error_item_text, what, uuid_name), http_code=httperrors.Not_Found) + elif number > 1 and not allow_several: + raise db_base_Exception("More than one {} found with {} '{}'".format(error_item_text, what, uuid_name), http_code=httperrors.Conflict) + if allow_several: + rows = self.cur.fetchall() + else: + rows = self.cur.fetchone() + return rows + @retry(table='uuids') + @with_transaction(cursor='dict') def get_uuid(self, uuid): '''check in the database if this uuid is already present''' - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor(mdb.cursors.DictCursor) - self.cur.execute("SELECT * FROM uuids where uuid='" + str(uuid) + "'") - rows = self.cur.fetchall() - return self.cur.rowcount, rows - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries) - tries -= 1 + self.cur.execute("SELECT * FROM uuids where uuid='" + str(uuid) + "'") + rows = self.cur.fetchall() + return self.cur.rowcount, rows + @retry + @with_transaction(cursor='dict') def get_uuid_from_name(self, table, name): '''Searchs in table the name and returns the uuid ''' - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor(mdb.cursors.DictCursor) - where_text = "name='" + name +"'" - self.cur.execute("SELECT * FROM " + table + " WHERE "+ where_text) - rows = self.cur.fetchall() - if self.cur.rowcount==0: - return 0, "Name %s not found in table %s" %(name, table) - elif self.cur.rowcount>1: - return self.cur.rowcount, "More than one VNF with name %s found in table %s" %(name, table) - return self.cur.rowcount, rows[0]["uuid"] - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries, table=table) - tries -= 1 - + where_text = "name='" + name +"'" + self.cur.execute("SELECT * FROM " + table + " WHERE "+ where_text) + rows = self.cur.fetchall() + if self.cur.rowcount==0: + return 0, "Name %s not found in table %s" %(name, table) + elif self.cur.rowcount>1: + return self.cur.rowcount, "More than one VNF with name %s found in table %s" %(name, table) + return self.cur.rowcount, rows[0]["uuid"]