From dfed511cd627748046d90894f8600331413b1cd3 Mon Sep 17 00:00:00 2001 From: Anderson Bravalheri Date: Fri, 8 Feb 2019 01:44:14 +0000 Subject: [PATCH] Improve race conditions/MySQL reconnection This commit aims to provide a better synchronization between all the different threads in RO, specially regarding DB usage and internal state consistency. The following improvements were done: 1. Centralize database retry logic into a single function This way we can change the procedure and the rules for retrying in a single place and this reflects in several functions simultaneously avoiding the need for manual copy and paste (and the potential risk of forgetting to change somewhere) 2. Minor fixes/improvements related to database connection loss. Previously `db_base` was already able to identify when the connection to MySQL was lost, but apparently in a few edge cases the automatic reconnection was not done. 3. Implement a transaction method This method replaces the old context manager API for the connection object that was removed from MySQLdb in version 1.4 In additional it is possible to use a decorator for transactions (not only the context manager), which is handy sometimes. 4. Add lock mechanism directly to db_base This helps to improve synchronization between threads. Some extra synchronization was introduced to functions, as it seemed to be the case. Moreover, previously, the cursor object was part of the internal state of the db_base object, and it was being changed/used without thread synchronization (error-prone). Having the locking mechanism around the changes in the cursor property of the class, avoids problems. 5. Add option to fork connection Useful when independent threading is needed (as long as different threads don't access the same database table, having separated connections and locks should work fine). Change-Id: I3ab34df5e8c2857d96ed14a70e7f65bd0b5189a0 Signed-off-by: Anderson Bravalheri --- osm_ro/db_base.py | 394 +++++--- osm_ro/nfvo.py | 8 +- osm_ro/nfvo_db.py | 1957 +++++++++++++++++------------------- osm_ro/tests/test_db.py | 123 +++ osm_ro/tests/test_utils.py | 50 + osm_ro/utils.py | 65 +- osm_ro/vim_thread.py | 145 ++- osm_ro/wim/persistence.py | 77 +- osm_ro/wim/schemas.py | 2 +- 9 files changed, 1537 insertions(+), 1284 deletions(-) create mode 100644 osm_ro/tests/test_db.py create mode 100644 osm_ro/tests/test_utils.py diff --git a/osm_ro/db_base.py b/osm_ro/db_base.py index a4cdef3b..58f5b8cf 100644 --- a/osm_ro/db_base.py +++ b/osm_ro/db_base.py @@ -35,9 +35,85 @@ import json import time import logging import datetime +from contextlib import contextmanager +from functools import wraps, partial +from threading import Lock from jsonschema import validate as js_v, exceptions as js_e from .http_tools import errors as httperrors +from .utils import Attempt, get_arg, inject_args + + +RECOVERY_TIME = 3 + +_ATTEMPT = Attempt() + + +def with_transaction(fn=None, cursor=None): + """Decorator that can be used together with instances of the ``db_base`` + class, to perform database actions wrapped in a commit/rollback fashion + + This decorator basically executes the function inside the context object + given by the ``transaction`` method in ``db_base`` + + Arguments: + cursor: [Optional] cursor class + """ + if fn is None: # Allows calling the decorator directly or with parameters + return partial(with_transaction, cursor=cursor) + + @wraps(fn) + def _wrapper(self, *args, **kwargs): + cursor_type = None + if cursor == 'dict': + # MySQLdB define the "cursors" module attribute lazily, + # so we have to defer references to mdb.cursors.DictCursor + cursor_type = mdb.cursors.DictCursor + + with self.transaction(cursor_type): + return fn(self, *args, **kwargs) + + return _wrapper + + +def retry(fn=None, max_attempts=Attempt.MAX, **info): + """Decorator that can be used together with instances of the ``db_base`` + class, to replay a method again after a unexpected error. + + The function being decorated needs to either be a method of ``db_base`` + subclasses or accept an ``db_base`` instance as the first parameter. + + All the extra keyword arguments will be passed to the ``_format_error`` + method + """ + if fn is None: # Allows calling the decorator directly or with parameters + return partial(retry, max_attempts=max_attempts, **info) + + @wraps(fn) + def _wrapper(*args, **kwargs): + self = args[0] + info.setdefault('table', get_arg('table', fn, args, kwargs)) + attempt = Attempt(max_attempts=max_attempts, info=info) + while attempt.countdown >= 0: + try: + return inject_args(fn, attempt=attempt)(*args, **kwargs) + except (mdb.Error, AttributeError) as ex: + self.logger.debug("Attempt #%d", attempt.number) + try: + # The format error will throw exceptions, however it can + # tolerate a certain amount of retries if it judges that + # the error can be solved with retrying + self._format_error(ex, attempt.countdown, **attempt.info) + # Anyway, unexpected/unknown errors can still be retried + except db_base_Exception as db_ex: + if (attempt.countdown < 0 or db_ex.http_code != + httperrors.Internal_Server_Error): + raise + + attempt.count += 1 + + return _wrapper + def _check_valid_uuid(uuid): id_schema = {"type" : "string", "pattern": "^[a-fA-F0-9]{8}(-[a-fA-F0-9]{4}){3}-[a-fA-F0-9]{12}$"} @@ -137,7 +213,8 @@ class db_base_Exception(httperrors.HttpMappedError): class db_base(): tables_with_created_field=() - def __init__(self, host=None, user=None, passwd=None, database=None, log_name='db', log_level=None): + def __init__(self, host=None, user=None, passwd=None, database=None, + log_name='db', log_level=None, lock=None): self.host = host self.user = user self.passwd = passwd @@ -147,6 +224,7 @@ class db_base(): self.logger = logging.getLogger(log_name) if self.log_level: self.logger.setLevel( getattr(logging, log_level) ) + self.lock = lock or Lock() def connect(self, host=None, user=None, passwd=None, database=None): '''Connect to specific data base. @@ -174,31 +252,23 @@ class db_base(): value = value.encode("utf8") return self.con.escape_string(value) + @retry + @with_transaction def get_db_version(self): ''' Obtain the database schema version. Return: (negative, text) if error or version 0.0 where schema_version table is missing (version_int, version_text) if ok ''' cmd = "SELECT version_int,version FROM schema_version" - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor() - self.logger.debug(cmd) - self.cur.execute(cmd) - rows = self.cur.fetchall() - highest_version_int=0 - highest_version="" - for row in rows: #look for the latest version - if row[0]>highest_version_int: - highest_version_int, highest_version = row[0:2] - return highest_version_int, highest_version - except (mdb.Error, AttributeError) as e: - self.logger.error("Exception '{}' with command '{}'".format(e, cmd)) - #self.logger.error("get_db_version DB Exception %d: %s. Command %s",e.args[0], e.args[1], cmd) - self._format_error(e, tries) - tries -= 1 + self.logger.debug(cmd) + self.cur.execute(cmd) + rows = self.cur.fetchall() + highest_version_int=0 + highest_version="" + for row in rows: #look for the latest version + if row[0]>highest_version_int: + highest_version_int, highest_version = row[0:2] + return highest_version_int, highest_version def disconnect(self): '''disconnect from specific data base''' @@ -215,7 +285,73 @@ class db_base(): else: raise - def _format_error(self, e, tries=1, command=None, extra=None, table=None): + def reconnect(self): + """Try to gracefully to the database in case of error""" + try: + self.con.ping(True) # auto-reconnect if the server is available + except: + # The server is probably not available... + # Let's wait a bit + time.sleep(RECOVERY_TIME) + self.con = None + self.connect() + + def fork_connection(self): + """Return a new database object, with a separated connection to the + database (and lock), so it can act independently + """ + obj = self.__class__( + host=self.host, + user=self.user, + passwd=self.passwd, + database=self.database, + log_name=self.logger.name, + log_level=self.log_level, + lock=Lock() + ) + + obj.connect() + + return obj + + @contextmanager + def transaction(self, cursor_type=None): + """DB changes that are executed inside this context will be + automatically rolled back in case of error. + + This implementation also adds a lock, so threads sharing the same + connection object are synchronized. + + Arguments: + cursor_type: default: MySQLdb.cursors.DictCursor + + Yields: + Cursor object + + References: + https://www.oreilly.com/library/view/mysql-cookbook-2nd/059652708X/ch15s08.html + https://github.com/PyMySQL/mysqlclient-python/commit/c64915b1e5c705f4fb10e86db5dcfed0b58552cc + """ + # Previously MySQLdb had built-in support for that using the context + # API for the connection object. + # This support was removed in version 1.40 + # https://github.com/PyMySQL/mysqlclient-python/blob/master/HISTORY.rst#whats-new-in-140 + with self.lock: + try: + if self.con.get_autocommit(): + self.con.query("BEGIN") + + self.cur = self.con.cursor(cursor_type) + yield self.cur + except: # noqa + self.con.rollback() + raise + else: + self.con.commit() + + + def _format_error(self, e, tries=1, command=None, + extra=None, table=None, cmd=None, **_): '''Creates a text error base on the produced exception Params: e: mdb exception @@ -225,15 +361,22 @@ class db_base(): extra: extra information to add to some commands Return HTTP error in negative, formatted error text - ''' + ''' # the **_ ignores extra kwargs + table_info = ' (table `{}`)'.format(table) if table else '' + if cmd: + self.logger.debug("Exception '%s' with command '%s'%s", + e, cmd, table_info) + if isinstance(e,AttributeError ): self.logger.debug(str(e), exc_info=True) raise db_base_Exception("DB Exception " + str(e), httperrors.Internal_Server_Error) if e.args[0]==2006 or e.args[0]==2013 : #MySQL server has gone away (((or))) Exception 2013: Lost connection to MySQL server during query - if tries>1: + # Let's aways reconnect if the connection is lost + # so future calls are not affected. + self.reconnect() + + if tries > 1: self.logger.warn("DB Exception '%s'. Retry", str(e)) - #reconnect - self.connect() return else: raise db_base_Exception("Database connection timeout Try Again", httperrors.Request_Timeout) @@ -250,7 +393,6 @@ class db_base(): wc = e.args[1].find("in 'where clause'") fl = e.args[1].find("in 'field list'") #print de, fk, uk, wc,fl - table_info = ' (table `{}`)'.format(table) if table else '' if de>=0: if fk>=0: #error 1062 raise db_base_Exception( @@ -467,6 +609,8 @@ class db_base(): rows = self.cur.fetchall() return rows + @retry + @with_transaction def new_row(self, table, INSERT, add_uuid=False, created_time=0, confidential_data=False): ''' Add one row into a table. Attribute @@ -479,18 +623,11 @@ class db_base(): ''' if table in self.tables_with_created_field and created_time==0: created_time=time.time() - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor() - return self._new_row_internal(table, INSERT, add_uuid, None, created_time, confidential_data) - - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries, table=table) - tries -= 1 + return self._new_row_internal(table, INSERT, add_uuid, None, created_time, confidential_data) - def update_rows(self, table, UPDATE, WHERE, modified_time=0): + @retry + @with_transaction + def update_rows(self, table, UPDATE, WHERE, modified_time=0, attempt=_ATTEMPT): """ Update one or several rows of a table. :param UPDATE: dictionary with the changes. dict keys are database columns that will be set with the dict values :param table: database table to update @@ -506,16 +643,8 @@ class db_base(): """ if table in self.tables_with_created_field and modified_time==0: modified_time=time.time() - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor() - return self._update_rows( - table, UPDATE, WHERE, modified_time) - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries, table=table) - tries -= 1 + + return self._update_rows(table, UPDATE, WHERE, modified_time) def _delete_row_by_id_internal(self, table, uuid): cmd = "DELETE FROM {} WHERE uuid = '{}'".format(table, uuid) @@ -529,19 +658,13 @@ class db_base(): self.cur.execute(cmd) return deleted + @retry(command='delete', extra='dependencies') + @with_transaction def delete_row_by_id(self, table, uuid): - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor() - return self._delete_row_by_id_internal(table, uuid) - except (mdb.Error, AttributeError) as e: - self._format_error( - e, tries, "delete", "dependencies", table=table) - tries -= 1 - - def delete_row(self, **sql_dict): + return self._delete_row_by_id_internal(table, uuid) + + @retry + def delete_row(self, attempt=_ATTEMPT, **sql_dict): """ Deletes rows from a table. :param UPDATE: dictionary with the changes. dict keys are database columns that will be set with the dict values :param FROM: string with table name (Mandatory) @@ -560,36 +683,28 @@ class db_base(): cmd += " WHERE " + self.__create_where(sql_dict['WHERE']) if sql_dict.get('LIMIT'): cmd += " LIMIT " + str(sql_dict['LIMIT']) - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor() - self.logger.debug(cmd) - self.cur.execute(cmd) - deleted = self.cur.rowcount - return deleted - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries) - tries -= 1 - - def get_rows_by_id(self, table, uuid): + + attempt.info['cmd'] = cmd + + with self.transaction(): + self.logger.debug(cmd) + self.cur.execute(cmd) + deleted = self.cur.rowcount + return deleted + + @retry + @with_transaction(cursor='dict') + def get_rows_by_id(self, table, uuid, attempt=_ATTEMPT): '''get row from a table based on uuid''' - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor(mdb.cursors.DictCursor) - cmd="SELECT * FROM {} where uuid='{}'".format(str(table), str(uuid)) - self.logger.debug(cmd) - self.cur.execute(cmd) - rows = self.cur.fetchall() - return rows - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries, table=table) - tries -= 1 - - def get_rows(self, **sql_dict): + cmd="SELECT * FROM {} where uuid='{}'".format(str(table), str(uuid)) + attempt.info['cmd'] = cmd + self.logger.debug(cmd) + self.cur.execute(cmd) + rows = self.cur.fetchall() + return rows + + @retry + def get_rows(self, attempt=_ATTEMPT, **sql_dict): """ Obtain rows from a table. :param SELECT: list or tuple of fields to retrieve) (by default all) :param FROM: string with table name (Mandatory) @@ -628,21 +743,16 @@ class db_base(): if 'LIMIT' in sql_dict: cmd += " LIMIT " + str(sql_dict['LIMIT']) - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor(mdb.cursors.DictCursor) - self.logger.debug(cmd) - self.cur.execute(cmd) - rows = self.cur.fetchall() - return rows - except (mdb.Error, AttributeError) as e: - self.logger.error("Exception '{}' with command '{}'".format(e, cmd)) - self._format_error(e, tries) - tries -= 1 - - def get_table_by_uuid_name(self, table, uuid_name, error_item_text=None, allow_serveral=False, WHERE_OR={}, WHERE_AND_OR="OR"): + attempt.info['cmd'] = cmd + + with self.transaction(mdb.cursors.DictCursor): + self.logger.debug(cmd) + self.cur.execute(cmd) + rows = self.cur.fetchall() + return rows + + @retry + def get_table_by_uuid_name(self, table, uuid_name, error_item_text=None, allow_serveral=False, WHERE_OR={}, WHERE_AND_OR="OR", attempt=_ATTEMPT): ''' Obtain One row from a table based on name or uuid. Attribute: table: string of table name @@ -666,58 +776,40 @@ class db_base(): else: cmd += " OR " + where_or - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor(mdb.cursors.DictCursor) - self.logger.debug(cmd) - self.cur.execute(cmd) - number = self.cur.rowcount - if number == 0: - raise db_base_Exception("No {} found with {} '{}'".format(error_item_text, what, uuid_name), http_code=httperrors.Not_Found) - elif number > 1 and not allow_serveral: - raise db_base_Exception("More than one {} found with {} '{}'".format(error_item_text, what, uuid_name), http_code=httperrors.Conflict) - if allow_serveral: - rows = self.cur.fetchall() - else: - rows = self.cur.fetchone() - return rows - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries, table=table) - tries -= 1 + attempt.info['cmd'] = cmd + + with self.transaction(mdb.cursors.DictCursor): + self.logger.debug(cmd) + self.cur.execute(cmd) + number = self.cur.rowcount + if number == 0: + raise db_base_Exception("No {} found with {} '{}'".format(error_item_text, what, uuid_name), http_code=httperrors.Not_Found) + elif number > 1 and not allow_serveral: + raise db_base_Exception("More than one {} found with {} '{}'".format(error_item_text, what, uuid_name), http_code=httperrors.Conflict) + if allow_serveral: + rows = self.cur.fetchall() + else: + rows = self.cur.fetchone() + return rows + @retry(table='uuids') + @with_transaction(cursor='dict') def get_uuid(self, uuid): '''check in the database if this uuid is already present''' - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor(mdb.cursors.DictCursor) - self.cur.execute("SELECT * FROM uuids where uuid='" + str(uuid) + "'") - rows = self.cur.fetchall() - return self.cur.rowcount, rows - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries) - tries -= 1 + self.cur.execute("SELECT * FROM uuids where uuid='" + str(uuid) + "'") + rows = self.cur.fetchall() + return self.cur.rowcount, rows + @retry + @with_transaction(cursor='dict') def get_uuid_from_name(self, table, name): '''Searchs in table the name and returns the uuid ''' - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor(mdb.cursors.DictCursor) - where_text = "name='" + name +"'" - self.cur.execute("SELECT * FROM " + table + " WHERE "+ where_text) - rows = self.cur.fetchall() - if self.cur.rowcount==0: - return 0, "Name %s not found in table %s" %(name, table) - elif self.cur.rowcount>1: - return self.cur.rowcount, "More than one VNF with name %s found in table %s" %(name, table) - return self.cur.rowcount, rows[0]["uuid"] - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries, table=table) - tries -= 1 - + where_text = "name='" + name +"'" + self.cur.execute("SELECT * FROM " + table + " WHERE "+ where_text) + rows = self.cur.fetchall() + if self.cur.rowcount==0: + return 0, "Name %s not found in table %s" %(name, table) + elif self.cur.rowcount>1: + return self.cur.rowcount, "More than one VNF with name %s found in table %s" %(name, table) + return self.cur.rowcount, rows[0]["uuid"] diff --git a/osm_ro/nfvo.py b/osm_ro/nfvo.py index 4b7ecc41..ebb30976 100644 --- a/osm_ro/nfvo.py +++ b/osm_ro/nfvo.py @@ -151,14 +151,12 @@ def get_non_used_wim_name(wim_name, wim_id, tenant_name, tenant_id): def start_service(mydb, persistence=None, wim=None): global db, global_config - db = nfvo_db.nfvo_db() + db = nfvo_db.nfvo_db(lock=db_lock) + mydb.lock = db_lock db.connect(global_config['db_host'], global_config['db_user'], global_config['db_passwd'], global_config['db_name']) global ovim - if persistence: - persistence.lock = db_lock - else: - persistence = WimPersistence(db, lock=db_lock) + persistence = persistence or WimPersistence(db) # Initialize openvim for SDN control # TODO: Avoid static configuration by adding new parameters to openmanod.cfg diff --git a/osm_ro/nfvo_db.py b/osm_ro/nfvo_db.py index bad7cec2..d0abdf38 100644 --- a/osm_ro/nfvo_db.py +++ b/osm_ro/nfvo_db.py @@ -34,7 +34,13 @@ import yaml import time #import sys, os +from .db_base import retry, with_transaction from .http_tools import errors as httperrors +from .utils import Attempt + + +_ATTEMPT = Attempt() + tables_with_createdat_field=["datacenters","instance_nets","instance_scenarios","instance_vms","instance_vnfs", "interfaces","nets","nfvo_tenants","scenarios","sce_interfaces","sce_nets", @@ -47,467 +53,442 @@ tables_with_createdat_field=["datacenters","instance_nets","instance_scenarios", class nfvo_db(db_base.db_base): - def __init__(self, host=None, user=None, passwd=None, database=None, log_name='openmano.db', log_level=None): - db_base.db_base.__init__(self, host, user, passwd, database, log_name, log_level) + def __init__(self, host=None, user=None, passwd=None, database=None, + log_name='openmano.db', log_level=None, lock=None): + db_base.db_base.__init__(self, host, user, passwd, database, + log_name, log_level, lock) db_base.db_base.tables_with_created_field=tables_with_createdat_field return + @retry + @with_transaction def new_vnf_as_a_whole(self,nfvo_tenant,vnf_name,vnf_descriptor,VNFCDict): self.logger.debug("Adding new vnf to the NFVO database") - tries = 2 - while tries: - created_time = time.time() - try: - with self.con: - - myVNFDict = {} - myVNFDict["name"] = vnf_name - myVNFDict["descriptor"] = vnf_descriptor['vnf'].get('descriptor') - myVNFDict["public"] = vnf_descriptor['vnf'].get('public', "false") - myVNFDict["description"] = vnf_descriptor['vnf']['description'] - myVNFDict["class"] = vnf_descriptor['vnf'].get('class',"MISC") - myVNFDict["tenant_id"] = vnf_descriptor['vnf'].get("tenant_id") - - vnf_id = self._new_row_internal('vnfs', myVNFDict, add_uuid=True, root_uuid=None, created_time=created_time) - #print "Adding new vms to the NFVO database" - #For each vm, we must create the appropriate vm in the NFVO database. - vmDict = {} - for _,vm in VNFCDict.iteritems(): - #This code could make the name of the vms grow and grow. - #If we agree to follow this convention, we should check with a regex that the vnfc name is not including yet the vnf name - #vm['name'] = "%s-%s" % (vnf_name,vm['name']) - #print "VM name: %s. Description: %s" % (vm['name'], vm['description']) - vm["vnf_id"] = vnf_id - created_time += 0.00001 - vm_id = self._new_row_internal('vms', vm, add_uuid=True, root_uuid=vnf_id, created_time=created_time) - #print "Internal vm id in NFVO DB: %s" % vm_id - vmDict[vm['name']] = vm_id - - #Collect the bridge interfaces of each VM/VNFC under the 'bridge-ifaces' field - bridgeInterfacesDict = {} - for vm in vnf_descriptor['vnf']['VNFC']: - if 'bridge-ifaces' in vm: - bridgeInterfacesDict[vm['name']] = {} - for bridgeiface in vm['bridge-ifaces']: - created_time += 0.00001 - if 'port-security' in bridgeiface: - bridgeiface['port_security'] = bridgeiface.pop('port-security') - if 'floating-ip' in bridgeiface: - bridgeiface['floating_ip'] = bridgeiface.pop('floating-ip') - db_base._convert_bandwidth(bridgeiface, logger=self.logger) - bridgeInterfacesDict[vm['name']][bridgeiface['name']] = {} - bridgeInterfacesDict[vm['name']][bridgeiface['name']]['vpci'] = bridgeiface.get('vpci',None) - bridgeInterfacesDict[vm['name']][bridgeiface['name']]['mac'] = bridgeiface.get('mac_address',None) - bridgeInterfacesDict[vm['name']][bridgeiface['name']]['bw'] = bridgeiface.get('bandwidth', None) - bridgeInterfacesDict[vm['name']][bridgeiface['name']]['model'] = bridgeiface.get('model', None) - bridgeInterfacesDict[vm['name']][bridgeiface['name']]['port_security'] = \ - int(bridgeiface.get('port_security', True)) - bridgeInterfacesDict[vm['name']][bridgeiface['name']]['floating_ip'] = \ - int(bridgeiface.get('floating_ip', False)) - bridgeInterfacesDict[vm['name']][bridgeiface['name']]['created_time'] = created_time - - # Collect the data interfaces of each VM/VNFC under the 'numas' field - dataifacesDict = {} - for vm in vnf_descriptor['vnf']['VNFC']: - dataifacesDict[vm['name']] = {} - for numa in vm.get('numas', []): - for dataiface in numa.get('interfaces', []): - created_time += 0.00001 - db_base._convert_bandwidth(dataiface, logger=self.logger) - dataifacesDict[vm['name']][dataiface['name']] = {} - dataifacesDict[vm['name']][dataiface['name']]['vpci'] = dataiface.get('vpci') - dataifacesDict[vm['name']][dataiface['name']]['bw'] = dataiface['bandwidth'] - dataifacesDict[vm['name']][dataiface['name']]['model'] = "PF" if dataiface[ - 'dedicated'] == "yes" else ( - "VF" if dataiface['dedicated'] == "no" else "VFnotShared") - dataifacesDict[vm['name']][dataiface['name']]['created_time'] = created_time - - #For each internal connection, we add it to the interfaceDict and we create the appropriate net in the NFVO database. - #print "Adding new nets (VNF internal nets) to the NFVO database (if any)" - internalconnList = [] - if 'internal-connections' in vnf_descriptor['vnf']: - for net in vnf_descriptor['vnf']['internal-connections']: - #print "Net name: %s. Description: %s" % (net['name'], net['description']) - - myNetDict = {} - myNetDict["name"] = net['name'] - myNetDict["description"] = net['description'] - myNetDict["type"] = net['type'] - myNetDict["vnf_id"] = vnf_id - - created_time += 0.00001 - net_id = self._new_row_internal('nets', myNetDict, add_uuid=True, root_uuid=vnf_id, created_time=created_time) - - for element in net['elements']: - ifaceItem = {} - #ifaceItem["internal_name"] = "%s-%s-%s" % (net['name'],element['VNFC'], element['local_iface_name']) - ifaceItem["internal_name"] = element['local_iface_name'] - #ifaceItem["vm_id"] = vmDict["%s-%s" % (vnf_name,element['VNFC'])] - ifaceItem["vm_id"] = vmDict[element['VNFC']] - ifaceItem["net_id"] = net_id - ifaceItem["type"] = net['type'] - if ifaceItem ["type"] == "data": - dataiface = dataifacesDict[ element['VNFC'] ][ element['local_iface_name'] ] - ifaceItem["vpci"] = dataiface['vpci'] - ifaceItem["bw"] = dataiface['bw'] - ifaceItem["model"] = dataiface['model'] - created_time_iface = dataiface['created_time'] - else: - bridgeiface = bridgeInterfacesDict[ element['VNFC'] ][ element['local_iface_name'] ] - ifaceItem["vpci"] = bridgeiface['vpci'] - ifaceItem["mac"] = bridgeiface['mac'] - ifaceItem["bw"] = bridgeiface['bw'] - ifaceItem["model"] = bridgeiface['model'] - ifaceItem["port_security"] = bridgeiface['port_security'] - ifaceItem["floating_ip"] = bridgeiface['floating_ip'] - created_time_iface = bridgeiface['created_time'] - internalconnList.append(ifaceItem) - #print "Internal net id in NFVO DB: %s" % net_id - - #print "Adding internal interfaces to the NFVO database (if any)" - for iface in internalconnList: - #print "Iface name: %s" % iface['internal_name'] - iface_id = self._new_row_internal('interfaces', iface, add_uuid=True, root_uuid=vnf_id, created_time = created_time_iface) - #print "Iface id in NFVO DB: %s" % iface_id - - #print "Adding external interfaces to the NFVO database" - for iface in vnf_descriptor['vnf']['external-connections']: - myIfaceDict = {} - #myIfaceDict["internal_name"] = "%s-%s-%s" % (vnf_name,iface['VNFC'], iface['local_iface_name']) - myIfaceDict["internal_name"] = iface['local_iface_name'] - #myIfaceDict["vm_id"] = vmDict["%s-%s" % (vnf_name,iface['VNFC'])] - myIfaceDict["vm_id"] = vmDict[iface['VNFC']] - myIfaceDict["external_name"] = iface['name'] - myIfaceDict["type"] = iface['type'] - if iface["type"] == "data": - dataiface = dataifacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ] - myIfaceDict["vpci"] = dataiface['vpci'] - myIfaceDict["bw"] = dataiface['bw'] - myIfaceDict["model"] = dataiface['model'] - created_time_iface = dataiface['created_time'] - else: - bridgeiface = bridgeInterfacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ] - myIfaceDict["vpci"] = bridgeiface['vpci'] - myIfaceDict["bw"] = bridgeiface['bw'] - myIfaceDict["model"] = bridgeiface['model'] - myIfaceDict["mac"] = bridgeiface['mac'] - myIfaceDict["port_security"]= bridgeiface['port_security'] - myIfaceDict["floating_ip"] = bridgeiface['floating_ip'] - created_time_iface = bridgeiface['created_time'] - #print "Iface name: %s" % iface['name'] - iface_id = self._new_row_internal('interfaces', myIfaceDict, add_uuid=True, root_uuid=vnf_id, created_time = created_time_iface) - #print "Iface id in NFVO DB: %s" % iface_id - - return vnf_id - - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries) - tries -= 1 - + created_time = time.time() + myVNFDict = {} + myVNFDict["name"] = vnf_name + myVNFDict["descriptor"] = vnf_descriptor['vnf'].get('descriptor') + myVNFDict["public"] = vnf_descriptor['vnf'].get('public', "false") + myVNFDict["description"] = vnf_descriptor['vnf']['description'] + myVNFDict["class"] = vnf_descriptor['vnf'].get('class',"MISC") + myVNFDict["tenant_id"] = vnf_descriptor['vnf'].get("tenant_id") + + vnf_id = self._new_row_internal('vnfs', myVNFDict, add_uuid=True, root_uuid=None, created_time=created_time) + #print "Adding new vms to the NFVO database" + #For each vm, we must create the appropriate vm in the NFVO database. + vmDict = {} + for _,vm in VNFCDict.iteritems(): + #This code could make the name of the vms grow and grow. + #If we agree to follow this convention, we should check with a regex that the vnfc name is not including yet the vnf name + #vm['name'] = "%s-%s" % (vnf_name,vm['name']) + #print "VM name: %s. Description: %s" % (vm['name'], vm['description']) + vm["vnf_id"] = vnf_id + created_time += 0.00001 + vm_id = self._new_row_internal('vms', vm, add_uuid=True, root_uuid=vnf_id, created_time=created_time) + #print "Internal vm id in NFVO DB: %s" % vm_id + vmDict[vm['name']] = vm_id + + #Collect the bridge interfaces of each VM/VNFC under the 'bridge-ifaces' field + bridgeInterfacesDict = {} + for vm in vnf_descriptor['vnf']['VNFC']: + if 'bridge-ifaces' in vm: + bridgeInterfacesDict[vm['name']] = {} + for bridgeiface in vm['bridge-ifaces']: + created_time += 0.00001 + if 'port-security' in bridgeiface: + bridgeiface['port_security'] = bridgeiface.pop('port-security') + if 'floating-ip' in bridgeiface: + bridgeiface['floating_ip'] = bridgeiface.pop('floating-ip') + db_base._convert_bandwidth(bridgeiface, logger=self.logger) + bridgeInterfacesDict[vm['name']][bridgeiface['name']] = {} + bridgeInterfacesDict[vm['name']][bridgeiface['name']]['vpci'] = bridgeiface.get('vpci',None) + bridgeInterfacesDict[vm['name']][bridgeiface['name']]['mac'] = bridgeiface.get('mac_address',None) + bridgeInterfacesDict[vm['name']][bridgeiface['name']]['bw'] = bridgeiface.get('bandwidth', None) + bridgeInterfacesDict[vm['name']][bridgeiface['name']]['model'] = bridgeiface.get('model', None) + bridgeInterfacesDict[vm['name']][bridgeiface['name']]['port_security'] = \ + int(bridgeiface.get('port_security', True)) + bridgeInterfacesDict[vm['name']][bridgeiface['name']]['floating_ip'] = \ + int(bridgeiface.get('floating_ip', False)) + bridgeInterfacesDict[vm['name']][bridgeiface['name']]['created_time'] = created_time + + # Collect the data interfaces of each VM/VNFC under the 'numas' field + dataifacesDict = {} + for vm in vnf_descriptor['vnf']['VNFC']: + dataifacesDict[vm['name']] = {} + for numa in vm.get('numas', []): + for dataiface in numa.get('interfaces', []): + created_time += 0.00001 + db_base._convert_bandwidth(dataiface, logger=self.logger) + dataifacesDict[vm['name']][dataiface['name']] = {} + dataifacesDict[vm['name']][dataiface['name']]['vpci'] = dataiface.get('vpci') + dataifacesDict[vm['name']][dataiface['name']]['bw'] = dataiface['bandwidth'] + dataifacesDict[vm['name']][dataiface['name']]['model'] = "PF" if dataiface[ + 'dedicated'] == "yes" else ( + "VF" if dataiface['dedicated'] == "no" else "VFnotShared") + dataifacesDict[vm['name']][dataiface['name']]['created_time'] = created_time + + #For each internal connection, we add it to the interfaceDict and we create the appropriate net in the NFVO database. + #print "Adding new nets (VNF internal nets) to the NFVO database (if any)" + internalconnList = [] + if 'internal-connections' in vnf_descriptor['vnf']: + for net in vnf_descriptor['vnf']['internal-connections']: + #print "Net name: %s. Description: %s" % (net['name'], net['description']) + + myNetDict = {} + myNetDict["name"] = net['name'] + myNetDict["description"] = net['description'] + myNetDict["type"] = net['type'] + myNetDict["vnf_id"] = vnf_id + + created_time += 0.00001 + net_id = self._new_row_internal('nets', myNetDict, add_uuid=True, root_uuid=vnf_id, created_time=created_time) + + for element in net['elements']: + ifaceItem = {} + #ifaceItem["internal_name"] = "%s-%s-%s" % (net['name'],element['VNFC'], element['local_iface_name']) + ifaceItem["internal_name"] = element['local_iface_name'] + #ifaceItem["vm_id"] = vmDict["%s-%s" % (vnf_name,element['VNFC'])] + ifaceItem["vm_id"] = vmDict[element['VNFC']] + ifaceItem["net_id"] = net_id + ifaceItem["type"] = net['type'] + if ifaceItem ["type"] == "data": + dataiface = dataifacesDict[ element['VNFC'] ][ element['local_iface_name'] ] + ifaceItem["vpci"] = dataiface['vpci'] + ifaceItem["bw"] = dataiface['bw'] + ifaceItem["model"] = dataiface['model'] + created_time_iface = dataiface['created_time'] + else: + bridgeiface = bridgeInterfacesDict[ element['VNFC'] ][ element['local_iface_name'] ] + ifaceItem["vpci"] = bridgeiface['vpci'] + ifaceItem["mac"] = bridgeiface['mac'] + ifaceItem["bw"] = bridgeiface['bw'] + ifaceItem["model"] = bridgeiface['model'] + ifaceItem["port_security"] = bridgeiface['port_security'] + ifaceItem["floating_ip"] = bridgeiface['floating_ip'] + created_time_iface = bridgeiface['created_time'] + internalconnList.append(ifaceItem) + #print "Internal net id in NFVO DB: %s" % net_id + + #print "Adding internal interfaces to the NFVO database (if any)" + for iface in internalconnList: + #print "Iface name: %s" % iface['internal_name'] + iface_id = self._new_row_internal('interfaces', iface, add_uuid=True, root_uuid=vnf_id, created_time = created_time_iface) + #print "Iface id in NFVO DB: %s" % iface_id + + #print "Adding external interfaces to the NFVO database" + for iface in vnf_descriptor['vnf']['external-connections']: + myIfaceDict = {} + #myIfaceDict["internal_name"] = "%s-%s-%s" % (vnf_name,iface['VNFC'], iface['local_iface_name']) + myIfaceDict["internal_name"] = iface['local_iface_name'] + #myIfaceDict["vm_id"] = vmDict["%s-%s" % (vnf_name,iface['VNFC'])] + myIfaceDict["vm_id"] = vmDict[iface['VNFC']] + myIfaceDict["external_name"] = iface['name'] + myIfaceDict["type"] = iface['type'] + if iface["type"] == "data": + dataiface = dataifacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ] + myIfaceDict["vpci"] = dataiface['vpci'] + myIfaceDict["bw"] = dataiface['bw'] + myIfaceDict["model"] = dataiface['model'] + created_time_iface = dataiface['created_time'] + else: + bridgeiface = bridgeInterfacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ] + myIfaceDict["vpci"] = bridgeiface['vpci'] + myIfaceDict["bw"] = bridgeiface['bw'] + myIfaceDict["model"] = bridgeiface['model'] + myIfaceDict["mac"] = bridgeiface['mac'] + myIfaceDict["port_security"]= bridgeiface['port_security'] + myIfaceDict["floating_ip"] = bridgeiface['floating_ip'] + created_time_iface = bridgeiface['created_time'] + #print "Iface name: %s" % iface['name'] + iface_id = self._new_row_internal('interfaces', myIfaceDict, add_uuid=True, root_uuid=vnf_id, created_time = created_time_iface) + #print "Iface id in NFVO DB: %s" % iface_id + + return vnf_id + + @retry + @with_transaction def new_vnf_as_a_whole2(self,nfvo_tenant,vnf_name,vnf_descriptor,VNFCDict): self.logger.debug("Adding new vnf to the NFVO database") - tries = 2 - while tries: - created_time = time.time() - try: - with self.con: - - myVNFDict = {} - myVNFDict["name"] = vnf_name - myVNFDict["descriptor"] = vnf_descriptor['vnf'].get('descriptor') - myVNFDict["public"] = vnf_descriptor['vnf'].get('public', "false") - myVNFDict["description"] = vnf_descriptor['vnf']['description'] - myVNFDict["class"] = vnf_descriptor['vnf'].get('class',"MISC") - myVNFDict["tenant_id"] = vnf_descriptor['vnf'].get("tenant_id") - - vnf_id = self._new_row_internal('vnfs', myVNFDict, add_uuid=True, root_uuid=None, created_time=created_time) - #print "Adding new vms to the NFVO database" - #For each vm, we must create the appropriate vm in the NFVO database. - vmDict = {} - for _,vm in VNFCDict.iteritems(): - #This code could make the name of the vms grow and grow. - #If we agree to follow this convention, we should check with a regex that the vnfc name is not including yet the vnf name - #vm['name'] = "%s-%s" % (vnf_name,vm['name']) - #print "VM name: %s. Description: %s" % (vm['name'], vm['description']) - vm["vnf_id"] = vnf_id - created_time += 0.00001 - vm_id = self._new_row_internal('vms', vm, add_uuid=True, root_uuid=vnf_id, created_time=created_time) - #print "Internal vm id in NFVO DB: %s" % vm_id - vmDict[vm['name']] = vm_id - - #Collect the bridge interfaces of each VM/VNFC under the 'bridge-ifaces' field - bridgeInterfacesDict = {} - for vm in vnf_descriptor['vnf']['VNFC']: - if 'bridge-ifaces' in vm: - bridgeInterfacesDict[vm['name']] = {} - for bridgeiface in vm['bridge-ifaces']: - created_time += 0.00001 - db_base._convert_bandwidth(bridgeiface, logger=self.logger) - if 'port-security' in bridgeiface: - bridgeiface['port_security'] = bridgeiface.pop('port-security') - if 'floating-ip' in bridgeiface: - bridgeiface['floating_ip'] = bridgeiface.pop('floating-ip') - ifaceDict = {} - ifaceDict['vpci'] = bridgeiface.get('vpci',None) - ifaceDict['mac'] = bridgeiface.get('mac_address',None) - ifaceDict['bw'] = bridgeiface.get('bandwidth', None) - ifaceDict['model'] = bridgeiface.get('model', None) - ifaceDict['port_security'] = int(bridgeiface.get('port_security', True)) - ifaceDict['floating_ip'] = int(bridgeiface.get('floating_ip', False)) - ifaceDict['created_time'] = created_time - bridgeInterfacesDict[vm['name']][bridgeiface['name']] = ifaceDict - - # Collect the data interfaces of each VM/VNFC under the 'numas' field - dataifacesDict = {} - for vm in vnf_descriptor['vnf']['VNFC']: - dataifacesDict[vm['name']] = {} - for numa in vm.get('numas', []): - for dataiface in numa.get('interfaces', []): - created_time += 0.00001 - db_base._convert_bandwidth(dataiface, logger=self.logger) - ifaceDict = {} - ifaceDict['vpci'] = dataiface.get('vpci') - ifaceDict['bw'] = dataiface['bandwidth'] - ifaceDict['model'] = "PF" if dataiface['dedicated'] == "yes" else \ - ("VF" if dataiface['dedicated'] == "no" else "VFnotShared") - ifaceDict['created_time'] = created_time - dataifacesDict[vm['name']][dataiface['name']] = ifaceDict - - #For each internal connection, we add it to the interfaceDict and we create the appropriate net in the NFVO database. - #print "Adding new nets (VNF internal nets) to the NFVO database (if any)" - if 'internal-connections' in vnf_descriptor['vnf']: - for net in vnf_descriptor['vnf']['internal-connections']: - #print "Net name: %s. Description: %s" % (net['name'], net['description']) - - myNetDict = {} - myNetDict["name"] = net['name'] - myNetDict["description"] = net['description'] - if (net["implementation"] == "overlay"): - net["type"] = "bridge" - #It should give an error if the type is e-line. For the moment, we consider it as a bridge - elif (net["implementation"] == "underlay"): - if (net["type"] == "e-line"): - net["type"] = "ptp" - elif (net["type"] == "e-lan"): - net["type"] = "data" - net.pop("implementation") - myNetDict["type"] = net['type'] - myNetDict["vnf_id"] = vnf_id - - created_time += 0.00001 - net_id = self._new_row_internal('nets', myNetDict, add_uuid=True, root_uuid=vnf_id, created_time=created_time) - - if "ip-profile" in net: - ip_profile = net["ip-profile"] - myIPProfileDict = {} - myIPProfileDict["net_id"] = net_id - myIPProfileDict["ip_version"] = ip_profile.get('ip-version',"IPv4") - myIPProfileDict["subnet_address"] = ip_profile.get('subnet-address',None) - myIPProfileDict["gateway_address"] = ip_profile.get('gateway-address',None) - myIPProfileDict["dns_address"] = ip_profile.get('dns-address',None) - if ("dhcp" in ip_profile): - myIPProfileDict["dhcp_enabled"] = ip_profile["dhcp"].get('enabled',"true") - myIPProfileDict["dhcp_start_address"] = ip_profile["dhcp"].get('start-address',None) - myIPProfileDict["dhcp_count"] = ip_profile["dhcp"].get('count',None) - - created_time += 0.00001 - ip_profile_id = self._new_row_internal('ip_profiles', myIPProfileDict) - - for element in net['elements']: - ifaceItem = {} - #ifaceItem["internal_name"] = "%s-%s-%s" % (net['name'],element['VNFC'], element['local_iface_name']) - ifaceItem["internal_name"] = element['local_iface_name'] - #ifaceItem["vm_id"] = vmDict["%s-%s" % (vnf_name,element['VNFC'])] - ifaceItem["vm_id"] = vmDict[element['VNFC']] - ifaceItem["net_id"] = net_id - ifaceItem["type"] = net['type'] - ifaceItem["ip_address"] = element.get('ip_address',None) - if ifaceItem ["type"] == "data": - ifaceDict = dataifacesDict[ element['VNFC'] ][ element['local_iface_name'] ] - ifaceItem["vpci"] = ifaceDict['vpci'] - ifaceItem["bw"] = ifaceDict['bw'] - ifaceItem["model"] = ifaceDict['model'] - else: - ifaceDict = bridgeInterfacesDict[ element['VNFC'] ][ element['local_iface_name'] ] - ifaceItem["vpci"] = ifaceDict['vpci'] - ifaceItem["mac"] = ifaceDict['mac'] - ifaceItem["bw"] = ifaceDict['bw'] - ifaceItem["model"] = ifaceDict['model'] - ifaceItem["port_security"] = ifaceDict['port_security'] - ifaceItem["floating_ip"] = ifaceDict['floating_ip'] - created_time_iface = ifaceDict["created_time"] - #print "Iface name: %s" % iface['internal_name'] - iface_id = self._new_row_internal('interfaces', ifaceItem, add_uuid=True, root_uuid=vnf_id, created_time=created_time_iface) - #print "Iface id in NFVO DB: %s" % iface_id - - #print "Adding external interfaces to the NFVO database" - for iface in vnf_descriptor['vnf']['external-connections']: - myIfaceDict = {} - #myIfaceDict["internal_name"] = "%s-%s-%s" % (vnf_name,iface['VNFC'], iface['local_iface_name']) - myIfaceDict["internal_name"] = iface['local_iface_name'] - #myIfaceDict["vm_id"] = vmDict["%s-%s" % (vnf_name,iface['VNFC'])] - myIfaceDict["vm_id"] = vmDict[iface['VNFC']] - myIfaceDict["external_name"] = iface['name'] - myIfaceDict["type"] = iface['type'] - if iface["type"] == "data": - myIfaceDict["vpci"] = dataifacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ]['vpci'] - myIfaceDict["bw"] = dataifacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ]['bw'] - myIfaceDict["model"] = dataifacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ]['model'] - created_time_iface = dataifacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ]['created_time'] - else: - myIfaceDict["vpci"] = bridgeInterfacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ]['vpci'] - myIfaceDict["bw"] = bridgeInterfacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ]['bw'] - myIfaceDict["model"] = bridgeInterfacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ]['model'] - myIfaceDict["mac"] = bridgeInterfacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ]['mac'] - myIfaceDict["port_security"] = \ - bridgeInterfacesDict[iface['VNFC']][iface['local_iface_name']]['port_security'] - myIfaceDict["floating_ip"] = \ - bridgeInterfacesDict[iface['VNFC']][iface['local_iface_name']]['floating_ip'] - created_time_iface = bridgeInterfacesDict[iface['VNFC']][iface['local_iface_name']]['created_time'] - #print "Iface name: %s" % iface['name'] - iface_id = self._new_row_internal('interfaces', myIfaceDict, add_uuid=True, root_uuid=vnf_id, created_time=created_time_iface) - #print "Iface id in NFVO DB: %s" % iface_id - - return vnf_id - - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries) + created_time = time.time() + myVNFDict = {} + myVNFDict["name"] = vnf_name + myVNFDict["descriptor"] = vnf_descriptor['vnf'].get('descriptor') + myVNFDict["public"] = vnf_descriptor['vnf'].get('public', "false") + myVNFDict["description"] = vnf_descriptor['vnf']['description'] + myVNFDict["class"] = vnf_descriptor['vnf'].get('class',"MISC") + myVNFDict["tenant_id"] = vnf_descriptor['vnf'].get("tenant_id") + + vnf_id = self._new_row_internal('vnfs', myVNFDict, add_uuid=True, root_uuid=None, created_time=created_time) + #print "Adding new vms to the NFVO database" + #For each vm, we must create the appropriate vm in the NFVO database. + vmDict = {} + for _,vm in VNFCDict.iteritems(): + #This code could make the name of the vms grow and grow. + #If we agree to follow this convention, we should check with a regex that the vnfc name is not including yet the vnf name + #vm['name'] = "%s-%s" % (vnf_name,vm['name']) + #print "VM name: %s. Description: %s" % (vm['name'], vm['description']) + vm["vnf_id"] = vnf_id + created_time += 0.00001 + vm_id = self._new_row_internal('vms', vm, add_uuid=True, root_uuid=vnf_id, created_time=created_time) + #print "Internal vm id in NFVO DB: %s" % vm_id + vmDict[vm['name']] = vm_id + + #Collect the bridge interfaces of each VM/VNFC under the 'bridge-ifaces' field + bridgeInterfacesDict = {} + for vm in vnf_descriptor['vnf']['VNFC']: + if 'bridge-ifaces' in vm: + bridgeInterfacesDict[vm['name']] = {} + for bridgeiface in vm['bridge-ifaces']: + created_time += 0.00001 + db_base._convert_bandwidth(bridgeiface, logger=self.logger) + if 'port-security' in bridgeiface: + bridgeiface['port_security'] = bridgeiface.pop('port-security') + if 'floating-ip' in bridgeiface: + bridgeiface['floating_ip'] = bridgeiface.pop('floating-ip') + ifaceDict = {} + ifaceDict['vpci'] = bridgeiface.get('vpci',None) + ifaceDict['mac'] = bridgeiface.get('mac_address',None) + ifaceDict['bw'] = bridgeiface.get('bandwidth', None) + ifaceDict['model'] = bridgeiface.get('model', None) + ifaceDict['port_security'] = int(bridgeiface.get('port_security', True)) + ifaceDict['floating_ip'] = int(bridgeiface.get('floating_ip', False)) + ifaceDict['created_time'] = created_time + bridgeInterfacesDict[vm['name']][bridgeiface['name']] = ifaceDict + + # Collect the data interfaces of each VM/VNFC under the 'numas' field + dataifacesDict = {} + for vm in vnf_descriptor['vnf']['VNFC']: + dataifacesDict[vm['name']] = {} + for numa in vm.get('numas', []): + for dataiface in numa.get('interfaces', []): + created_time += 0.00001 + db_base._convert_bandwidth(dataiface, logger=self.logger) + ifaceDict = {} + ifaceDict['vpci'] = dataiface.get('vpci') + ifaceDict['bw'] = dataiface['bandwidth'] + ifaceDict['model'] = "PF" if dataiface['dedicated'] == "yes" else \ + ("VF" if dataiface['dedicated'] == "no" else "VFnotShared") + ifaceDict['created_time'] = created_time + dataifacesDict[vm['name']][dataiface['name']] = ifaceDict + + #For each internal connection, we add it to the interfaceDict and we create the appropriate net in the NFVO database. + #print "Adding new nets (VNF internal nets) to the NFVO database (if any)" + if 'internal-connections' in vnf_descriptor['vnf']: + for net in vnf_descriptor['vnf']['internal-connections']: + #print "Net name: %s. Description: %s" % (net['name'], net['description']) + + myNetDict = {} + myNetDict["name"] = net['name'] + myNetDict["description"] = net['description'] + if (net["implementation"] == "overlay"): + net["type"] = "bridge" + #It should give an error if the type is e-line. For the moment, we consider it as a bridge + elif (net["implementation"] == "underlay"): + if (net["type"] == "e-line"): + net["type"] = "ptp" + elif (net["type"] == "e-lan"): + net["type"] = "data" + net.pop("implementation") + myNetDict["type"] = net['type'] + myNetDict["vnf_id"] = vnf_id + + created_time += 0.00001 + net_id = self._new_row_internal('nets', myNetDict, add_uuid=True, root_uuid=vnf_id, created_time=created_time) + + if "ip-profile" in net: + ip_profile = net["ip-profile"] + myIPProfileDict = {} + myIPProfileDict["net_id"] = net_id + myIPProfileDict["ip_version"] = ip_profile.get('ip-version',"IPv4") + myIPProfileDict["subnet_address"] = ip_profile.get('subnet-address',None) + myIPProfileDict["gateway_address"] = ip_profile.get('gateway-address',None) + myIPProfileDict["dns_address"] = ip_profile.get('dns-address',None) + if ("dhcp" in ip_profile): + myIPProfileDict["dhcp_enabled"] = ip_profile["dhcp"].get('enabled',"true") + myIPProfileDict["dhcp_start_address"] = ip_profile["dhcp"].get('start-address',None) + myIPProfileDict["dhcp_count"] = ip_profile["dhcp"].get('count',None) + + created_time += 0.00001 + ip_profile_id = self._new_row_internal('ip_profiles', myIPProfileDict) + + for element in net['elements']: + ifaceItem = {} + #ifaceItem["internal_name"] = "%s-%s-%s" % (net['name'],element['VNFC'], element['local_iface_name']) + ifaceItem["internal_name"] = element['local_iface_name'] + #ifaceItem["vm_id"] = vmDict["%s-%s" % (vnf_name,element['VNFC'])] + ifaceItem["vm_id"] = vmDict[element['VNFC']] + ifaceItem["net_id"] = net_id + ifaceItem["type"] = net['type'] + ifaceItem["ip_address"] = element.get('ip_address',None) + if ifaceItem ["type"] == "data": + ifaceDict = dataifacesDict[ element['VNFC'] ][ element['local_iface_name'] ] + ifaceItem["vpci"] = ifaceDict['vpci'] + ifaceItem["bw"] = ifaceDict['bw'] + ifaceItem["model"] = ifaceDict['model'] + else: + ifaceDict = bridgeInterfacesDict[ element['VNFC'] ][ element['local_iface_name'] ] + ifaceItem["vpci"] = ifaceDict['vpci'] + ifaceItem["mac"] = ifaceDict['mac'] + ifaceItem["bw"] = ifaceDict['bw'] + ifaceItem["model"] = ifaceDict['model'] + ifaceItem["port_security"] = ifaceDict['port_security'] + ifaceItem["floating_ip"] = ifaceDict['floating_ip'] + created_time_iface = ifaceDict["created_time"] + #print "Iface name: %s" % iface['internal_name'] + iface_id = self._new_row_internal('interfaces', ifaceItem, add_uuid=True, root_uuid=vnf_id, created_time=created_time_iface) + #print "Iface id in NFVO DB: %s" % iface_id + + #print "Adding external interfaces to the NFVO database" + for iface in vnf_descriptor['vnf']['external-connections']: + myIfaceDict = {} + #myIfaceDict["internal_name"] = "%s-%s-%s" % (vnf_name,iface['VNFC'], iface['local_iface_name']) + myIfaceDict["internal_name"] = iface['local_iface_name'] + #myIfaceDict["vm_id"] = vmDict["%s-%s" % (vnf_name,iface['VNFC'])] + myIfaceDict["vm_id"] = vmDict[iface['VNFC']] + myIfaceDict["external_name"] = iface['name'] + myIfaceDict["type"] = iface['type'] + if iface["type"] == "data": + myIfaceDict["vpci"] = dataifacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ]['vpci'] + myIfaceDict["bw"] = dataifacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ]['bw'] + myIfaceDict["model"] = dataifacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ]['model'] + created_time_iface = dataifacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ]['created_time'] + else: + myIfaceDict["vpci"] = bridgeInterfacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ]['vpci'] + myIfaceDict["bw"] = bridgeInterfacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ]['bw'] + myIfaceDict["model"] = bridgeInterfacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ]['model'] + myIfaceDict["mac"] = bridgeInterfacesDict[ iface['VNFC'] ][ iface['local_iface_name'] ]['mac'] + myIfaceDict["port_security"] = \ + bridgeInterfacesDict[iface['VNFC']][iface['local_iface_name']]['port_security'] + myIfaceDict["floating_ip"] = \ + bridgeInterfacesDict[iface['VNFC']][iface['local_iface_name']]['floating_ip'] + created_time_iface = bridgeInterfacesDict[iface['VNFC']][iface['local_iface_name']]['created_time'] + #print "Iface name: %s" % iface['name'] + iface_id = self._new_row_internal('interfaces', myIfaceDict, add_uuid=True, root_uuid=vnf_id, created_time=created_time_iface) + #print "Iface id in NFVO DB: %s" % iface_id + + return vnf_id + # except KeyError as e2: # exc_type, exc_obj, exc_tb = sys.exc_info() # fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] # self.logger.debug("Exception type: %s; Filename: %s; Line number: %s", exc_type, fname, exc_tb.tb_lineno) # raise KeyError - tries -= 1 + @retry + @with_transaction def new_scenario(self, scenario_dict): - tries = 2 - while tries: - created_time = time.time() - try: - with self.con: - self.cur = self.con.cursor() - tenant_id = scenario_dict.get('tenant_id') - #scenario - INSERT_={'tenant_id': tenant_id, - 'name': scenario_dict['name'], - 'description': scenario_dict['description'], - 'public': scenario_dict.get('public', "false")} - - scenario_uuid = self._new_row_internal('scenarios', INSERT_, add_uuid=True, root_uuid=None, created_time=created_time) - #sce_nets - for net in scenario_dict['nets'].values(): - net_dict={'scenario_id': scenario_uuid} - net_dict["name"] = net["name"] - net_dict["type"] = net["type"] - net_dict["description"] = net.get("description") - net_dict["external"] = net.get("external", False) - if "graph" in net: - #net["graph"]=yaml.safe_dump(net["graph"],default_flow_style=True,width=256) - #TODO, must be json because of the GUI, change to yaml - net_dict["graph"]=json.dumps(net["graph"]) - created_time += 0.00001 - net_uuid = self._new_row_internal('sce_nets', net_dict, add_uuid=True, root_uuid=scenario_uuid, created_time=created_time) - net['uuid']=net_uuid - - if net.get("ip-profile"): - ip_profile = net["ip-profile"] - myIPProfileDict = { - "sce_net_id": net_uuid, - "ip_version": ip_profile.get('ip-version', "IPv4"), - "subnet_address": ip_profile.get('subnet-address'), - "gateway_address": ip_profile.get('gateway-address'), - "dns_address": ip_profile.get('dns-address')} - if "dhcp" in ip_profile: - myIPProfileDict["dhcp_enabled"] = ip_profile["dhcp"].get('enabled', "true") - myIPProfileDict["dhcp_start_address"] = ip_profile["dhcp"].get('start-address') - myIPProfileDict["dhcp_count"] = ip_profile["dhcp"].get('count') - self._new_row_internal('ip_profiles', myIPProfileDict) - - # sce_vnfs - for k, vnf in scenario_dict['vnfs'].items(): - INSERT_ = {'scenario_id': scenario_uuid, - 'name': k, - 'vnf_id': vnf['uuid'], - # 'description': scenario_dict['name'] - 'description': vnf['description']} - if "graph" in vnf: - #I NSERT_["graph"]=yaml.safe_dump(vnf["graph"],default_flow_style=True,width=256) - # TODO, must be json because of the GUI, change to yaml - INSERT_["graph"] = json.dumps(vnf["graph"]) - created_time += 0.00001 - scn_vnf_uuid = self._new_row_internal('sce_vnfs', INSERT_, add_uuid=True, - root_uuid=scenario_uuid, created_time=created_time) - vnf['scn_vnf_uuid']=scn_vnf_uuid - # sce_interfaces - for iface in vnf['ifaces'].values(): - # print 'iface', iface - if 'net_key' not in iface: - continue - iface['net_id'] = scenario_dict['nets'][ iface['net_key'] ]['uuid'] - INSERT_={'sce_vnf_id': scn_vnf_uuid, - 'sce_net_id': iface['net_id'], - 'interface_id': iface['uuid'], - 'ip_address': iface.get('ip_address')} - created_time += 0.00001 - iface_uuid = self._new_row_internal('sce_interfaces', INSERT_, add_uuid=True, - root_uuid=scenario_uuid, created_time=created_time) - - return scenario_uuid - - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries) - tries -= 1 - + created_time = time.time() + tenant_id = scenario_dict.get('tenant_id') + #scenario + INSERT_={'tenant_id': tenant_id, + 'name': scenario_dict['name'], + 'description': scenario_dict['description'], + 'public': scenario_dict.get('public', "false")} + + scenario_uuid = self._new_row_internal('scenarios', INSERT_, add_uuid=True, root_uuid=None, created_time=created_time) + #sce_nets + for net in scenario_dict['nets'].values(): + net_dict={'scenario_id': scenario_uuid} + net_dict["name"] = net["name"] + net_dict["type"] = net["type"] + net_dict["description"] = net.get("description") + net_dict["external"] = net.get("external", False) + if "graph" in net: + #net["graph"]=yaml.safe_dump(net["graph"],default_flow_style=True,width=256) + #TODO, must be json because of the GUI, change to yaml + net_dict["graph"]=json.dumps(net["graph"]) + created_time += 0.00001 + net_uuid = self._new_row_internal('sce_nets', net_dict, add_uuid=True, root_uuid=scenario_uuid, created_time=created_time) + net['uuid']=net_uuid + + if net.get("ip-profile"): + ip_profile = net["ip-profile"] + myIPProfileDict = { + "sce_net_id": net_uuid, + "ip_version": ip_profile.get('ip-version', "IPv4"), + "subnet_address": ip_profile.get('subnet-address'), + "gateway_address": ip_profile.get('gateway-address'), + "dns_address": ip_profile.get('dns-address')} + if "dhcp" in ip_profile: + myIPProfileDict["dhcp_enabled"] = ip_profile["dhcp"].get('enabled', "true") + myIPProfileDict["dhcp_start_address"] = ip_profile["dhcp"].get('start-address') + myIPProfileDict["dhcp_count"] = ip_profile["dhcp"].get('count') + self._new_row_internal('ip_profiles', myIPProfileDict) + + # sce_vnfs + for k, vnf in scenario_dict['vnfs'].items(): + INSERT_ = {'scenario_id': scenario_uuid, + 'name': k, + 'vnf_id': vnf['uuid'], + # 'description': scenario_dict['name'] + 'description': vnf['description']} + if "graph" in vnf: + #I NSERT_["graph"]=yaml.safe_dump(vnf["graph"],default_flow_style=True,width=256) + # TODO, must be json because of the GUI, change to yaml + INSERT_["graph"] = json.dumps(vnf["graph"]) + created_time += 0.00001 + scn_vnf_uuid = self._new_row_internal('sce_vnfs', INSERT_, add_uuid=True, + root_uuid=scenario_uuid, created_time=created_time) + vnf['scn_vnf_uuid']=scn_vnf_uuid + # sce_interfaces + for iface in vnf['ifaces'].values(): + # print 'iface', iface + if 'net_key' not in iface: + continue + iface['net_id'] = scenario_dict['nets'][ iface['net_key'] ]['uuid'] + INSERT_={'sce_vnf_id': scn_vnf_uuid, + 'sce_net_id': iface['net_id'], + 'interface_id': iface['uuid'], + 'ip_address': iface.get('ip_address')} + created_time += 0.00001 + iface_uuid = self._new_row_internal('sce_interfaces', INSERT_, add_uuid=True, + root_uuid=scenario_uuid, created_time=created_time) + + return scenario_uuid + + @retry + @with_transaction def edit_scenario(self, scenario_dict): - tries = 2 - while tries: - modified_time = time.time() - item_changed=0 - try: - with self.con: - self.cur = self.con.cursor() - #check that scenario exist - tenant_id = scenario_dict.get('tenant_id') - scenario_uuid = scenario_dict['uuid'] - - where_text = "uuid='{}'".format(scenario_uuid) - if not tenant_id and tenant_id != "any": - where_text += " AND (tenant_id='{}' OR public='True')".format(tenant_id) - cmd = "SELECT * FROM scenarios WHERE "+ where_text - self.logger.debug(cmd) - self.cur.execute(cmd) - self.cur.fetchall() - if self.cur.rowcount==0: - raise db_base.db_base_Exception("No scenario found with this criteria " + where_text, httperrors.Bad_Request) - elif self.cur.rowcount>1: - raise db_base.db_base_Exception("More than one scenario found with this criteria " + where_text, httperrors.Bad_Request) - - #scenario - nodes = {} - topology = scenario_dict.pop("topology", None) - if topology != None and "nodes" in topology: - nodes = topology.get("nodes",{}) - UPDATE_ = {} - if "name" in scenario_dict: UPDATE_["name"] = scenario_dict["name"] - if "description" in scenario_dict: UPDATE_["description"] = scenario_dict["description"] - if len(UPDATE_)>0: - WHERE_={'tenant_id': tenant_id, 'uuid': scenario_uuid} - item_changed += self._update_rows('scenarios', UPDATE_, WHERE_, modified_time=modified_time) - #sce_nets - for node_id, node in nodes.items(): - if "graph" in node: - #node["graph"] = yaml.safe_dump(node["graph"],default_flow_style=True,width=256) - #TODO, must be json because of the GUI, change to yaml - node["graph"] = json.dumps(node["graph"]) - WHERE_={'scenario_id': scenario_uuid, 'uuid': node_id} - #Try to change at sce_nets(version 0 API backward compatibility and sce_vnfs) - item_changed += self._update_rows('sce_nets', node, WHERE_) - item_changed += self._update_rows('sce_vnfs', node, WHERE_, modified_time=modified_time) - return item_changed - - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries) - tries -= 1 + modified_time = time.time() + item_changed=0 + #check that scenario exist + tenant_id = scenario_dict.get('tenant_id') + scenario_uuid = scenario_dict['uuid'] + + where_text = "uuid='{}'".format(scenario_uuid) + if not tenant_id and tenant_id != "any": + where_text += " AND (tenant_id='{}' OR public='True')".format(tenant_id) + cmd = "SELECT * FROM scenarios WHERE "+ where_text + self.logger.debug(cmd) + self.cur.execute(cmd) + self.cur.fetchall() + if self.cur.rowcount==0: + raise db_base.db_base_Exception("No scenario found with this criteria " + where_text, httperrors.Bad_Request) + elif self.cur.rowcount>1: + raise db_base.db_base_Exception("More than one scenario found with this criteria " + where_text, httperrors.Bad_Request) + + #scenario + nodes = {} + topology = scenario_dict.pop("topology", None) + if topology != None and "nodes" in topology: + nodes = topology.get("nodes",{}) + UPDATE_ = {} + if "name" in scenario_dict: UPDATE_["name"] = scenario_dict["name"] + if "description" in scenario_dict: UPDATE_["description"] = scenario_dict["description"] + if len(UPDATE_)>0: + WHERE_={'tenant_id': tenant_id, 'uuid': scenario_uuid} + item_changed += self._update_rows('scenarios', UPDATE_, WHERE_, modified_time=modified_time) + #sce_nets + for node_id, node in nodes.items(): + if "graph" in node: + #node["graph"] = yaml.safe_dump(node["graph"],default_flow_style=True,width=256) + #TODO, must be json because of the GUI, change to yaml + node["graph"] = json.dumps(node["graph"]) + WHERE_={'scenario_id': scenario_uuid, 'uuid': node_id} + #Try to change at sce_nets(version 0 API backward compatibility and sce_vnfs) + item_changed += self._update_rows('sce_nets', node, WHERE_) + item_changed += self._update_rows('sce_vnfs', node, WHERE_, modified_time=modified_time) + return item_changed # def get_instance_scenario(self, instance_scenario_id, tenant_id=None): # '''Obtain the scenario instance information, filtering by one or serveral of the tenant, uuid or name @@ -516,8 +497,7 @@ class nfvo_db(db_base.db_base): # ''' # print "1******************************************************************" # try: -# with self.con: -# self.cur = self.con.cursor(mdb.cursors.DictCursor) +# with self.transaction(mdb.cursors.DictCursor): # #scenario table # where_list=[] # if tenant_id is not None: where_list.append( "tenant_id='" + tenant_id +"'" ) @@ -560,233 +540,222 @@ class nfvo_db(db_base.db_base): # print "nfvo_db.get_instance_scenario DB Exception %d: %s" % (e.args[0], e.args[1]) # return self._format_error(e) + @retry + @with_transaction(cursor='dict') def get_scenario(self, scenario_id, tenant_id=None, datacenter_vim_id=None, datacenter_id=None): '''Obtain the scenario information, filtering by one or serveral of the tenant, uuid or name scenario_id is the uuid or the name if it is not a valid uuid format if datacenter_vim_id,d datacenter_id is provided, it supply aditional vim_id fields with the matching vim uuid Only one scenario must mutch the filtering or an error is returned ''' - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor(mdb.cursors.DictCursor) - where_text = "uuid='{}'".format(scenario_id) - if not tenant_id and tenant_id != "any": - where_text += " AND (tenant_id='{}' OR public='True')".format(tenant_id) - cmd = "SELECT * FROM scenarios WHERE " + where_text - self.logger.debug(cmd) - self.cur.execute(cmd) - rows = self.cur.fetchall() - if self.cur.rowcount==0: - raise db_base.db_base_Exception("No scenario found with this criteria " + where_text, httperrors.Bad_Request) - elif self.cur.rowcount>1: - raise db_base.db_base_Exception("More than one scenario found with this criteria " + where_text, httperrors.Bad_Request) - scenario_dict = rows[0] - if scenario_dict["cloud_config"]: - scenario_dict["cloud-config"] = yaml.load(scenario_dict["cloud_config"]) - del scenario_dict["cloud_config"] - # sce_vnfs - cmd = "SELECT uuid,name,member_vnf_index,vnf_id,description FROM sce_vnfs WHERE scenario_id='{}' "\ - "ORDER BY created_at".format(scenario_dict['uuid']) - self.logger.debug(cmd) - self.cur.execute(cmd) - scenario_dict['vnfs'] = self.cur.fetchall() - - for vnf in scenario_dict['vnfs']: - cmd = "SELECT mgmt_access FROM vnfs WHERE uuid='{}'".format(scenario_dict['vnfs'][0]['vnf_id']) - self.logger.debug(cmd) - self.cur.execute(cmd) - mgmt_access_dict = self.cur.fetchall() - if mgmt_access_dict[0].get('mgmt_access'): - vnf['mgmt_access'] = yaml.load(mgmt_access_dict[0]['mgmt_access']) - else: - vnf['mgmt_access'] = None - # sce_interfaces - cmd = "SELECT scei.uuid,scei.sce_net_id,scei.interface_id,i.external_name,scei.ip_address"\ - " FROM sce_interfaces as scei join interfaces as i on scei.interface_id=i.uuid"\ - " WHERE scei.sce_vnf_id='{}' ORDER BY scei.created_at".format(vnf['uuid']) - self.logger.debug(cmd) - self.cur.execute(cmd) - vnf['interfaces'] = self.cur.fetchall() - # vms - cmd = "SELECT vms.uuid as uuid, flavor_id, image_id, image_list, vms.name as name," \ - " vms.description as description, vms.boot_data as boot_data, count," \ - " vms.availability_zone as availability_zone, vms.osm_id as osm_id, vms.pdu_type" \ - " FROM vnfs join vms on vnfs.uuid=vms.vnf_id" \ - " WHERE vnfs.uuid='" + vnf['vnf_id'] + "'" \ - " ORDER BY vms.created_at" + where_text = "uuid='{}'".format(scenario_id) + if not tenant_id and tenant_id != "any": + where_text += " AND (tenant_id='{}' OR public='True')".format(tenant_id) + cmd = "SELECT * FROM scenarios WHERE " + where_text + self.logger.debug(cmd) + self.cur.execute(cmd) + rows = self.cur.fetchall() + if self.cur.rowcount==0: + raise db_base.db_base_Exception("No scenario found with this criteria " + where_text, httperrors.Bad_Request) + elif self.cur.rowcount>1: + raise db_base.db_base_Exception("More than one scenario found with this criteria " + where_text, httperrors.Bad_Request) + scenario_dict = rows[0] + if scenario_dict["cloud_config"]: + scenario_dict["cloud-config"] = yaml.load(scenario_dict["cloud_config"]) + del scenario_dict["cloud_config"] + # sce_vnfs + cmd = "SELECT uuid,name,member_vnf_index,vnf_id,description FROM sce_vnfs WHERE scenario_id='{}' "\ + "ORDER BY created_at".format(scenario_dict['uuid']) + self.logger.debug(cmd) + self.cur.execute(cmd) + scenario_dict['vnfs'] = self.cur.fetchall() + + for vnf in scenario_dict['vnfs']: + cmd = "SELECT mgmt_access FROM vnfs WHERE uuid='{}'".format(scenario_dict['vnfs'][0]['vnf_id']) + self.logger.debug(cmd) + self.cur.execute(cmd) + mgmt_access_dict = self.cur.fetchall() + if mgmt_access_dict[0].get('mgmt_access'): + vnf['mgmt_access'] = yaml.load(mgmt_access_dict[0]['mgmt_access']) + else: + vnf['mgmt_access'] = None + # sce_interfaces + cmd = "SELECT scei.uuid,scei.sce_net_id,scei.interface_id,i.external_name,scei.ip_address"\ + " FROM sce_interfaces as scei join interfaces as i on scei.interface_id=i.uuid"\ + " WHERE scei.sce_vnf_id='{}' ORDER BY scei.created_at".format(vnf['uuid']) + self.logger.debug(cmd) + self.cur.execute(cmd) + vnf['interfaces'] = self.cur.fetchall() + # vms + cmd = "SELECT vms.uuid as uuid, flavor_id, image_id, image_list, vms.name as name," \ + " vms.description as description, vms.boot_data as boot_data, count," \ + " vms.availability_zone as availability_zone, vms.osm_id as osm_id, vms.pdu_type" \ + " FROM vnfs join vms on vnfs.uuid=vms.vnf_id" \ + " WHERE vnfs.uuid='" + vnf['vnf_id'] + "'" \ + " ORDER BY vms.created_at" + self.logger.debug(cmd) + self.cur.execute(cmd) + vnf['vms'] = self.cur.fetchall() + for vm in vnf['vms']: + if vm["boot_data"]: + vm["boot_data"] = yaml.safe_load(vm["boot_data"]) + else: + del vm["boot_data"] + if vm["image_list"]: + vm["image_list"] = yaml.safe_load(vm["image_list"]) + else: + del vm["image_list"] + if datacenter_vim_id!=None: + if vm['image_id']: + cmd = "SELECT vim_id FROM datacenters_images WHERE image_id='{}' AND " \ + "datacenter_vim_id='{}'".format(vm['image_id'], datacenter_vim_id) self.logger.debug(cmd) self.cur.execute(cmd) - vnf['vms'] = self.cur.fetchall() - for vm in vnf['vms']: - if vm["boot_data"]: - vm["boot_data"] = yaml.safe_load(vm["boot_data"]) - else: - del vm["boot_data"] - if vm["image_list"]: - vm["image_list"] = yaml.safe_load(vm["image_list"]) - else: - del vm["image_list"] - if datacenter_vim_id!=None: - if vm['image_id']: - cmd = "SELECT vim_id FROM datacenters_images WHERE image_id='{}' AND " \ - "datacenter_vim_id='{}'".format(vm['image_id'], datacenter_vim_id) - self.logger.debug(cmd) - self.cur.execute(cmd) - if self.cur.rowcount==1: - vim_image_dict = self.cur.fetchone() - vm['vim_image_id']=vim_image_dict['vim_id'] - if vm['flavor_id']: - cmd = "SELECT vim_id FROM datacenters_flavors WHERE flavor_id='{}' AND " \ - "datacenter_vim_id='{}'".format(vm['flavor_id'], datacenter_vim_id) - self.logger.debug(cmd) - self.cur.execute(cmd) - if self.cur.rowcount==1: - vim_flavor_dict = self.cur.fetchone() - vm['vim_flavor_id']=vim_flavor_dict['vim_id'] - - #interfaces - cmd = "SELECT uuid,internal_name,external_name,net_id,type,vpci,mac,bw,model,ip_address," \ - "floating_ip, port_security" \ - " FROM interfaces" \ - " WHERE vm_id='{}'" \ - " ORDER BY created_at".format(vm['uuid']) - self.logger.debug(cmd) - self.cur.execute(cmd) - vm['interfaces'] = self.cur.fetchall() - for iface in vm['interfaces']: - iface['port-security'] = iface.pop("port_security") - iface['floating-ip'] = iface.pop("floating_ip") - for sce_interface in vnf["interfaces"]: - if sce_interface["interface_id"] == iface["uuid"]: - if sce_interface["ip_address"]: - iface["ip_address"] = sce_interface["ip_address"] - break - #nets every net of a vms - cmd = "SELECT uuid,name,type,description, osm_id FROM nets WHERE vnf_id='{}'".format(vnf['vnf_id']) + if self.cur.rowcount==1: + vim_image_dict = self.cur.fetchone() + vm['vim_image_id']=vim_image_dict['vim_id'] + if vm['flavor_id']: + cmd = "SELECT vim_id FROM datacenters_flavors WHERE flavor_id='{}' AND " \ + "datacenter_vim_id='{}'".format(vm['flavor_id'], datacenter_vim_id) self.logger.debug(cmd) self.cur.execute(cmd) - vnf['nets'] = self.cur.fetchall() - for vnf_net in vnf['nets']: - SELECT_ = "ip_version,subnet_address,gateway_address,dns_address,dhcp_enabled,dhcp_start_address,dhcp_count" - cmd = "SELECT {} FROM ip_profiles WHERE net_id='{}'".format(SELECT_,vnf_net['uuid']) - self.logger.debug(cmd) - self.cur.execute(cmd) - ipprofiles = self.cur.fetchall() - if self.cur.rowcount==1: - vnf_net["ip_profile"] = ipprofiles[0] - elif self.cur.rowcount>1: - raise db_base.db_base_Exception("More than one ip-profile found with this criteria: net_id='{}'".format(vnf_net['uuid']), httperrors.Bad_Request) - - #sce_nets - cmd = "SELECT uuid,name,type,external,description,vim_network_name, osm_id" \ - " FROM sce_nets WHERE scenario_id='{}'" \ - " ORDER BY created_at ".format(scenario_dict['uuid']) - self.logger.debug(cmd) - self.cur.execute(cmd) - scenario_dict['nets'] = self.cur.fetchall() - #datacenter_nets - for net in scenario_dict['nets']: - if str(net['external']) == 'false': - SELECT_ = "ip_version,subnet_address,gateway_address,dns_address,dhcp_enabled,dhcp_start_address,dhcp_count" - cmd = "SELECT {} FROM ip_profiles WHERE sce_net_id='{}'".format(SELECT_,net['uuid']) - self.logger.debug(cmd) - self.cur.execute(cmd) - ipprofiles = self.cur.fetchall() - if self.cur.rowcount==1: - net["ip_profile"] = ipprofiles[0] - elif self.cur.rowcount>1: - raise db_base.db_base_Exception("More than one ip-profile found with this criteria: sce_net_id='{}'".format(net['uuid']), httperrors.Bad_Request) - continue - WHERE_=" WHERE name='{}'".format(net['name']) - if datacenter_id!=None: - WHERE_ += " AND datacenter_id='{}'".format(datacenter_id) - cmd = "SELECT vim_net_id FROM datacenter_nets" + WHERE_ - self.logger.debug(cmd) - self.cur.execute(cmd) - d_net = self.cur.fetchone() - if d_net==None or datacenter_vim_id==None: - #print "nfvo_db.get_scenario() WARNING external net %s not found" % net['name'] - net['vim_id']=None - else: - net['vim_id']=d_net['vim_net_id'] - - db_base._convert_datetime2str(scenario_dict) - db_base._convert_str2boolean(scenario_dict, ('public','shared','external','port-security','floating-ip') ) - - #forwarding graphs - cmd = "SELECT uuid,name,description,vendor FROM sce_vnffgs WHERE scenario_id='{}' "\ - "ORDER BY created_at".format(scenario_dict['uuid']) - self.logger.debug(cmd) - self.cur.execute(cmd) - scenario_dict['vnffgs'] = self.cur.fetchall() - for vnffg in scenario_dict['vnffgs']: - cmd = "SELECT uuid,name FROM sce_rsps WHERE sce_vnffg_id='{}' "\ - "ORDER BY created_at".format(vnffg['uuid']) - self.logger.debug(cmd) - self.cur.execute(cmd) - vnffg['rsps'] = self.cur.fetchall() - for rsp in vnffg['rsps']: - cmd = "SELECT uuid,if_order,ingress_interface_id,egress_interface_id,sce_vnf_id " \ - "FROM sce_rsp_hops WHERE sce_rsp_id='{}' "\ - "ORDER BY created_at".format(rsp['uuid']) - self.logger.debug(cmd) - self.cur.execute(cmd) - rsp['connection_points'] = self.cur.fetchall(); - cmd = "SELECT uuid,name,sce_vnf_id,interface_id FROM sce_classifiers WHERE sce_vnffg_id='{}' "\ - "AND sce_rsp_id='{}' ORDER BY created_at".format(vnffg['uuid'], rsp['uuid']) - self.logger.debug(cmd) - self.cur.execute(cmd) - rsp['classifier'] = self.cur.fetchone(); - cmd = "SELECT uuid,ip_proto,source_ip,destination_ip,source_port,destination_port FROM sce_classifier_matches "\ - "WHERE sce_classifier_id='{}' ORDER BY created_at".format(rsp['classifier']['uuid']) - self.logger.debug(cmd) - self.cur.execute(cmd) - rsp['classifier']['matches'] = self.cur.fetchall() - - return scenario_dict - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries) - tries -= 1 - + if self.cur.rowcount==1: + vim_flavor_dict = self.cur.fetchone() + vm['vim_flavor_id']=vim_flavor_dict['vim_id'] + + #interfaces + cmd = "SELECT uuid,internal_name,external_name,net_id,type,vpci,mac,bw,model,ip_address," \ + "floating_ip, port_security" \ + " FROM interfaces" \ + " WHERE vm_id='{}'" \ + " ORDER BY created_at".format(vm['uuid']) + self.logger.debug(cmd) + self.cur.execute(cmd) + vm['interfaces'] = self.cur.fetchall() + for iface in vm['interfaces']: + iface['port-security'] = iface.pop("port_security") + iface['floating-ip'] = iface.pop("floating_ip") + for sce_interface in vnf["interfaces"]: + if sce_interface["interface_id"] == iface["uuid"]: + if sce_interface["ip_address"]: + iface["ip_address"] = sce_interface["ip_address"] + break + #nets every net of a vms + cmd = "SELECT uuid,name,type,description, osm_id FROM nets WHERE vnf_id='{}'".format(vnf['vnf_id']) + self.logger.debug(cmd) + self.cur.execute(cmd) + vnf['nets'] = self.cur.fetchall() + for vnf_net in vnf['nets']: + SELECT_ = "ip_version,subnet_address,gateway_address,dns_address,dhcp_enabled,dhcp_start_address,dhcp_count" + cmd = "SELECT {} FROM ip_profiles WHERE net_id='{}'".format(SELECT_,vnf_net['uuid']) + self.logger.debug(cmd) + self.cur.execute(cmd) + ipprofiles = self.cur.fetchall() + if self.cur.rowcount==1: + vnf_net["ip_profile"] = ipprofiles[0] + elif self.cur.rowcount>1: + raise db_base.db_base_Exception("More than one ip-profile found with this criteria: net_id='{}'".format(vnf_net['uuid']), httperrors.Bad_Request) + + #sce_nets + cmd = "SELECT uuid,name,type,external,description,vim_network_name, osm_id" \ + " FROM sce_nets WHERE scenario_id='{}'" \ + " ORDER BY created_at ".format(scenario_dict['uuid']) + self.logger.debug(cmd) + self.cur.execute(cmd) + scenario_dict['nets'] = self.cur.fetchall() + #datacenter_nets + for net in scenario_dict['nets']: + if str(net['external']) == 'false': + SELECT_ = "ip_version,subnet_address,gateway_address,dns_address,dhcp_enabled,dhcp_start_address,dhcp_count" + cmd = "SELECT {} FROM ip_profiles WHERE sce_net_id='{}'".format(SELECT_,net['uuid']) + self.logger.debug(cmd) + self.cur.execute(cmd) + ipprofiles = self.cur.fetchall() + if self.cur.rowcount==1: + net["ip_profile"] = ipprofiles[0] + elif self.cur.rowcount>1: + raise db_base.db_base_Exception("More than one ip-profile found with this criteria: sce_net_id='{}'".format(net['uuid']), httperrors.Bad_Request) + continue + WHERE_=" WHERE name='{}'".format(net['name']) + if datacenter_id!=None: + WHERE_ += " AND datacenter_id='{}'".format(datacenter_id) + cmd = "SELECT vim_net_id FROM datacenter_nets" + WHERE_ + self.logger.debug(cmd) + self.cur.execute(cmd) + d_net = self.cur.fetchone() + if d_net==None or datacenter_vim_id==None: + #print "nfvo_db.get_scenario() WARNING external net %s not found" % net['name'] + net['vim_id']=None + else: + net['vim_id']=d_net['vim_net_id'] + + db_base._convert_datetime2str(scenario_dict) + db_base._convert_str2boolean(scenario_dict, ('public','shared','external','port-security','floating-ip') ) + + #forwarding graphs + cmd = "SELECT uuid,name,description,vendor FROM sce_vnffgs WHERE scenario_id='{}' "\ + "ORDER BY created_at".format(scenario_dict['uuid']) + self.logger.debug(cmd) + self.cur.execute(cmd) + scenario_dict['vnffgs'] = self.cur.fetchall() + for vnffg in scenario_dict['vnffgs']: + cmd = "SELECT uuid,name FROM sce_rsps WHERE sce_vnffg_id='{}' "\ + "ORDER BY created_at".format(vnffg['uuid']) + self.logger.debug(cmd) + self.cur.execute(cmd) + vnffg['rsps'] = self.cur.fetchall() + for rsp in vnffg['rsps']: + cmd = "SELECT uuid,if_order,ingress_interface_id,egress_interface_id,sce_vnf_id " \ + "FROM sce_rsp_hops WHERE sce_rsp_id='{}' "\ + "ORDER BY created_at".format(rsp['uuid']) + self.logger.debug(cmd) + self.cur.execute(cmd) + rsp['connection_points'] = self.cur.fetchall(); + cmd = "SELECT uuid,name,sce_vnf_id,interface_id FROM sce_classifiers WHERE sce_vnffg_id='{}' "\ + "AND sce_rsp_id='{}' ORDER BY created_at".format(vnffg['uuid'], rsp['uuid']) + self.logger.debug(cmd) + self.cur.execute(cmd) + rsp['classifier'] = self.cur.fetchone(); + cmd = "SELECT uuid,ip_proto,source_ip,destination_ip,source_port,destination_port FROM sce_classifier_matches "\ + "WHERE sce_classifier_id='{}' ORDER BY created_at".format(rsp['classifier']['uuid']) + self.logger.debug(cmd) + self.cur.execute(cmd) + rsp['classifier']['matches'] = self.cur.fetchall() + + return scenario_dict + + @retry(command="delete", extra="instances running") + @with_transaction(cursor='dict') def delete_scenario(self, scenario_id, tenant_id=None): '''Deletes a scenario, filtering by one or several of the tenant, uuid or name scenario_id is the uuid or the name if it is not a valid uuid format Only one scenario must mutch the filtering or an error is returned ''' - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor(mdb.cursors.DictCursor) - - #scenario table - where_text = "uuid='{}'".format(scenario_id) - if not tenant_id and tenant_id != "any": - where_text += " AND (tenant_id='{}' OR public='True')".format(tenant_id) - cmd = "SELECT * FROM scenarios WHERE "+ where_text - self.logger.debug(cmd) - self.cur.execute(cmd) - rows = self.cur.fetchall() - if self.cur.rowcount==0: - raise db_base.db_base_Exception("No scenario found where " + where_text, httperrors.Not_Found) - elif self.cur.rowcount>1: - raise db_base.db_base_Exception("More than one scenario found where " + where_text, httperrors.Conflict) - scenario_uuid = rows[0]["uuid"] - scenario_name = rows[0]["name"] - - #sce_vnfs - cmd = "DELETE FROM scenarios WHERE uuid='{}'".format(scenario_uuid) - self.logger.debug(cmd) - self.cur.execute(cmd) - - return scenario_uuid + " " + scenario_name - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries, "delete", "instances running") - tries -= 1 - - def new_rows(self, tables, uuid_list=None, confidential_data=False): + #scenario table + where_text = "uuid='{}'".format(scenario_id) + if not tenant_id and tenant_id != "any": + where_text += " AND (tenant_id='{}' OR public='True')".format(tenant_id) + cmd = "SELECT * FROM scenarios WHERE "+ where_text + self.logger.debug(cmd) + self.cur.execute(cmd) + rows = self.cur.fetchall() + if self.cur.rowcount==0: + raise db_base.db_base_Exception("No scenario found where " + where_text, httperrors.Not_Found) + elif self.cur.rowcount>1: + raise db_base.db_base_Exception("More than one scenario found where " + where_text, httperrors.Conflict) + scenario_uuid = rows[0]["uuid"] + scenario_name = rows[0]["name"] + + #sce_vnfs + cmd = "DELETE FROM scenarios WHERE uuid='{}'".format(scenario_uuid) + self.logger.debug(cmd) + self.cur.execute(cmd) + + return scenario_uuid + " " + scenario_name + + @retry + @with_transaction + def new_rows(self, tables, uuid_list=None, confidential_data=False, attempt=_ATTEMPT): """ Make a transactional insertion of rows at several tables. Can be also a deletion :param tables: list with dictionary where the keys are the table names and the values are a row or row list @@ -803,350 +772,324 @@ class nfvo_db(db_base.db_base): :param uuid_list: list of created uuids, first one is the root (#TODO to store at uuid table) :return: None if success, raise exception otherwise """ - tries = 2 table_name = None - while tries: - created_time = time.time() - try: - with self.con: - self.cur = self.con.cursor() - for table in tables: - for table_name, row_list in table.items(): - index = 0 - if isinstance(row_list, dict): - row_list = (row_list, ) #create a list with the single value - for row in row_list: - if "TO-DELETE" in row: - self._delete_row_by_id_internal(table_name, row["TO-DELETE"]) - continue - - if table_name in self.tables_with_created_field: - if "created_at" in row: - created_time_param = created_time + (index + row.pop("created_at"))*0.00001 - else: - created_time_param = created_time + index*0.00001 - index += 1 - else: - created_time_param = 0 - self._new_row_internal(table_name, row, add_uuid=False, root_uuid=None, - confidential_data=confidential_data, - created_time=created_time_param) - return - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries, table=table_name) - tries -= 1 + created_time = time.time() + for table in tables: + for table_name, row_list in table.items(): + index = 0 + attempt.info['table'] = table_name + if isinstance(row_list, dict): + row_list = (row_list, ) #create a list with the single value + for row in row_list: + if "TO-DELETE" in row: + self._delete_row_by_id_internal(table_name, row["TO-DELETE"]) + continue + if table_name in self.tables_with_created_field: + if "created_at" in row: + created_time_param = created_time + (index + row.pop("created_at"))*0.00001 + else: + created_time_param = created_time + index*0.00001 + index += 1 + else: + created_time_param = 0 + self._new_row_internal(table_name, row, add_uuid=False, root_uuid=None, + confidential_data=confidential_data, + created_time=created_time_param) + @retry + @with_transaction def new_instance_scenario_as_a_whole(self,tenant_id,instance_scenario_name,instance_scenario_description,scenarioDict): - tries = 2 - while tries: - created_time = time.time() - try: - with self.con: - self.cur = self.con.cursor() - #instance_scenarios - datacenter_id = scenarioDict['datacenter_id'] - INSERT_={'tenant_id': tenant_id, - 'datacenter_tenant_id': scenarioDict["datacenter2tenant"][datacenter_id], - 'name': instance_scenario_name, - 'description': instance_scenario_description, - 'scenario_id' : scenarioDict['uuid'], - 'datacenter_id': datacenter_id - } - if scenarioDict.get("cloud-config"): - INSERT_["cloud_config"] = yaml.safe_dump(scenarioDict["cloud-config"], default_flow_style=True, width=256) - - instance_uuid = self._new_row_internal('instance_scenarios', INSERT_, add_uuid=True, root_uuid=None, created_time=created_time) - - net_scene2instance={} - #instance_nets #nets interVNF - for net in scenarioDict['nets']: - net_scene2instance[ net['uuid'] ] ={} - datacenter_site_id = net.get('datacenter_id', datacenter_id) - if not "vim_id_sites" in net: - net["vim_id_sites"] ={datacenter_site_id: net['vim_id']} - net["vim_id_sites"]["datacenter_site_id"] = {datacenter_site_id: net['vim_id']} - sce_net_id = net.get("uuid") - - for datacenter_site_id,vim_id in net["vim_id_sites"].iteritems(): - INSERT_={'vim_net_id': vim_id, 'created': net.get('created', False), 'instance_scenario_id':instance_uuid } #, 'type': net['type'] - INSERT_['datacenter_id'] = datacenter_site_id - INSERT_['datacenter_tenant_id'] = scenarioDict["datacenter2tenant"][datacenter_site_id] - if not net.get('created', False): - INSERT_['status'] = "ACTIVE" - if sce_net_id: - INSERT_['sce_net_id'] = sce_net_id - created_time += 0.00001 - instance_net_uuid = self._new_row_internal('instance_nets', INSERT_, True, instance_uuid, created_time) - net_scene2instance[ sce_net_id ][datacenter_site_id] = instance_net_uuid - net['uuid'] = instance_net_uuid #overwrite scnario uuid by instance uuid - - if 'ip_profile' in net: - net['ip_profile']['net_id'] = None - net['ip_profile']['sce_net_id'] = None - net['ip_profile']['instance_net_id'] = instance_net_uuid - created_time += 0.00001 - ip_profile_id = self._new_row_internal('ip_profiles', net['ip_profile']) - - #instance_vnfs - for vnf in scenarioDict['vnfs']: - datacenter_site_id = vnf.get('datacenter_id', datacenter_id) - INSERT_={'instance_scenario_id': instance_uuid, 'vnf_id': vnf['vnf_id'] } - INSERT_['datacenter_id'] = datacenter_site_id - INSERT_['datacenter_tenant_id'] = scenarioDict["datacenter2tenant"][datacenter_site_id] - if vnf.get("uuid"): - INSERT_['sce_vnf_id'] = vnf['uuid'] - created_time += 0.00001 - instance_vnf_uuid = self._new_row_internal('instance_vnfs', INSERT_, True, instance_uuid, created_time) - vnf['uuid'] = instance_vnf_uuid #overwrite scnario uuid by instance uuid - - #instance_nets #nets intraVNF - for net in vnf['nets']: - net_scene2instance[ net['uuid'] ] = {} - INSERT_={'vim_net_id': net['vim_id'], 'created': net.get('created', False), 'instance_scenario_id':instance_uuid } #, 'type': net['type'] - INSERT_['datacenter_id'] = net.get('datacenter_id', datacenter_site_id) - INSERT_['datacenter_tenant_id'] = scenarioDict["datacenter2tenant"][datacenter_id] - if net.get("uuid"): - INSERT_['net_id'] = net['uuid'] - created_time += 0.00001 - instance_net_uuid = self._new_row_internal('instance_nets', INSERT_, True, instance_uuid, created_time) - net_scene2instance[ net['uuid'] ][datacenter_site_id] = instance_net_uuid - net['uuid'] = instance_net_uuid #overwrite scnario uuid by instance uuid - - if 'ip_profile' in net: - net['ip_profile']['net_id'] = None - net['ip_profile']['sce_net_id'] = None - net['ip_profile']['instance_net_id'] = instance_net_uuid - created_time += 0.00001 - ip_profile_id = self._new_row_internal('ip_profiles', net['ip_profile']) - - #instance_vms - for vm in vnf['vms']: - INSERT_={'instance_vnf_id': instance_vnf_uuid, 'vm_id': vm['uuid'], 'vim_vm_id': vm['vim_id'] } - created_time += 0.00001 - instance_vm_uuid = self._new_row_internal('instance_vms', INSERT_, True, instance_uuid, created_time) - vm['uuid'] = instance_vm_uuid #overwrite scnario uuid by instance uuid - - #instance_interfaces - for interface in vm['interfaces']: - net_id = interface.get('net_id', None) - if net_id is None: - #check if is connected to a inter VNFs net - for iface in vnf['interfaces']: - if iface['interface_id'] == interface['uuid']: - if 'ip_address' in iface: - interface['ip_address'] = iface['ip_address'] - net_id = iface.get('sce_net_id', None) - break - if net_id is None: - continue - interface_type='external' if interface['external_name'] is not None else 'internal' - INSERT_={'instance_vm_id': instance_vm_uuid, 'instance_net_id': net_scene2instance[net_id][datacenter_site_id], - 'interface_id': interface['uuid'], 'vim_interface_id': interface.get('vim_id'), 'type': interface_type, - 'ip_address': interface.get('ip_address'), 'floating_ip': int(interface.get('floating-ip',False)), - 'port_security': int(interface.get('port-security',True))} - #created_time += 0.00001 - interface_uuid = self._new_row_internal('instance_interfaces', INSERT_, True, instance_uuid) #, created_time) - interface['uuid'] = interface_uuid #overwrite scnario uuid by instance uuid - return instance_uuid - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries) - tries -= 1 - + created_time = time.time() + #instance_scenarios + datacenter_id = scenarioDict['datacenter_id'] + INSERT_={'tenant_id': tenant_id, + 'datacenter_tenant_id': scenarioDict["datacenter2tenant"][datacenter_id], + 'name': instance_scenario_name, + 'description': instance_scenario_description, + 'scenario_id' : scenarioDict['uuid'], + 'datacenter_id': datacenter_id + } + if scenarioDict.get("cloud-config"): + INSERT_["cloud_config"] = yaml.safe_dump(scenarioDict["cloud-config"], default_flow_style=True, width=256) + + instance_uuid = self._new_row_internal('instance_scenarios', INSERT_, add_uuid=True, root_uuid=None, created_time=created_time) + + net_scene2instance={} + #instance_nets #nets interVNF + for net in scenarioDict['nets']: + net_scene2instance[ net['uuid'] ] ={} + datacenter_site_id = net.get('datacenter_id', datacenter_id) + if not "vim_id_sites" in net: + net["vim_id_sites"] ={datacenter_site_id: net['vim_id']} + net["vim_id_sites"]["datacenter_site_id"] = {datacenter_site_id: net['vim_id']} + sce_net_id = net.get("uuid") + + for datacenter_site_id,vim_id in net["vim_id_sites"].iteritems(): + INSERT_={'vim_net_id': vim_id, 'created': net.get('created', False), 'instance_scenario_id':instance_uuid } #, 'type': net['type'] + INSERT_['datacenter_id'] = datacenter_site_id + INSERT_['datacenter_tenant_id'] = scenarioDict["datacenter2tenant"][datacenter_site_id] + if not net.get('created', False): + INSERT_['status'] = "ACTIVE" + if sce_net_id: + INSERT_['sce_net_id'] = sce_net_id + created_time += 0.00001 + instance_net_uuid = self._new_row_internal('instance_nets', INSERT_, True, instance_uuid, created_time) + net_scene2instance[ sce_net_id ][datacenter_site_id] = instance_net_uuid + net['uuid'] = instance_net_uuid #overwrite scnario uuid by instance uuid + + if 'ip_profile' in net: + net['ip_profile']['net_id'] = None + net['ip_profile']['sce_net_id'] = None + net['ip_profile']['instance_net_id'] = instance_net_uuid + created_time += 0.00001 + ip_profile_id = self._new_row_internal('ip_profiles', net['ip_profile']) + + #instance_vnfs + for vnf in scenarioDict['vnfs']: + datacenter_site_id = vnf.get('datacenter_id', datacenter_id) + INSERT_={'instance_scenario_id': instance_uuid, 'vnf_id': vnf['vnf_id'] } + INSERT_['datacenter_id'] = datacenter_site_id + INSERT_['datacenter_tenant_id'] = scenarioDict["datacenter2tenant"][datacenter_site_id] + if vnf.get("uuid"): + INSERT_['sce_vnf_id'] = vnf['uuid'] + created_time += 0.00001 + instance_vnf_uuid = self._new_row_internal('instance_vnfs', INSERT_, True, instance_uuid, created_time) + vnf['uuid'] = instance_vnf_uuid #overwrite scnario uuid by instance uuid + + #instance_nets #nets intraVNF + for net in vnf['nets']: + net_scene2instance[ net['uuid'] ] = {} + INSERT_={'vim_net_id': net['vim_id'], 'created': net.get('created', False), 'instance_scenario_id':instance_uuid } #, 'type': net['type'] + INSERT_['datacenter_id'] = net.get('datacenter_id', datacenter_site_id) + INSERT_['datacenter_tenant_id'] = scenarioDict["datacenter2tenant"][datacenter_id] + if net.get("uuid"): + INSERT_['net_id'] = net['uuid'] + created_time += 0.00001 + instance_net_uuid = self._new_row_internal('instance_nets', INSERT_, True, instance_uuid, created_time) + net_scene2instance[ net['uuid'] ][datacenter_site_id] = instance_net_uuid + net['uuid'] = instance_net_uuid #overwrite scnario uuid by instance uuid + + if 'ip_profile' in net: + net['ip_profile']['net_id'] = None + net['ip_profile']['sce_net_id'] = None + net['ip_profile']['instance_net_id'] = instance_net_uuid + created_time += 0.00001 + ip_profile_id = self._new_row_internal('ip_profiles', net['ip_profile']) + + #instance_vms + for vm in vnf['vms']: + INSERT_={'instance_vnf_id': instance_vnf_uuid, 'vm_id': vm['uuid'], 'vim_vm_id': vm['vim_id'] } + created_time += 0.00001 + instance_vm_uuid = self._new_row_internal('instance_vms', INSERT_, True, instance_uuid, created_time) + vm['uuid'] = instance_vm_uuid #overwrite scnario uuid by instance uuid + + #instance_interfaces + for interface in vm['interfaces']: + net_id = interface.get('net_id', None) + if net_id is None: + #check if is connected to a inter VNFs net + for iface in vnf['interfaces']: + if iface['interface_id'] == interface['uuid']: + if 'ip_address' in iface: + interface['ip_address'] = iface['ip_address'] + net_id = iface.get('sce_net_id', None) + break + if net_id is None: + continue + interface_type='external' if interface['external_name'] is not None else 'internal' + INSERT_={'instance_vm_id': instance_vm_uuid, 'instance_net_id': net_scene2instance[net_id][datacenter_site_id], + 'interface_id': interface['uuid'], 'vim_interface_id': interface.get('vim_id'), 'type': interface_type, + 'ip_address': interface.get('ip_address'), 'floating_ip': int(interface.get('floating-ip',False)), + 'port_security': int(interface.get('port-security',True))} + #created_time += 0.00001 + interface_uuid = self._new_row_internal('instance_interfaces', INSERT_, True, instance_uuid) #, created_time) + interface['uuid'] = interface_uuid #overwrite scnario uuid by instance uuid + return instance_uuid + + @retry + @with_transaction(cursor='dict') def get_instance_scenario(self, instance_id, tenant_id=None, verbose=False): '''Obtain the instance information, filtering by one or several of the tenant, uuid or name instance_id is the uuid or the name if it is not a valid uuid format Only one instance must mutch the filtering or an error is returned ''' - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor(mdb.cursors.DictCursor) - # instance table - where_list = [] - if tenant_id: - where_list.append("inst.tenant_id='{}'".format(tenant_id)) - if db_base._check_valid_uuid(instance_id): - where_list.append("inst.uuid='{}'".format(instance_id)) - else: - where_list.append("inst.name='{}'".format(instance_id)) - where_text = " AND ".join(where_list) - cmd = "SELECT inst.uuid as uuid, inst.name as name, inst.scenario_id as scenario_id, datacenter_id"\ - " ,datacenter_tenant_id, s.name as scenario_name,inst.tenant_id as tenant_id" \ - " ,inst.description as description, inst.created_at as created_at" \ - " ,inst.cloud_config as cloud_config, s.osm_id as nsd_osm_id" \ - " FROM instance_scenarios as inst left join scenarios as s on inst.scenario_id=s.uuid" \ - " WHERE " + where_text - self.logger.debug(cmd) - self.cur.execute(cmd) - rows = self.cur.fetchall() - - if self.cur.rowcount == 0: - raise db_base.db_base_Exception("No instance found where " + where_text, httperrors.Not_Found) - elif self.cur.rowcount > 1: - raise db_base.db_base_Exception("More than one instance found where " + where_text, - httperrors.Bad_Request) - instance_dict = rows[0] - if instance_dict["cloud_config"]: - instance_dict["cloud-config"] = yaml.load(instance_dict["cloud_config"]) - del instance_dict["cloud_config"] - - # instance_vnfs - cmd = "SELECT iv.uuid as uuid, iv.vnf_id as vnf_id, sv.name as vnf_name, sce_vnf_id, datacenter_id"\ - ", datacenter_tenant_id, v.mgmt_access, sv.member_vnf_index, v.osm_id as vnfd_osm_id "\ - "FROM instance_vnfs as iv left join sce_vnfs as sv "\ - " on iv.sce_vnf_id=sv.uuid join vnfs as v on iv.vnf_id=v.uuid " \ - "WHERE iv.instance_scenario_id='{}' " \ - "ORDER BY iv.created_at ".format(instance_dict['uuid']) - self.logger.debug(cmd) - self.cur.execute(cmd) - instance_dict['vnfs'] = self.cur.fetchall() - for vnf in instance_dict['vnfs']: - vnf["ip_address"] = None - vnf_mgmt_access_iface = None - vnf_mgmt_access_vm = None - if vnf["mgmt_access"]: - vnf_mgmt_access = yaml.load(vnf["mgmt_access"]) - vnf_mgmt_access_iface = vnf_mgmt_access.get("interface_id") - vnf_mgmt_access_vm = vnf_mgmt_access.get("vm_id") - vnf["ip_address"] = vnf_mgmt_access.get("ip-address") - - # instance vms - cmd = "SELECT iv.uuid as uuid, vim_vm_id, status, error_msg, vim_info, iv.created_at as "\ - "created_at, name, vms.osm_id as vdu_osm_id, vim_name, vms.uuid as vm_uuid"\ - " FROM instance_vms as iv join vms on iv.vm_id=vms.uuid "\ - " WHERE instance_vnf_id='{}' ORDER BY iv.created_at".format(vnf['uuid']) - self.logger.debug(cmd) - self.cur.execute(cmd) - vnf['vms'] = self.cur.fetchall() - for vm in vnf['vms']: - vm_manage_iface_list=[] - # instance_interfaces - cmd = "SELECT vim_interface_id, instance_net_id, internal_name,external_name, mac_address,"\ - " ii.ip_address as ip_address, vim_info, i.type as type, sdn_port_id, i.uuid"\ - " FROM instance_interfaces as ii join interfaces as i on ii.interface_id=i.uuid"\ - " WHERE instance_vm_id='{}' ORDER BY created_at".format(vm['uuid']) - self.logger.debug(cmd) - self.cur.execute(cmd ) - vm['interfaces'] = self.cur.fetchall() - for iface in vm['interfaces']: - if vnf_mgmt_access_iface and vnf_mgmt_access_iface == iface["uuid"]: - if not vnf["ip_address"]: - vnf["ip_address"] = iface["ip_address"] - if iface["type"] == "mgmt" and iface["ip_address"]: - vm_manage_iface_list.append(iface["ip_address"]) - if not verbose: - del iface["type"] - del iface["uuid"] - if vm_manage_iface_list: - vm["ip_address"] = ",".join(vm_manage_iface_list) - if not vnf["ip_address"] and vnf_mgmt_access_vm == vm["vm_uuid"]: - vnf["ip_address"] = vm["ip_address"] - del vm["vm_uuid"] - - #instance_nets - #select_text = "instance_nets.uuid as uuid,sce_nets.name as net_name,instance_nets.vim_net_id as net_id,instance_nets.status as status,instance_nets.external as external" - #from_text = "instance_nets join instance_scenarios on instance_nets.instance_scenario_id=instance_scenarios.uuid " + \ - # "join sce_nets on instance_scenarios.scenario_id=sce_nets.scenario_id" - #where_text = "instance_nets.instance_scenario_id='"+ instance_dict['uuid'] + "'" - cmd = "SELECT inets.uuid as uuid,vim_net_id,status,error_msg,vim_info,created, sce_net_id, " \ - "net_id as vnf_net_id, datacenter_id, datacenter_tenant_id, sdn_net_id, " \ - "snets.osm_id as ns_net_osm_id, nets.osm_id as vnf_net_osm_id, inets.vim_name " \ - "FROM instance_nets as inets left join sce_nets as snets on inets.sce_net_id=snets.uuid " \ - "left join nets on inets.net_id=nets.uuid " \ - "WHERE instance_scenario_id='{}' ORDER BY inets.created_at".format(instance_dict['uuid']) - self.logger.debug(cmd) - self.cur.execute(cmd) - instance_dict['nets'] = self.cur.fetchall() - - #instance_sfps - cmd = "SELECT uuid,vim_sfp_id,sce_rsp_id,datacenter_id,"\ - "datacenter_tenant_id,status,error_msg,vim_info"\ - " FROM instance_sfps" \ - " WHERE instance_scenario_id='{}' ORDER BY created_at".format(instance_dict['uuid']) - self.logger.debug(cmd) - self.cur.execute(cmd) - instance_dict['sfps'] = self.cur.fetchall() - - # for sfp in instance_dict['sfps']: - #instance_sfs - cmd = "SELECT uuid,vim_sf_id,sce_rsp_hop_id,datacenter_id,"\ - "datacenter_tenant_id,status,error_msg,vim_info"\ - " FROM instance_sfs" \ - " WHERE instance_scenario_id='{}' ORDER BY created_at".format(instance_dict['uuid']) # TODO: replace instance_scenario_id with instance_sfp_id - self.logger.debug(cmd) - self.cur.execute(cmd) - instance_dict['sfs'] = self.cur.fetchall() - - #for sf in instance_dict['sfs']: - #instance_sfis - cmd = "SELECT uuid,vim_sfi_id,sce_rsp_hop_id,datacenter_id,"\ - "datacenter_tenant_id,status,error_msg,vim_info"\ - " FROM instance_sfis" \ - " WHERE instance_scenario_id='{}' ORDER BY created_at".format(instance_dict['uuid']) # TODO: replace instance_scenario_id with instance_sf_id - self.logger.debug(cmd) - self.cur.execute(cmd) - instance_dict['sfis'] = self.cur.fetchall() + # instance table + where_list = [] + if tenant_id: + where_list.append("inst.tenant_id='{}'".format(tenant_id)) + if db_base._check_valid_uuid(instance_id): + where_list.append("inst.uuid='{}'".format(instance_id)) + else: + where_list.append("inst.name='{}'".format(instance_id)) + where_text = " AND ".join(where_list) + cmd = "SELECT inst.uuid as uuid, inst.name as name, inst.scenario_id as scenario_id, datacenter_id"\ + " ,datacenter_tenant_id, s.name as scenario_name,inst.tenant_id as tenant_id" \ + " ,inst.description as description, inst.created_at as created_at" \ + " ,inst.cloud_config as cloud_config, s.osm_id as nsd_osm_id" \ + " FROM instance_scenarios as inst left join scenarios as s on inst.scenario_id=s.uuid" \ + " WHERE " + where_text + self.logger.debug(cmd) + self.cur.execute(cmd) + rows = self.cur.fetchall() + + if self.cur.rowcount == 0: + raise db_base.db_base_Exception("No instance found where " + where_text, httperrors.Not_Found) + elif self.cur.rowcount > 1: + raise db_base.db_base_Exception("More than one instance found where " + where_text, + httperrors.Bad_Request) + instance_dict = rows[0] + if instance_dict["cloud_config"]: + instance_dict["cloud-config"] = yaml.load(instance_dict["cloud_config"]) + del instance_dict["cloud_config"] + + # instance_vnfs + cmd = "SELECT iv.uuid as uuid, iv.vnf_id as vnf_id, sv.name as vnf_name, sce_vnf_id, datacenter_id"\ + ", datacenter_tenant_id, v.mgmt_access, sv.member_vnf_index, v.osm_id as vnfd_osm_id "\ + "FROM instance_vnfs as iv left join sce_vnfs as sv "\ + " on iv.sce_vnf_id=sv.uuid join vnfs as v on iv.vnf_id=v.uuid " \ + "WHERE iv.instance_scenario_id='{}' " \ + "ORDER BY iv.created_at ".format(instance_dict['uuid']) + self.logger.debug(cmd) + self.cur.execute(cmd) + instance_dict['vnfs'] = self.cur.fetchall() + for vnf in instance_dict['vnfs']: + vnf["ip_address"] = None + vnf_mgmt_access_iface = None + vnf_mgmt_access_vm = None + if vnf["mgmt_access"]: + vnf_mgmt_access = yaml.load(vnf["mgmt_access"]) + vnf_mgmt_access_iface = vnf_mgmt_access.get("interface_id") + vnf_mgmt_access_vm = vnf_mgmt_access.get("vm_id") + vnf["ip_address"] = vnf_mgmt_access.get("ip-address") + + # instance vms + cmd = "SELECT iv.uuid as uuid, vim_vm_id, status, error_msg, vim_info, iv.created_at as "\ + "created_at, name, vms.osm_id as vdu_osm_id, vim_name, vms.uuid as vm_uuid"\ + " FROM instance_vms as iv join vms on iv.vm_id=vms.uuid "\ + " WHERE instance_vnf_id='{}' ORDER BY iv.created_at".format(vnf['uuid']) + self.logger.debug(cmd) + self.cur.execute(cmd) + vnf['vms'] = self.cur.fetchall() + for vm in vnf['vms']: + vm_manage_iface_list=[] + # instance_interfaces + cmd = "SELECT vim_interface_id, instance_net_id, internal_name,external_name, mac_address,"\ + " ii.ip_address as ip_address, vim_info, i.type as type, sdn_port_id, i.uuid"\ + " FROM instance_interfaces as ii join interfaces as i on ii.interface_id=i.uuid"\ + " WHERE instance_vm_id='{}' ORDER BY created_at".format(vm['uuid']) + self.logger.debug(cmd) + self.cur.execute(cmd ) + vm['interfaces'] = self.cur.fetchall() + for iface in vm['interfaces']: + if vnf_mgmt_access_iface and vnf_mgmt_access_iface == iface["uuid"]: + if not vnf["ip_address"]: + vnf["ip_address"] = iface["ip_address"] + if iface["type"] == "mgmt" and iface["ip_address"]: + vm_manage_iface_list.append(iface["ip_address"]) + if not verbose: + del iface["type"] + del iface["uuid"] + if vm_manage_iface_list: + vm["ip_address"] = ",".join(vm_manage_iface_list) + if not vnf["ip_address"] and vnf_mgmt_access_vm == vm["vm_uuid"]: + vnf["ip_address"] = vm["ip_address"] + del vm["vm_uuid"] + + #instance_nets + #select_text = "instance_nets.uuid as uuid,sce_nets.name as net_name,instance_nets.vim_net_id as net_id,instance_nets.status as status,instance_nets.external as external" + #from_text = "instance_nets join instance_scenarios on instance_nets.instance_scenario_id=instance_scenarios.uuid " + \ + # "join sce_nets on instance_scenarios.scenario_id=sce_nets.scenario_id" + #where_text = "instance_nets.instance_scenario_id='"+ instance_dict['uuid'] + "'" + cmd = "SELECT inets.uuid as uuid,vim_net_id,status,error_msg,vim_info,created, sce_net_id, " \ + "net_id as vnf_net_id, datacenter_id, datacenter_tenant_id, sdn_net_id, " \ + "snets.osm_id as ns_net_osm_id, nets.osm_id as vnf_net_osm_id, inets.vim_name " \ + "FROM instance_nets as inets left join sce_nets as snets on inets.sce_net_id=snets.uuid " \ + "left join nets on inets.net_id=nets.uuid " \ + "WHERE instance_scenario_id='{}' ORDER BY inets.created_at".format(instance_dict['uuid']) + self.logger.debug(cmd) + self.cur.execute(cmd) + instance_dict['nets'] = self.cur.fetchall() + + #instance_sfps + cmd = "SELECT uuid,vim_sfp_id,sce_rsp_id,datacenter_id,"\ + "datacenter_tenant_id,status,error_msg,vim_info"\ + " FROM instance_sfps" \ + " WHERE instance_scenario_id='{}' ORDER BY created_at".format(instance_dict['uuid']) + self.logger.debug(cmd) + self.cur.execute(cmd) + instance_dict['sfps'] = self.cur.fetchall() + + # for sfp in instance_dict['sfps']: + #instance_sfs + cmd = "SELECT uuid,vim_sf_id,sce_rsp_hop_id,datacenter_id,"\ + "datacenter_tenant_id,status,error_msg,vim_info"\ + " FROM instance_sfs" \ + " WHERE instance_scenario_id='{}' ORDER BY created_at".format(instance_dict['uuid']) # TODO: replace instance_scenario_id with instance_sfp_id + self.logger.debug(cmd) + self.cur.execute(cmd) + instance_dict['sfs'] = self.cur.fetchall() + + #for sf in instance_dict['sfs']: + #instance_sfis + cmd = "SELECT uuid,vim_sfi_id,sce_rsp_hop_id,datacenter_id,"\ + "datacenter_tenant_id,status,error_msg,vim_info"\ + " FROM instance_sfis" \ + " WHERE instance_scenario_id='{}' ORDER BY created_at".format(instance_dict['uuid']) # TODO: replace instance_scenario_id with instance_sf_id + self.logger.debug(cmd) + self.cur.execute(cmd) + instance_dict['sfis'] = self.cur.fetchall() # for sfi in instance_dict['sfi']: - #instance_classifications - cmd = "SELECT uuid,vim_classification_id,sce_classifier_match_id,datacenter_id,"\ - "datacenter_tenant_id,status,error_msg,vim_info"\ - " FROM instance_classifications" \ - " WHERE instance_scenario_id='{}' ORDER BY created_at".format(instance_dict['uuid']) - self.logger.debug(cmd) - self.cur.execute(cmd) - instance_dict['classifications'] = self.cur.fetchall() + #instance_classifications + cmd = "SELECT uuid,vim_classification_id,sce_classifier_match_id,datacenter_id,"\ + "datacenter_tenant_id,status,error_msg,vim_info"\ + " FROM instance_classifications" \ + " WHERE instance_scenario_id='{}' ORDER BY created_at".format(instance_dict['uuid']) + self.logger.debug(cmd) + self.cur.execute(cmd) + instance_dict['classifications'] = self.cur.fetchall() # for classification in instance_dict['classifications'] - db_base._convert_datetime2str(instance_dict) - db_base._convert_str2boolean(instance_dict, ('public','shared','created') ) - return instance_dict - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries) - tries -= 1 + db_base._convert_datetime2str(instance_dict) + db_base._convert_str2boolean(instance_dict, ('public','shared','created') ) + return instance_dict + @retry(command='delete', extra='No dependences can avoid deleting!!!!') + @with_transaction(cursor='dict') def delete_instance_scenario(self, instance_id, tenant_id=None): '''Deletes a instance_Scenario, filtering by one or serveral of the tenant, uuid or name instance_id is the uuid or the name if it is not a valid uuid format Only one instance_scenario must mutch the filtering or an error is returned ''' - tries = 2 - while tries: - try: - with self.con: - self.cur = self.con.cursor(mdb.cursors.DictCursor) - - #instance table - where_list=[] - if tenant_id is not None: where_list.append( "tenant_id='" + tenant_id +"'" ) - if db_base._check_valid_uuid(instance_id): - where_list.append( "uuid='" + instance_id +"'" ) - else: - where_list.append( "name='" + instance_id +"'" ) - where_text = " AND ".join(where_list) - cmd = "SELECT * FROM instance_scenarios WHERE "+ where_text - self.logger.debug(cmd) - self.cur.execute(cmd) - rows = self.cur.fetchall() - - if self.cur.rowcount==0: - raise db_base.db_base_Exception("No instance found where " + where_text, httperrors.Bad_Request) - elif self.cur.rowcount>1: - raise db_base.db_base_Exception("More than one instance found where " + where_text, httperrors.Bad_Request) - instance_uuid = rows[0]["uuid"] - instance_name = rows[0]["name"] - - #sce_vnfs - cmd = "DELETE FROM instance_scenarios WHERE uuid='{}'".format(instance_uuid) - self.logger.debug(cmd) - self.cur.execute(cmd) - - return instance_uuid + " " + instance_name - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries, "delete", "No dependences can avoid deleting!!!!") - tries -= 1 - + #instance table + where_list=[] + if tenant_id is not None: where_list.append( "tenant_id='" + tenant_id +"'" ) + if db_base._check_valid_uuid(instance_id): + where_list.append( "uuid='" + instance_id +"'" ) + else: + where_list.append( "name='" + instance_id +"'" ) + where_text = " AND ".join(where_list) + cmd = "SELECT * FROM instance_scenarios WHERE "+ where_text + self.logger.debug(cmd) + self.cur.execute(cmd) + rows = self.cur.fetchall() + + if self.cur.rowcount==0: + raise db_base.db_base_Exception("No instance found where " + where_text, httperrors.Bad_Request) + elif self.cur.rowcount>1: + raise db_base.db_base_Exception("More than one instance found where " + where_text, httperrors.Bad_Request) + instance_uuid = rows[0]["uuid"] + instance_name = rows[0]["name"] + + #sce_vnfs + cmd = "DELETE FROM instance_scenarios WHERE uuid='{}'".format(instance_uuid) + self.logger.debug(cmd) + self.cur.execute(cmd) + + return instance_uuid + " " + instance_name + + @retry(table='instance_scenarios') + @with_transaction def new_instance_scenario(self, instance_scenario_dict, tenant_id): #return self.new_row('vnfs', vnf_dict, None, tenant_id, True, True) return self._new_row_internal('instance_scenarios', instance_scenario_dict, tenant_id, add_uuid=True, root_uuid=None, log=True) @@ -1155,6 +1098,8 @@ class nfvo_db(db_base.db_base): #TODO: return + @retry(table='instance_vnfs') + @with_transaction def new_instance_vnf(self, instance_vnf_dict, tenant_id, instance_scenario_id = None): #return self.new_row('vms', vm_dict, tenant_id, True, True) return self._new_row_internal('instance_vnfs', instance_vnf_dict, tenant_id, add_uuid=True, root_uuid=instance_scenario_id, log=True) @@ -1167,6 +1112,8 @@ class nfvo_db(db_base.db_base): #TODO: return + @retry(table='instance_vms') + @with_transaction def new_instance_vm(self, instance_vm_dict, tenant_id, instance_scenario_id = None): #return self.new_row('vms', vm_dict, tenant_id, True, True) return self._new_row_internal('instance_vms', instance_vm_dict, tenant_id, add_uuid=True, root_uuid=instance_scenario_id, log=True) @@ -1179,6 +1126,8 @@ class nfvo_db(db_base.db_base): #TODO: return + @retry(table='instance_nets') + @with_transaction def new_instance_net(self, instance_net_dict, tenant_id, instance_scenario_id = None): return self._new_row_internal('instance_nets', instance_net_dict, tenant_id, add_uuid=True, root_uuid=instance_scenario_id, log=True) @@ -1190,6 +1139,8 @@ class nfvo_db(db_base.db_base): #TODO: return + @retry(table='instance_interfaces') + @with_transaction def new_instance_interface(self, instance_interface_dict, tenant_id, instance_scenario_id = None): return self._new_row_internal('instance_interfaces', instance_interface_dict, tenant_id, add_uuid=True, root_uuid=instance_scenario_id, log=True) @@ -1201,6 +1152,8 @@ class nfvo_db(db_base.db_base): #TODO: return + @retry(table='datacenter_nets') + @with_transaction def update_datacenter_nets(self, datacenter_id, new_net_list=[]): ''' Removes the old and adds the new net list at datacenter list for one datacenter. Attribute @@ -1209,24 +1162,14 @@ class nfvo_db(db_base.db_base): new_net_list: the new values to be inserted. If empty it only deletes the existing nets Return: (Inserted items, Deleted items) if OK, (-Error, text) if error ''' - tries = 2 - while tries: - created_time = time.time() - try: - with self.con: - self.cur = self.con.cursor() - cmd="DELETE FROM datacenter_nets WHERE datacenter_id='{}'".format(datacenter_id) - self.logger.debug(cmd) - self.cur.execute(cmd) - deleted = self.cur.rowcount - inserted = 0 - for new_net in new_net_list: - created_time += 0.00001 - self._new_row_internal('datacenter_nets', new_net, add_uuid=True, created_time=created_time) - inserted += 1 - return inserted, deleted - except (mdb.Error, AttributeError) as e: - self._format_error(e, tries) - tries -= 1 - - + created_time = time.time() + cmd="DELETE FROM datacenter_nets WHERE datacenter_id='{}'".format(datacenter_id) + self.logger.debug(cmd) + self.cur.execute(cmd) + deleted = self.cur.rowcount + inserted = 0 + for new_net in new_net_list: + created_time += 0.00001 + self._new_row_internal('datacenter_nets', new_net, add_uuid=True, created_time=created_time) + inserted += 1 + return inserted, deleted diff --git a/osm_ro/tests/test_db.py b/osm_ro/tests/test_db.py new file mode 100644 index 00000000..e152347e --- /dev/null +++ b/osm_ro/tests/test_db.py @@ -0,0 +1,123 @@ +# -*- coding: utf-8 -*- +# pylint: disable=E1101 +import unittest + +from MySQLdb import connect, cursors, DatabaseError, IntegrityError +import mock +from mock import Mock + +from ..db_base import retry, with_transaction +from ..nfvo_db import nfvo_db +from .db_helpers import TestCaseWithDatabase + + +class TestDbDecorators(TestCaseWithDatabase): + @classmethod + def setUpClass(cls): + connection = connect(cls.host, cls.user, cls.password) + cursor = connection.cursor() + cursor.execute( + "CREATE DATABASE IF NOT EXISTS {};".format( + connection.escape_string(cls.database))) + cursor.execute("use {};".format(cls.database)) + cursor.execute("""\ + CREATE TABLE IF NOT EXISTS `test_table` (\ + `id` int(11) NOT NULL, + PRIMARY KEY (`id`)\ + );\ + """) + cursor.close() + connection.close() + + @classmethod + def tearDownClass(cls): + cls.empty_database() + + def setUp(self): + self.maxDiff = None + self.db = nfvo_db(self.host, self.user, self.password, self.database) + self.db.connect() + self.addCleanup(lambda: self.db.disconnect()) + + def db_run(self, query, cursor=None): + cursor = cursor or self.db.con.cursor() + cursor.execute(query) + return cursor.fetchone() + + def test_retry_inject_attempt(self): + @retry + def _fn(db, attempt=None): + self.assertIsNotNone(attempt) + self.assertEqual(attempt.number, 1) + + _fn(self.db) + + def test_retry_accept_max_attempts(self): + success = [] + failures = [] + + @retry(max_attempts=5) + def _fn(db, attempt=None): + if attempt.count < 4: + failures.append(attempt.count) + raise DatabaseError("Emulate DB error", "msg") + success.append(attempt.count) + + _fn(self.db) + self.assertEqual(failures, [0, 1, 2, 3]) + self.assertEqual(success, [4]) + + def test_retry_reconnect_auctomatically(self): + success = [] + failures = [] + + @retry(max_attempts=3) + def _fn(db, attempt=None): + if attempt.count < 2: + failures.append(attempt.count) + db.con.close() # Simulate connection failure + result = self.db_run('select 1+1, 2+2;') + success.append(attempt.count) + return result + + result = _fn(self.db) + self.assertEqual(failures, [0, 1]) + self.assertEqual(success, [2]) + self.assertEqual(result, (2, 4)) + + def test_retry_reraise_non_db_errors(self): + failures = [] + + @retry + def _fn(db, attempt=None): + failures.append(attempt.count) + raise SystemError("Non Correlated Error") + + with self.assertRaises(SystemError): + _fn(self.db) + + self.assertEqual(failures, [0]) + + def test_transaction_rollback(self): + with self.assertRaises(IntegrityError), \ + self.db.transaction() as cursor: + # The first row is created normally + self.db_run('insert into test_table (id) values (1)', cursor) + # The second row fails due to repeated id + self.db_run('insert into test_table (id) values (1)', cursor) + # The entire transaction will rollback then, and therefore the + # first operation will be undone + + count = self.db_run('select count(*) FROM test_table') + self.assertEqual(count, (0,)) + + def test_transaction_cursor(self): + with self.db.transaction(cursors.DictCursor) as cursor: + count = self.db_run('select count(*) as counter FROM test_table', + cursor) + + self.assertEqual(count, {'counter': 0}) + + +if __name__ == '__main__': + unittest.main() diff --git a/osm_ro/tests/test_utils.py b/osm_ro/tests/test_utils.py new file mode 100644 index 00000000..c3a4e7af --- /dev/null +++ b/osm_ro/tests/test_utils.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# pylint: disable=E1101 + +import unittest + +from ..utils import get_arg, inject_args + + +class TestUtils(unittest.TestCase): + def test_inject_args_curries_arguments(self): + fn = inject_args(lambda a=None, b=None: a+b, a=3, b=5) + self.assertEqual(fn(), 8) + + def test_inject_args_doesnt_add_arg_if_not_needed(self): + fn = inject_args(lambda: 7, a=1, b=2) + self.assertEqual(fn(), 7) + fn = inject_args(lambda a=None: a, b=2) + self.assertEqual(fn(1), 1) + + def test_inject_args_knows_how_to_handle_arg_order(self): + fn = inject_args(lambda a=None, b=None: b - a, a=3) + self.assertEqual(fn(b=4), 1) + fn = inject_args(lambda b=None, a=None: b - a, a=3) + self.assertEqual(fn(b=4), 1) + + def test_inject_args_works_as_decorator(self): + fn = inject_args(x=1)(lambda x=None: x) + self.assertEqual(fn(), 1) + + def test_get_arg__positional(self): + def _fn(x, y, z): + return x + y + z + + x = get_arg('x', _fn, (1, 3, 4), {}) + self.assertEqual(x, 1) + y = get_arg('y', _fn, (1, 3, 4), {}) + self.assertEqual(y, 3) + z = get_arg('z', _fn, (1, 3, 4), {}) + self.assertEqual(z, 4) + + def test_get_arg__keyword(self): + def _fn(x, y, z=5): + return x + y + z + + z = get_arg('z', _fn, (1, 2), {'z': 3}) + self.assertEqual(z, 3) + + +if __name__ == '__main__': + unittest.main() diff --git a/osm_ro/utils.py b/osm_ro/utils.py index 1e3a8ee7..05c9801b 100644 --- a/osm_ro/utils.py +++ b/osm_ro/utils.py @@ -32,14 +32,20 @@ __date__ ="$08-sep-2014 12:21:22$" import datetime import time import warnings -from functools import reduce +from functools import reduce, partial, wraps from itertools import tee +import six from six.moves import filter, filterfalse from jsonschema import exceptions as js_e from jsonschema import validate as js_v +if six.PY3: + from inspect import getfullargspec as getspec +else: + from inspect import getargspec as getspec + #from bs4 import BeautifulSoup def read_file(file_to_read): @@ -347,3 +353,60 @@ def safe_get(target, key_path, default=None): keys = key_path.split('.') target = reduce(lambda acc, key: acc.get(key) or {}, keys[:-1], target) return target.get(keys[-1], default) + + +class Attempt(object): + """Auxiliary class to be used in an attempt to retry executing a failing + procedure + + Attributes: + count (int): 0-based "retries" counter + max_attempts (int): maximum number of "retries" allowed + info (dict): extra information about the specific attempt + (can be used to produce more meaningful error messages) + """ + __slots__ = ('count', 'max', 'info') + + MAX = 3 + + def __init__(self, count=0, max_attempts=MAX, info=None): + self.count = count + self.max = max_attempts + self.info = info or {} + + @property + def countdown(self): + """Like count, but in the opposite direction""" + return self.max - self.count + + @property + def number(self): + """1-based counter""" + return self.count + 1 + + +def inject_args(fn=None, **args): + """Partially apply keyword arguments in a function, but only if the function + define them in the first place + """ + if fn is None: # Allows calling the decorator directly or with parameters + return partial(inject_args, **args) + + spec = getspec(fn) + return wraps(fn)(partial(fn, **filter_dict_keys(args, spec.args))) + + +def get_arg(name, fn, args, kwargs): + """Find the value of an argument for a function, given its argument list. + + This function can be used to display more meaningful errors for debugging + """ + if name in kwargs: + return kwargs[name] + + spec = getspec(fn) + if name in spec.args: + i = spec.args.index(name) + return args[i] if i < len(args) else None + + return None diff --git a/osm_ro/vim_thread.py b/osm_ro/vim_thread.py index d5301f1a..80024365 100644 --- a/osm_ro/vim_thread.py +++ b/osm_ro/vim_thread.py @@ -149,8 +149,7 @@ class vim_thread(threading.Thread): 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id', 'user', 'passwd', 'dt.config as dt_config') where_ = {"dt.uuid": self.datacenter_tenant_id} - with self.db_lock: - vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_) + vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_) vim = vims[0] vim_config = {} if vim["config"]: @@ -161,8 +160,9 @@ class vim_thread(threading.Thread): vim_config['datacenter_id'] = vim.get('datacenter_id') # get port_mapping - vim_config["wim_external_ports"] = self.ovim.get_of_port_mappings( - db_filter={"region": vim_config['datacenter_id'], "pci": None}) + with self.db_lock: + vim_config["wim_external_ports"] = self.ovim.get_of_port_mappings( + db_filter={"region": vim_config['datacenter_id'], "pci": None}) self.vim = vim_module[vim["type"]].vimconnector( uuid=vim['datacenter_id'], name=vim['datacenter_name'], @@ -193,12 +193,11 @@ class vim_thread(threading.Thread): database_limit = 200 while True: # get 200 (database_limit) entries each time - with self.db_lock: - vim_actions = self.db.get_rows(FROM="vim_wim_actions", - WHERE={"datacenter_vim_id": self.datacenter_tenant_id, - "item_id>=": old_item_id}, - ORDER_BY=("item_id", "item", "created_at",), - LIMIT=database_limit) + vim_actions = self.db.get_rows(FROM="vim_wim_actions", + WHERE={"datacenter_vim_id": self.datacenter_tenant_id, + "item_id>=": old_item_id}, + ORDER_BY=("item_id", "item", "created_at",), + LIMIT=database_limit) for task in vim_actions: item = task["item"] item_id = task["item_id"] @@ -342,12 +341,12 @@ class vim_thread(threading.Thread): with self.db_lock: sdn_port_id = self.ovim.new_external_port( {"compute_node": interface["compute_node"], - "pci": interface["pci"], - "vlan": interface.get("vlan"), - "net_id": sdn_net_id, - "region": self.vim["config"]["datacenter_id"], - "name": sdn_port_name, - "mac": interface.get("mac_address")}) + "pci": interface["pci"], + "vlan": interface.get("vlan"), + "net_id": sdn_net_id, + "region": self.vim["config"]["datacenter_id"], + "name": sdn_port_name, + "mac": interface.get("mac_address")}) task_interface["sdn_port_id"] = sdn_port_id task_need_update = True except (ovimException, Exception) as e: @@ -360,18 +359,17 @@ class vim_thread(threading.Thread): task_warning_msg += error_text # TODO Set error_msg at instance_nets instead of instance VMs - with self.db_lock: - self.db.update_rows( - 'instance_interfaces', - UPDATE={"mac_address": interface.get("mac_address"), - "ip_address": interface.get("ip_address"), - "vim_interface_id": interface.get("vim_interface_id"), - "vim_info": interface.get("vim_info"), - "sdn_port_id": task_interface.get("sdn_port_id"), - "compute_node": interface.get("compute_node"), - "pci": interface.get("pci"), - "vlan": interface.get("vlan")}, - WHERE={'uuid': task_interface["iface_id"]}) + self.db.update_rows( + 'instance_interfaces', + UPDATE={"mac_address": interface.get("mac_address"), + "ip_address": interface.get("ip_address"), + "vim_interface_id": interface.get("vim_interface_id"), + "vim_info": interface.get("vim_info"), + "sdn_port_id": task_interface.get("sdn_port_id"), + "compute_node": interface.get("compute_node"), + "pci": interface.get("pci"), + "vlan": interface.get("vlan")}, + WHERE={'uuid': task_interface["iface_id"]}) task["vim_interfaces"][vim_interface_id] = interface # check and update task and instance_vms database @@ -388,8 +386,7 @@ class vim_thread(threading.Thread): temp_dict = {"status": vim_info["status"], "error_msg": vim_info_error_msg} if vim_info.get("vim_info"): temp_dict["vim_info"] = vim_info["vim_info"] - with self.db_lock: - self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]}) + self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]}) task["extra"]["vim_status"] = vim_info["status"] task["error_msg"] = vim_info_error_msg if vim_info.get("vim_info"): @@ -397,13 +394,12 @@ class vim_thread(threading.Thread): task_need_update = True if task_need_update: - with self.db_lock: - self.db.update_rows( - 'vim_wim_actions', - UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256), - "error_msg": task.get("error_msg"), "modified_at": now}, - WHERE={'instance_action_id': task['instance_action_id'], - 'task_index': task['task_index']}) + self.db.update_rows( + 'vim_wim_actions', + UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256), + "error_msg": task.get("error_msg"), "modified_at": now}, + WHERE={'instance_action_id': task['instance_action_id'], + 'task_index': task['task_index']}) if task["extra"].get("vim_status") == "BUILD": self._insert_refresh(task, now + self.REFRESH_BUILD) else: @@ -466,14 +462,13 @@ class vim_thread(threading.Thread): temp_dict = {"status": vim_info_status, "error_msg": vim_info_error_msg} if vim_info.get("vim_info"): temp_dict["vim_info"] = vim_info["vim_info"] - with self.db_lock: - self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]}) - self.db.update_rows( - 'vim_wim_actions', - UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256), - "error_msg": task.get("error_msg"), "modified_at": now}, - WHERE={'instance_action_id': task['instance_action_id'], - 'task_index': task['task_index']}) + self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]}) + self.db.update_rows( + 'vim_wim_actions', + UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256), + "error_msg": task.get("error_msg"), "modified_at": now}, + WHERE={'instance_action_id': task['instance_action_id'], + 'task_index': task['task_index']}) if task["extra"].get("vim_status") == "BUILD": self._insert_refresh(task, now + self.REFRESH_BUILD) else: @@ -648,23 +643,22 @@ class vim_thread(threading.Thread): task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"])) try: now = time.time() - with self.db_lock: + self.db.update_rows( + table="vim_wim_actions", + UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now, + "error_msg": task["error_msg"], + "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)}, + WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]}) + if result is not None: self.db.update_rows( - table="vim_wim_actions", - UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now, - "error_msg": task["error_msg"], - "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)}, - WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]}) - if result is not None: - self.db.update_rows( - table="instance_actions", - UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1}, - "modified_at": now}, - WHERE={"uuid": task["instance_action_id"]}) - if database_update: - self.db.update_rows(table=task["item"], - UPDATE=database_update, - WHERE={"uuid": task["item_id"]}) + table="instance_actions", + UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1}, + "modified_at": now}, + WHERE={"uuid": task["instance_action_id"]}) + if database_update: + self.db.update_rows(table=task["item"], + UPDATE=database_update, + WHERE={"uuid": task["item_id"]}) except db_base_Exception as e: self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True) @@ -816,9 +810,8 @@ class vim_thread(threading.Thread): if ins_action_id: instance_action_id = ins_action_id - with self.db_lock: - tasks = self.db.get_rows(FROM="vim_wim_actions", WHERE={"instance_action_id": instance_action_id, - "task_index": task_index}) + tasks = self.db.get_rows(FROM="vim_wim_actions", WHERE={"instance_action_id": instance_action_id, + "task_index": task_index}) if not tasks: return None task = tasks[0] @@ -868,11 +861,10 @@ class vim_thread(threading.Thread): task_interfaces = {} for iface in params_copy[5]: task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]} - with self.db_lock: - result = self.db.get_rows( - SELECT=('sdn_net_id', 'interface_id'), - FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid', - WHERE={'ii.uuid': iface["uuid"]}) + result = self.db.get_rows( + SELECT=('sdn_net_id', 'interface_id'), + FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid', + WHERE={'ii.uuid': iface["uuid"]}) if result: task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id'] task_interfaces[iface["vim_id"]]["interface_id"] = result[0]['interface_id'] @@ -946,11 +938,10 @@ class vim_thread(threading.Thread): # Discover if this network is managed by a sdn controller sdn_net_id = None - with self.db_lock: - result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets', - WHERE={'vim_net_id': vim_net_id, - 'datacenter_tenant_id': self.datacenter_tenant_id}, - ORDER="instance_scenario_id") + result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets', + WHERE={'vim_net_id': vim_net_id, + 'datacenter_tenant_id': self.datacenter_tenant_id}, + ORDER="instance_scenario_id") if result: sdn_net_id = result[0]['sdn_net_id'] @@ -1035,10 +1026,12 @@ class vim_thread(threading.Thread): "name": sdn_port_name, } try: - sdn_external_port_id = self.ovim.new_external_port(sdn_port_data) + with self.db_lock: + sdn_external_port_id = self.ovim.new_external_port(sdn_port_data) except ovimException: sdn_port_data["compute_node"] = "__WIM" - sdn_external_port_id = self.ovim.new_external_port(sdn_port_data) + with self.db_lock: + sdn_external_port_id = self.ovim.new_external_port(sdn_port_data) self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id, sdn_net_id)) diff --git a/osm_ro/wim/persistence.py b/osm_ro/wim/persistence.py index b956965b..2e72640b 100644 --- a/osm_ro/wim/persistence.py +++ b/osm_ro/wim/persistence.py @@ -45,7 +45,6 @@ from hashlib import sha1 from itertools import groupby from operator import itemgetter from sys import exc_info -from threading import Lock from time import time from uuid import uuid1 as generate_uuid @@ -148,10 +147,9 @@ be connected to two different places using the same port) class WimPersistence(object): """High level interactions with the WIM tables in the database""" - def __init__(self, db, logger=None, lock=None): + def __init__(self, db, logger=None): self.db = db self.logger = logger or logging.getLogger('openmano.wim.persistence') - self.lock = lock or Lock() def query(self, FROM=None, @@ -220,8 +218,7 @@ class WimPersistence(object): 'SELECT': SELECT, 'FROM': FROM, 'WHERE': WHERE, 'LIMIT': LIMIT, 'ORDER_BY': ORDER_BY}) - with self.lock: - records = self.db.get_rows(**query) + records = self.db.get_rows(**query) table = FROM.split()[0] if error_if_none and not records: @@ -312,9 +309,8 @@ class WimPersistence(object): if "config" in wim_descriptor: wim_descriptor["config"] = _serialize(wim_descriptor["config"]) - with self.lock: - return self.db.new_row( - "wims", wim_descriptor, add_uuid=True, confidential_data=True) + return self.db.new_row( + "wims", wim_descriptor, add_uuid=True, confidential_data=True) def update_wim(self, uuid_or_name, wim_descriptor): """Change an existing WIM record on the database""" @@ -332,8 +328,7 @@ class WimPersistence(object): wim_descriptor['config'] = ( _serialize(config_dict) if config_dict else None) - with self.lock: - self.db.update_rows('wims', wim_descriptor, where) + self.db.update_rows('wims', wim_descriptor, where) return wim_id @@ -341,8 +336,7 @@ class WimPersistence(object): # get nfvo_tenant info wim = self.get_by_name_or_uuid('wims', wim) - with self.lock: - self.db.delete_row_by_id('wims', wim['uuid']) + self.db.delete_row_by_id('wims', wim['uuid']) return wim['uuid'] + ' ' + wim['name'] @@ -388,8 +382,7 @@ class WimPersistence(object): ``wim_nfvo_tenants`` """ try: - with self.lock: - yield + yield except DbBaseException as db_exception: error_msg = str(db_exception) if all([msg in error_msg @@ -478,10 +471,8 @@ class WimPersistence(object): new_config = merge_dicts(old_config, properties['config']) updates['config'] = _serialize(new_config) - with self.lock: - num_changes = self.db.update_rows( - 'wim_accounts', UPDATE=updates, - WHERE={'uuid': wim_account['uuid']}) + num_changes = self.db.update_rows('wim_accounts', UPDATE=updates, + WHERE={'uuid': wim_account['uuid']}) if num_changes is None: raise UnexpectedDatabaseError('Impossible to update wim_account ' @@ -494,8 +485,7 @@ class WimPersistence(object): # Since we have foreign keys configured with ON CASCADE, we can rely # on the database engine to guarantee consistency, deleting the # dependant records - with self.lock: - return self.db.delete_row_by_id('wim_accounts', uuid) + return self.db.delete_row_by_id('wim_accounts', uuid) def get_datacenters_by(self, datacenter=None, tenant=None, **kwargs): """Retrieve datacenter information from the database together @@ -530,9 +520,8 @@ class WimPersistence(object): properties['wan_service_mapping_info'] = _serialize(info) try: - with self.lock: - self.db.new_row('wim_port_mappings', properties, - add_uuid=False, confidential_data=True) + self.db.new_row('wim_port_mappings', properties, + add_uuid=False, confidential_data=True) except DbBaseException as old_exception: self.logger.exception(old_exception) ex = InvalidParameters( @@ -634,9 +623,7 @@ class WimPersistence(object): ] def delete_wim_port_mappings(self, wim_id): - with self.lock: - self.db.delete_row(FROM='wim_port_mappings', - WHERE={"wim_id": wim_id}) + self.db.delete_row(FROM='wim_port_mappings', WHERE={"wim_id": wim_id}) return "port mapping for wim {} deleted.".format(wim_id) def update_wim_port_mapping(self, id, properties): @@ -650,9 +637,8 @@ class WimPersistence(object): merge_dicts(original, remove_none_items(properties), wan_service_mapping_info=mapping_info)) - with self.lock: - num_changes = self.db.update_rows( - 'wim_port_mappings', UPDATE=updates, WHERE={'id': id}) + num_changes = self.db.update_rows('wim_port_mappings', + UPDATE=updates, WHERE={'id': id}) if num_changes is None: raise UnexpectedDatabaseError( @@ -699,9 +685,8 @@ class WimPersistence(object): ) join = 'vim_wim_actions NATURAL JOIN ({}) AS items'.format(items) - with self.lock: - db_results = self.db.get_rows( - FROM=join, ORDER_BY=('item', 'item_id', 'created_at')) + db_results = self.db.get_rows( + FROM=join, ORDER_BY=('item', 'item_id', 'created_at')) results = (_postprocess_action(r) for r in db_results) criteria = itemgetter('item', 'item_id') @@ -710,7 +695,13 @@ class WimPersistence(object): def update_action(self, instance_action_id, task_index, properties): condition = {'instance_action_id': instance_action_id, 'task_index': task_index} - action = self.query_one('vim_wim_actions', WHERE=condition) + try: + action = self.query_one('vim_wim_actions', WHERE=condition) + except: + actions = self.query('vim_wim_actions', WHERE=condition) + self.logger.error('More then one action found:\n%s', + json.dumps(actions, indent=4)) + action = actions[0] extra = remove_none_items(merge_dicts( action.get('extra') or {}, @@ -719,9 +710,8 @@ class WimPersistence(object): updates = preprocess_record( merge_dicts(action, properties, extra=extra)) - with self.lock: - num_changes = self.db.update_rows('vim_wim_actions', - UPDATE=updates, WHERE=condition) + num_changes = self.db.update_rows('vim_wim_actions', + UPDATE=updates, WHERE=condition) if num_changes is None: raise UnexpectedDatabaseError( @@ -758,10 +748,9 @@ class WimPersistence(object): merge_dicts(wan_link, properties, wim_info=wim_info)) self.logger.debug({'UPDATE': updates}) - with self.lock: - num_changes = self.db.update_rows( - 'instance_wim_nets', UPDATE=updates, - WHERE={'uuid': wan_link['uuid']}) + num_changes = self.db.update_rows( + 'instance_wim_nets', UPDATE=updates, + WHERE={'uuid': wan_link['uuid']}) if num_changes is None: raise UnexpectedDatabaseError( @@ -793,9 +782,8 @@ class WimPersistence(object): if not changes: return 0 - with self.lock: - return self.db.update_rows('instance_actions', - WHERE={'uuid': uuid}, UPDATE=changes) + return self.db.update_rows('instance_actions', + WHERE={'uuid': uuid}, UPDATE=changes) def get_only_vm_with_external_net(self, instance_net_id, **kwargs): """Return an instance VM if that is the only VM connected to an @@ -820,6 +808,9 @@ class WimPersistence(object): """Return a SQL safe string""" return self.db.escape_string(string) + def reconnect(self): + self.db.reconnect() + def _generate_port_mapping_id(self, mapping_info): """Given a port mapping represented by a dict with a 'type' field, generate a unique string, in a injective way. diff --git a/osm_ro/wim/schemas.py b/osm_ro/wim/schemas.py index fb65fdd3..6692ac59 100644 --- a/osm_ro/wim/schemas.py +++ b/osm_ro/wim/schemas.py @@ -83,7 +83,7 @@ wim_port_mapping_desc = { "required": ["mapping_type"] } }, - "oneOf": [ + "anyOf": [ { "required": [ "pop_switch_dpid", -- 2.17.1