import time
import logging
import datetime
+from contextlib import contextmanager
+from functools import wraps, partial
+from threading import Lock
from jsonschema import validate as js_v, exceptions as js_e
from .http_tools import errors as httperrors
+from .utils import Attempt, get_arg, inject_args
+
+
+RECOVERY_TIME = 3
+
+_ATTEMPT = Attempt()
+
+
+def with_transaction(fn=None, cursor=None):
+ """Decorator that can be used together with instances of the ``db_base``
+ class, to perform database actions wrapped in a commit/rollback fashion
+
+ This decorator basically executes the function inside the context object
+ given by the ``transaction`` method in ``db_base``
+
+ Arguments:
+ cursor: [Optional] cursor class
+ """
+ if fn is None: # Allows calling the decorator directly or with parameters
+ return partial(with_transaction, cursor=cursor)
+
+ @wraps(fn)
+ def _wrapper(self, *args, **kwargs):
+ cursor_type = None
+ if cursor == 'dict':
+ # MySQLdB define the "cursors" module attribute lazily,
+ # so we have to defer references to mdb.cursors.DictCursor
+ cursor_type = mdb.cursors.DictCursor
+
+ with self.transaction(cursor_type):
+ return fn(self, *args, **kwargs)
+
+ return _wrapper
+
+
+def retry(fn=None, max_attempts=Attempt.MAX, **info):
+ """Decorator that can be used together with instances of the ``db_base``
+ class, to replay a method again after a unexpected error.
+
+ The function being decorated needs to either be a method of ``db_base``
+ subclasses or accept an ``db_base`` instance as the first parameter.
+
+ All the extra keyword arguments will be passed to the ``_format_error``
+ method
+ """
+ if fn is None: # Allows calling the decorator directly or with parameters
+ return partial(retry, max_attempts=max_attempts, **info)
+
+ @wraps(fn)
+ def _wrapper(*args, **kwargs):
+ self = args[0]
+ info.setdefault('table', get_arg('table', fn, args, kwargs))
+ attempt = Attempt(max_attempts=max_attempts, info=info)
+ while attempt.countdown >= 0:
+ try:
+ return inject_args(fn, attempt=attempt)(*args, **kwargs)
+ except (mdb.Error, AttributeError) as ex:
+ self.logger.debug("Attempt #%d", attempt.number)
+ try:
+ # The format error will throw exceptions, however it can
+ # tolerate a certain amount of retries if it judges that
+ # the error can be solved with retrying
+ self._format_error(ex, attempt.countdown, **attempt.info)
+ # Anyway, unexpected/unknown errors can still be retried
+ except db_base_Exception as db_ex:
+ if (attempt.countdown < 0 or db_ex.http_code !=
+ httperrors.Internal_Server_Error):
+ raise
+
+ attempt.count += 1
+
+ return _wrapper
+
def _check_valid_uuid(uuid):
id_schema = {"type" : "string", "pattern": "^[a-fA-F0-9]{8}(-[a-fA-F0-9]{4}){3}-[a-fA-F0-9]{12}$"}
class db_base():
tables_with_created_field=()
- def __init__(self, host=None, user=None, passwd=None, database=None, log_name='db', log_level=None):
+ def __init__(self, host=None, user=None, passwd=None, database=None,
+ log_name='db', log_level=None, lock=None):
self.host = host
self.user = user
self.passwd = passwd
self.logger = logging.getLogger(log_name)
if self.log_level:
self.logger.setLevel( getattr(logging, log_level) )
+ self.lock = lock or Lock()
def connect(self, host=None, user=None, passwd=None, database=None):
'''Connect to specific data base.
def escape(self, value):
return self.con.escape(value)
-
def escape_string(self, value):
+ if isinstance(value, unicode):
+ value = value.encode("utf8")
return self.con.escape_string(value)
-
+ @retry
+ @with_transaction
def get_db_version(self):
''' Obtain the database schema version.
Return: (negative, text) if error or version 0.0 where schema_version table is missing
(version_int, version_text) if ok
'''
cmd = "SELECT version_int,version FROM schema_version"
- tries = 2
- while tries:
- try:
- with self.con:
- self.cur = self.con.cursor()
- self.logger.debug(cmd)
- self.cur.execute(cmd)
- rows = self.cur.fetchall()
- highest_version_int=0
- highest_version=""
- for row in rows: #look for the latest version
- if row[0]>highest_version_int:
- highest_version_int, highest_version = row[0:2]
- return highest_version_int, highest_version
- except (mdb.Error, AttributeError) as e:
- self.logger.error("Exception '{}' with command '{}'".format(e, cmd))
- #self.logger.error("get_db_version DB Exception %d: %s. Command %s",e.args[0], e.args[1], cmd)
- self._format_error(e, tries)
- tries -= 1
+ self.logger.debug(cmd)
+ self.cur.execute(cmd)
+ rows = self.cur.fetchall()
+ highest_version_int=0
+ highest_version=""
+ for row in rows: #look for the latest version
+ if row[0]>highest_version_int:
+ highest_version_int, highest_version = row[0:2]
+ return highest_version_int, highest_version
def disconnect(self):
'''disconnect from specific data base'''
else:
raise
- def _format_error(self, e, tries=1, command=None, extra=None, table=None):
+ def reconnect(self):
+ """Try to gracefully to the database in case of error"""
+ try:
+ self.con.ping(True) # auto-reconnect if the server is available
+ except:
+ # The server is probably not available...
+ # Let's wait a bit
+ time.sleep(RECOVERY_TIME)
+ self.con = None
+ self.connect()
+
+ def fork_connection(self):
+ """Return a new database object, with a separated connection to the
+ database (and lock), so it can act independently
+ """
+ obj = self.__class__(
+ host=self.host,
+ user=self.user,
+ passwd=self.passwd,
+ database=self.database,
+ log_name=self.logger.name,
+ log_level=self.log_level,
+ lock=Lock()
+ )
+
+ obj.connect()
+
+ return obj
+
+ @contextmanager
+ def transaction(self, cursor_type=None):
+ """DB changes that are executed inside this context will be
+ automatically rolled back in case of error.
+
+ This implementation also adds a lock, so threads sharing the same
+ connection object are synchronized.
+
+ Arguments:
+ cursor_type: default: MySQLdb.cursors.DictCursor
+
+ Yields:
+ Cursor object
+
+ References:
+ https://www.oreilly.com/library/view/mysql-cookbook-2nd/059652708X/ch15s08.html
+ https://github.com/PyMySQL/mysqlclient-python/commit/c64915b1e5c705f4fb10e86db5dcfed0b58552cc
+ """
+ # Previously MySQLdb had built-in support for that using the context
+ # API for the connection object.
+ # This support was removed in version 1.40
+ # https://github.com/PyMySQL/mysqlclient-python/blob/master/HISTORY.rst#whats-new-in-140
+ with self.lock:
+ try:
+ if self.con.get_autocommit():
+ self.con.query("BEGIN")
+
+ self.cur = self.con.cursor(cursor_type)
+ yield self.cur
+ except: # noqa
+ self.con.rollback()
+ raise
+ else:
+ self.con.commit()
+
+
+ def _format_error(self, e, tries=1, command=None,
+ extra=None, table=None, cmd=None, **_):
'''Creates a text error base on the produced exception
Params:
e: mdb exception
extra: extra information to add to some commands
Return
HTTP error in negative, formatted error text
- '''
+ ''' # the **_ ignores extra kwargs
+ table_info = ' (table `{}`)'.format(table) if table else ''
+ if cmd:
+ self.logger.debug("Exception '%s' with command '%s'%s",
+ e, cmd, table_info)
+
if isinstance(e,AttributeError ):
self.logger.debug(str(e), exc_info=True)
raise db_base_Exception("DB Exception " + str(e), httperrors.Internal_Server_Error)
if e.args[0]==2006 or e.args[0]==2013 : #MySQL server has gone away (((or))) Exception 2013: Lost connection to MySQL server during query
- if tries>1:
+ # Let's aways reconnect if the connection is lost
+ # so future calls are not affected.
+ self.reconnect()
+
+ if tries > 1:
self.logger.warn("DB Exception '%s'. Retry", str(e))
- #reconnect
- self.connect()
return
else:
raise db_base_Exception("Database connection timeout Try Again", httperrors.Request_Timeout)
wc = e.args[1].find("in 'where clause'")
fl = e.args[1].find("in 'field list'")
#print de, fk, uk, wc,fl
- table_info = ' (table `{}`)'.format(table) if table else ''
if de>=0:
if fk>=0: #error 1062
raise db_base_Exception(
httperrors.Internal_Server_Error)
def __str2db_format(self, data):
- '''Convert string data to database format.
+ """Convert string data to database format.
If data is None it returns the 'Null' text,
otherwise it returns the text surrounded by quotes ensuring internal quotes are escaped.
- '''
- if data==None:
+ """
+ if data is None:
return 'Null'
- elif isinstance(data[1], str):
+ elif isinstance(data[1], (str, unicode)):
return json.dumps(data)
else:
return json.dumps(str(data))
B can be also a dict with special keys:
{"INCREMENT": NUMBER}, then it produce "A=A+NUMBER"
"""
- if data[1] == None:
+ if data[1] is None:
return str(data[0]) + "=Null"
- elif isinstance(data[1], str):
+ elif isinstance(data[1], (str, unicode)):
return str(data[0]) + '=' + json.dumps(data[1])
elif isinstance(data[1], dict):
if "INCREMENT" in data[1]:
for v2 in v:
if v2 is None:
cmd2.append(k.replace("=", " is").replace("<>", " is not") + " Null")
+ elif isinstance(v2, (str, unicode)):
+ cmd2.append(k + json.dumps(v2))
else:
cmd2.append(k + json.dumps(str(v2)))
cmd.append("(" + " OR ".join(cmd2) + ")")
+ elif isinstance(v, (str, unicode)):
+ cmd.append(k + json.dumps(v))
else:
cmd.append(k + json.dumps(str(v)))
elif isinstance(data, (tuple, list)):
INSERT: dictionary with the key:value to insert
table: table where to insert
add_uuid: if True, it will create an uuid key entry at INSERT if not provided
- created_time: time to add to the created_time column
+ created_time: time to add to the created_at column
It checks presence of uuid and add one automatically otherwise
Return: uuid
'''
cmd= "INSERT INTO " + table +" SET " + \
",".join(map(self.__tuple2db_format_set, INSERT.iteritems() ))
if created_time:
- cmd += ",created_at=%f" % created_time
+ cmd += ",created_at={time:.9f},modified_at={time:.9f}".format(time=created_time)
if confidential_data:
index = cmd.find("SET")
subcmd = cmd[:index] + 'SET...'
rows = self.cur.fetchall()
return rows
+ @retry
+ @with_transaction
def new_row(self, table, INSERT, add_uuid=False, created_time=0, confidential_data=False):
''' Add one row into a table.
Attribute
'''
if table in self.tables_with_created_field and created_time==0:
created_time=time.time()
- tries = 2
- while tries:
- try:
- with self.con:
- self.cur = self.con.cursor()
- return self._new_row_internal(table, INSERT, add_uuid, None, created_time, confidential_data)
+ return self._new_row_internal(table, INSERT, add_uuid, None, created_time, confidential_data)
- except (mdb.Error, AttributeError) as e:
- self._format_error(e, tries, table=table)
- tries -= 1
-
- def update_rows(self, table, UPDATE, WHERE, modified_time=0):
+ @retry
+ @with_transaction
+ def update_rows(self, table, UPDATE, WHERE, modified_time=None, attempt=_ATTEMPT):
""" Update one or several rows of a table.
:param UPDATE: dictionary with the changes. dict keys are database columns that will be set with the dict values
:param table: database table to update
keys can be suffixed by >,<,<>,>=,<= so that this is used to compare key and value instead of "="
The special keys "OR", "AND" with a dict value is used to create a nested WHERE
If a list, each item will be a dictionary that will be concatenated with OR
- :param modified_time: Can contain the time to be set to the table row
+ :param modified_time: Can contain the time to be set to the table row.
+ None to set automatically, 0 to do not modify it
:return: the number of updated rows, raises exception upon error
"""
- if table in self.tables_with_created_field and modified_time==0:
- modified_time=time.time()
- tries = 2
- while tries:
- try:
- with self.con:
- self.cur = self.con.cursor()
- return self._update_rows(
- table, UPDATE, WHERE, modified_time)
- except (mdb.Error, AttributeError) as e:
- self._format_error(e, tries, table=table)
- tries -= 1
+ if table in self.tables_with_created_field and modified_time is None:
+ modified_time = time.time()
+
+ return self._update_rows(table, UPDATE, WHERE, modified_time)
def _delete_row_by_id_internal(self, table, uuid):
cmd = "DELETE FROM {} WHERE uuid = '{}'".format(table, uuid)
self.cur.execute(cmd)
return deleted
+ @retry(command='delete', extra='dependencies')
+ @with_transaction
def delete_row_by_id(self, table, uuid):
- tries = 2
- while tries:
- try:
- with self.con:
- self.cur = self.con.cursor()
- return self._delete_row_by_id_internal(table, uuid)
- except (mdb.Error, AttributeError) as e:
- self._format_error(
- e, tries, "delete", "dependencies", table=table)
- tries -= 1
-
- def delete_row(self, **sql_dict):
+ return self._delete_row_by_id_internal(table, uuid)
+
+ @retry
+ def delete_row(self, attempt=_ATTEMPT, **sql_dict):
""" Deletes rows from a table.
:param UPDATE: dictionary with the changes. dict keys are database columns that will be set with the dict values
:param FROM: string with table name (Mandatory)
cmd += " WHERE " + self.__create_where(sql_dict['WHERE'])
if sql_dict.get('LIMIT'):
cmd += " LIMIT " + str(sql_dict['LIMIT'])
- tries = 2
- while tries:
- try:
- with self.con:
- self.cur = self.con.cursor()
- self.logger.debug(cmd)
- self.cur.execute(cmd)
- deleted = self.cur.rowcount
- return deleted
- except (mdb.Error, AttributeError) as e:
- self._format_error(e, tries)
- tries -= 1
-
- def get_rows_by_id(self, table, uuid):
+
+ attempt.info['cmd'] = cmd
+
+ with self.transaction():
+ self.logger.debug(cmd)
+ self.cur.execute(cmd)
+ deleted = self.cur.rowcount
+ return deleted
+
+ @retry
+ @with_transaction(cursor='dict')
+ def get_rows_by_id(self, table, uuid, attempt=_ATTEMPT):
'''get row from a table based on uuid'''
- tries = 2
- while tries:
- try:
- with self.con:
- self.cur = self.con.cursor(mdb.cursors.DictCursor)
- cmd="SELECT * FROM {} where uuid='{}'".format(str(table), str(uuid))
- self.logger.debug(cmd)
- self.cur.execute(cmd)
- rows = self.cur.fetchall()
- return rows
- except (mdb.Error, AttributeError) as e:
- self._format_error(e, tries, table=table)
- tries -= 1
-
- def get_rows(self, **sql_dict):
+ cmd="SELECT * FROM {} where uuid='{}'".format(str(table), str(uuid))
+ attempt.info['cmd'] = cmd
+ self.logger.debug(cmd)
+ self.cur.execute(cmd)
+ rows = self.cur.fetchall()
+ return rows
+
+ @retry
+ def get_rows(self, attempt=_ATTEMPT, **sql_dict):
""" Obtain rows from a table.
:param SELECT: list or tuple of fields to retrieve) (by default all)
:param FROM: string with table name (Mandatory)
if 'LIMIT' in sql_dict:
cmd += " LIMIT " + str(sql_dict['LIMIT'])
- tries = 2
- while tries:
- try:
- with self.con:
- self.cur = self.con.cursor(mdb.cursors.DictCursor)
- self.logger.debug(cmd)
- self.cur.execute(cmd)
- rows = self.cur.fetchall()
- return rows
- except (mdb.Error, AttributeError) as e:
- self.logger.error("Exception '{}' with command '{}'".format(e, cmd))
- self._format_error(e, tries)
- tries -= 1
-
- def get_table_by_uuid_name(self, table, uuid_name, error_item_text=None, allow_serveral=False, WHERE_OR={}, WHERE_AND_OR="OR"):
+ attempt.info['cmd'] = cmd
+
+ with self.transaction(mdb.cursors.DictCursor):
+ self.logger.debug(cmd)
+ self.cur.execute(cmd)
+ rows = self.cur.fetchall()
+ return rows
+
+ @retry
+ def get_table_by_uuid_name(self, table, uuid_name, error_item_text=None, allow_several=False, WHERE_OR={}, WHERE_AND_OR="OR", attempt=_ATTEMPT):
''' Obtain One row from a table based on name or uuid.
Attribute:
table: string of table name
uuid_name: name or uuid. If not uuid format is found, it is considered a name
- allow_severeral: if False return ERROR if more than one row are founded
+ allow_several: if False return ERROR if more than one row are found
error_item_text: in case of error it identifies the 'item' name for a proper output text
'WHERE_OR': dict of key:values, translated to key=value OR ... (Optional)
'WHERE_AND_OR: str 'AND' or 'OR'(by default) mark the priority to 'WHERE AND (WHERE_OR)' or (WHERE) OR WHERE_OR' (Optional
else:
cmd += " OR " + where_or
- tries = 2
- while tries:
- try:
- with self.con:
- self.cur = self.con.cursor(mdb.cursors.DictCursor)
- self.logger.debug(cmd)
- self.cur.execute(cmd)
- number = self.cur.rowcount
- if number == 0:
- raise db_base_Exception("No {} found with {} '{}'".format(error_item_text, what, uuid_name), http_code=httperrors.Not_Found)
- elif number > 1 and not allow_serveral:
- raise db_base_Exception("More than one {} found with {} '{}'".format(error_item_text, what, uuid_name), http_code=httperrors.Conflict)
- if allow_serveral:
- rows = self.cur.fetchall()
- else:
- rows = self.cur.fetchone()
- return rows
- except (mdb.Error, AttributeError) as e:
- self._format_error(e, tries, table=table)
- tries -= 1
+ attempt.info['cmd'] = cmd
+
+ with self.transaction(mdb.cursors.DictCursor):
+ self.logger.debug(cmd)
+ self.cur.execute(cmd)
+ number = self.cur.rowcount
+ if number == 0:
+ raise db_base_Exception("No {} found with {} '{}'".format(error_item_text, what, uuid_name), http_code=httperrors.Not_Found)
+ elif number > 1 and not allow_several:
+ raise db_base_Exception("More than one {} found with {} '{}'".format(error_item_text, what, uuid_name), http_code=httperrors.Conflict)
+ if allow_several:
+ rows = self.cur.fetchall()
+ else:
+ rows = self.cur.fetchone()
+ return rows
+ @retry(table='uuids')
+ @with_transaction(cursor='dict')
def get_uuid(self, uuid):
'''check in the database if this uuid is already present'''
- tries = 2
- while tries:
- try:
- with self.con:
- self.cur = self.con.cursor(mdb.cursors.DictCursor)
- self.cur.execute("SELECT * FROM uuids where uuid='" + str(uuid) + "'")
- rows = self.cur.fetchall()
- return self.cur.rowcount, rows
- except (mdb.Error, AttributeError) as e:
- self._format_error(e, tries)
- tries -= 1
+ self.cur.execute("SELECT * FROM uuids where uuid='" + str(uuid) + "'")
+ rows = self.cur.fetchall()
+ return self.cur.rowcount, rows
+ @retry
+ @with_transaction(cursor='dict')
def get_uuid_from_name(self, table, name):
'''Searchs in table the name and returns the uuid
'''
- tries = 2
- while tries:
- try:
- with self.con:
- self.cur = self.con.cursor(mdb.cursors.DictCursor)
- where_text = "name='" + name +"'"
- self.cur.execute("SELECT * FROM " + table + " WHERE "+ where_text)
- rows = self.cur.fetchall()
- if self.cur.rowcount==0:
- return 0, "Name %s not found in table %s" %(name, table)
- elif self.cur.rowcount>1:
- return self.cur.rowcount, "More than one VNF with name %s found in table %s" %(name, table)
- return self.cur.rowcount, rows[0]["uuid"]
- except (mdb.Error, AttributeError) as e:
- self._format_error(e, tries, table=table)
- tries -= 1
-
+ where_text = "name='" + name +"'"
+ self.cur.execute("SELECT * FROM " + table + " WHERE "+ where_text)
+ rows = self.cur.fetchall()
+ if self.cur.rowcount==0:
+ return 0, "Name %s not found in table %s" %(name, table)
+ elif self.cur.rowcount>1:
+ return self.cur.rowcount, "More than one VNF with name %s found in table %s" %(name, table)
+ return self.cur.rowcount, rows[0]["uuid"]