X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FRO.git;a=blobdiff_plain;f=osm_ro%2Fwim%2Fpersistence.py;h=636676a4cb112de778da96439ff6e55d89d6396c;hp=b956965b320b13b22b5bae327cbc73ae789e6253;hb=433a63deecc246609abe0b070e6bf75826d03ef2;hpb=fed47b0903b8e5ee93591ef53540432d3ecce796 diff --git a/osm_ro/wim/persistence.py b/osm_ro/wim/persistence.py index b956965b..636676a4 100644 --- a/osm_ro/wim/persistence.py +++ b/osm_ro/wim/persistence.py @@ -45,8 +45,7 @@ from hashlib import sha1 from itertools import groupby from operator import itemgetter from sys import exc_info -from threading import Lock -from time import time +# from time import time from uuid import uuid1 as generate_uuid from six import reraise @@ -148,10 +147,9 @@ be connected to two different places using the same port) class WimPersistence(object): """High level interactions with the WIM tables in the database""" - def __init__(self, db, logger=None, lock=None): + def __init__(self, db, logger=None): self.db = db self.logger = logger or logging.getLogger('openmano.wim.persistence') - self.lock = lock or Lock() def query(self, FROM=None, @@ -220,8 +218,7 @@ class WimPersistence(object): 'SELECT': SELECT, 'FROM': FROM, 'WHERE': WHERE, 'LIMIT': LIMIT, 'ORDER_BY': ORDER_BY}) - with self.lock: - records = self.db.get_rows(**query) + records = self.db.get_rows(**query) table = FROM.split()[0] if error_if_none and not records: @@ -312,9 +309,8 @@ class WimPersistence(object): if "config" in wim_descriptor: wim_descriptor["config"] = _serialize(wim_descriptor["config"]) - with self.lock: - return self.db.new_row( - "wims", wim_descriptor, add_uuid=True, confidential_data=True) + return self.db.new_row( + "wims", wim_descriptor, add_uuid=True, confidential_data=True) def update_wim(self, uuid_or_name, wim_descriptor): """Change an existing WIM record on the database""" @@ -332,8 +328,7 @@ class WimPersistence(object): wim_descriptor['config'] = ( _serialize(config_dict) if config_dict else None) - with self.lock: - self.db.update_rows('wims', wim_descriptor, where) + self.db.update_rows('wims', wim_descriptor, where) return wim_id @@ -341,8 +336,7 @@ class WimPersistence(object): # get nfvo_tenant info wim = self.get_by_name_or_uuid('wims', wim) - with self.lock: - self.db.delete_row_by_id('wims', wim['uuid']) + self.db.delete_row_by_id('wims', wim['uuid']) return wim['uuid'] + ' ' + wim['name'] @@ -388,8 +382,7 @@ class WimPersistence(object): ``wim_nfvo_tenants`` """ try: - with self.lock: - yield + yield except DbBaseException as db_exception: error_msg = str(db_exception) if all([msg in error_msg @@ -478,10 +471,8 @@ class WimPersistence(object): new_config = merge_dicts(old_config, properties['config']) updates['config'] = _serialize(new_config) - with self.lock: - num_changes = self.db.update_rows( - 'wim_accounts', UPDATE=updates, - WHERE={'uuid': wim_account['uuid']}) + num_changes = self.db.update_rows('wim_accounts', UPDATE=updates, + WHERE={'uuid': wim_account['uuid']}) if num_changes is None: raise UnexpectedDatabaseError('Impossible to update wim_account ' @@ -494,8 +485,7 @@ class WimPersistence(object): # Since we have foreign keys configured with ON CASCADE, we can rely # on the database engine to guarantee consistency, deleting the # dependant records - with self.lock: - return self.db.delete_row_by_id('wim_accounts', uuid) + return self.db.delete_row_by_id('wim_accounts', uuid) def get_datacenters_by(self, datacenter=None, tenant=None, **kwargs): """Retrieve datacenter information from the database together @@ -530,9 +520,8 @@ class WimPersistence(object): properties['wan_service_mapping_info'] = _serialize(info) try: - with self.lock: - self.db.new_row('wim_port_mappings', properties, - add_uuid=False, confidential_data=True) + self.db.new_row('wim_port_mappings', properties, + add_uuid=False, confidential_data=True) except DbBaseException as old_exception: self.logger.exception(old_exception) ex = InvalidParameters( @@ -634,9 +623,7 @@ class WimPersistence(object): ] def delete_wim_port_mappings(self, wim_id): - with self.lock: - self.db.delete_row(FROM='wim_port_mappings', - WHERE={"wim_id": wim_id}) + self.db.delete_row(FROM='wim_port_mappings', WHERE={"wim_id": wim_id}) return "port mapping for wim {} deleted.".format(wim_id) def update_wim_port_mapping(self, id, properties): @@ -650,9 +637,8 @@ class WimPersistence(object): merge_dicts(original, remove_none_items(properties), wan_service_mapping_info=mapping_info)) - with self.lock: - num_changes = self.db.update_rows( - 'wim_port_mappings', UPDATE=updates, WHERE={'id': id}) + num_changes = self.db.update_rows('wim_port_mappings', + UPDATE=updates, WHERE={'id': id}) if num_changes is None: raise UnexpectedDatabaseError( @@ -695,13 +681,11 @@ class WimPersistence(object): 'LIMIT {:d},{:d}').format( self.safe_str(wim_account_id), ','.join(type_options), - group_offset, group_limit - ) + group_offset, group_limit) join = 'vim_wim_actions NATURAL JOIN ({}) AS items'.format(items) - with self.lock: - db_results = self.db.get_rows( - FROM=join, ORDER_BY=('item', 'item_id', 'created_at')) + db_results = self.db.get_rows( + FROM=join, ORDER_BY=('item', 'item_id', 'created_at')) results = (_postprocess_action(r) for r in db_results) criteria = itemgetter('item', 'item_id') @@ -710,7 +694,13 @@ class WimPersistence(object): def update_action(self, instance_action_id, task_index, properties): condition = {'instance_action_id': instance_action_id, 'task_index': task_index} - action = self.query_one('vim_wim_actions', WHERE=condition) + try: + action = self.query_one('vim_wim_actions', WHERE=condition) + except Exception: + actions = self.query('vim_wim_actions', WHERE=condition) + self.logger.error('More then one action found:\n%s', + json.dumps(actions, indent=4)) + action = actions[0] extra = remove_none_items(merge_dicts( action.get('extra') or {}, @@ -719,9 +709,7 @@ class WimPersistence(object): updates = preprocess_record( merge_dicts(action, properties, extra=extra)) - with self.lock: - num_changes = self.db.update_rows('vim_wim_actions', - UPDATE=updates, WHERE=condition) + num_changes = self.db.update_rows('vim_wim_actions', UPDATE=updates, WHERE=condition) if num_changes is None: raise UnexpectedDatabaseError( @@ -758,10 +746,9 @@ class WimPersistence(object): merge_dicts(wan_link, properties, wim_info=wim_info)) self.logger.debug({'UPDATE': updates}) - with self.lock: - num_changes = self.db.update_rows( - 'instance_wim_nets', UPDATE=updates, - WHERE={'uuid': wan_link['uuid']}) + num_changes = self.db.update_rows( + 'instance_wim_nets', UPDATE=updates, + WHERE={'uuid': wan_link['uuid']}) if num_changes is None: raise UnexpectedDatabaseError( @@ -793,9 +780,7 @@ class WimPersistence(object): if not changes: return 0 - with self.lock: - return self.db.update_rows('instance_actions', - WHERE={'uuid': uuid}, UPDATE=changes) + return self.db.update_rows('instance_actions', WHERE={'uuid': uuid}, UPDATE=changes) def get_only_vm_with_external_net(self, instance_net_id, **kwargs): """Return an instance VM if that is the only VM connected to an @@ -820,6 +805,9 @@ class WimPersistence(object): """Return a SQL safe string""" return self.db.escape_string(string) + def reconnect(self): + self.db.reconnect() + def _generate_port_mapping_id(self, mapping_info): """Given a port mapping represented by a dict with a 'type' field, generate a unique string, in a injective way.