X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FRO.git;a=blobdiff_plain;f=osm_ro%2Fwim%2Fpersistence.py;h=636676a4cb112de778da96439ff6e55d89d6396c;hp=8a74d491b5fb8f4daaaf6e92281486a2bc80f8b5;hb=433a63deecc246609abe0b070e6bf75826d03ef2;hpb=f2813e8e41b842c0f7ad2a2fc95a1195e18cded8 diff --git a/osm_ro/wim/persistence.py b/osm_ro/wim/persistence.py index 8a74d491..636676a4 100644 --- a/osm_ro/wim/persistence.py +++ b/osm_ro/wim/persistence.py @@ -45,8 +45,7 @@ from hashlib import sha1 from itertools import groupby from operator import itemgetter from sys import exc_info -from threading import Lock -from time import time +# from time import time from uuid import uuid1 as generate_uuid from six import reraise @@ -148,10 +147,9 @@ be connected to two different places using the same port) class WimPersistence(object): """High level interactions with the WIM tables in the database""" - def __init__(self, db, logger=None, lock=None): + def __init__(self, db, logger=None): self.db = db self.logger = logger or logging.getLogger('openmano.wim.persistence') - self.lock = lock or Lock() def query(self, FROM=None, @@ -220,8 +218,7 @@ class WimPersistence(object): 'SELECT': SELECT, 'FROM': FROM, 'WHERE': WHERE, 'LIMIT': LIMIT, 'ORDER_BY': ORDER_BY}) - with self.lock: - records = self.db.get_rows(**query) + records = self.db.get_rows(**query) table = FROM.split()[0] if error_if_none and not records: @@ -312,9 +309,8 @@ class WimPersistence(object): if "config" in wim_descriptor: wim_descriptor["config"] = _serialize(wim_descriptor["config"]) - with self.lock: - return self.db.new_row( - "wims", wim_descriptor, add_uuid=True, confidential_data=True) + return self.db.new_row( + "wims", wim_descriptor, add_uuid=True, confidential_data=True) def update_wim(self, uuid_or_name, wim_descriptor): """Change an existing WIM record on the database""" @@ -326,15 +322,13 @@ class WimPersistence(object): where = {'uuid': wim['uuid']} # unserialize config, edit and serialize it again - if wim_descriptor.get('config'): - new_config_dict = wim_descriptor["config"] - config_dict = remove_none_items(merge_dicts( - wim.get('config') or {}, new_config_dict)) - wim_descriptor['config'] = ( - _serialize(config_dict) if config_dict else None) + new_config_dict = wim_descriptor.get('config', {}) or {} + config_dict = remove_none_items(merge_dicts( + wim.get('config', {}) or {}, new_config_dict)) + wim_descriptor['config'] = ( + _serialize(config_dict) if config_dict else None) - with self.lock: - self.db.update_rows('wims', wim_descriptor, where) + self.db.update_rows('wims', wim_descriptor, where) return wim_id @@ -342,8 +336,7 @@ class WimPersistence(object): # get nfvo_tenant info wim = self.get_by_name_or_uuid('wims', wim) - with self.lock: - self.db.delete_row_by_id('wims', wim['uuid']) + self.db.delete_row_by_id('wims', wim['uuid']) return wim['uuid'] + ' ' + wim['name'] @@ -363,10 +356,10 @@ class WimPersistence(object): kwargs.setdefault('WHERE', {'wim_account.uuid': uuid}) return self.query(FROM=_WIM_ACCOUNT_JOIN, **kwargs) - def get_wim_account_by(self, wim=None, tenant=None, **kwargs): + def get_wim_account_by(self, wim=None, tenant=None, uuid=None, **kwargs): """Similar to ``get_wim_accounts_by``, but ensuring just one result""" kwargs.setdefault('error_if_multiple', True) - return self.get_wim_accounts_by(wim, tenant, **kwargs)[0] + return self.get_wim_accounts_by(wim, tenant, uuid, **kwargs)[0] def get_wim_accounts(self, **kwargs): """Retrieve all the accounts from the database""" @@ -389,8 +382,7 @@ class WimPersistence(object): ``wim_nfvo_tenants`` """ try: - with self.lock: - yield + yield except DbBaseException as db_exception: error_msg = str(db_exception) if all([msg in error_msg @@ -479,10 +471,8 @@ class WimPersistence(object): new_config = merge_dicts(old_config, properties['config']) updates['config'] = _serialize(new_config) - with self.lock: - num_changes = self.db.update_rows( - 'wim_accounts', UPDATE=updates, - WHERE={'uuid': wim_account['uuid']}) + num_changes = self.db.update_rows('wim_accounts', UPDATE=updates, + WHERE={'uuid': wim_account['uuid']}) if num_changes is None: raise UnexpectedDatabaseError('Impossible to update wim_account ' @@ -495,8 +485,7 @@ class WimPersistence(object): # Since we have foreign keys configured with ON CASCADE, we can rely # on the database engine to guarantee consistency, deleting the # dependant records - with self.lock: - return self.db.delete_row_by_id('wim_accounts', uuid) + return self.db.delete_row_by_id('wim_accounts', uuid) def get_datacenters_by(self, datacenter=None, tenant=None, **kwargs): """Retrieve datacenter information from the database together @@ -508,8 +497,12 @@ class WimPersistence(object): See :obj:`~.query` for additional keyword arguments. """ - kwargs.update(datacenter=datacenter, tenant=tenant) - return self.query(_DATACENTER_JOIN, **kwargs) + if tenant: + kwargs.update(datacenter=datacenter, tenant=tenant) + return self.query(_DATACENTER_JOIN, **kwargs) + else: + return [self.get_by_name_or_uuid('datacenters', + datacenter, **kwargs)] def get_datacenter_by(self, datacenter=None, tenant=None, **kwargs): """Similar to ``get_datacenters_by``, but ensuring just one result""" @@ -527,9 +520,8 @@ class WimPersistence(object): properties['wan_service_mapping_info'] = _serialize(info) try: - with self.lock: - self.db.new_row('wim_port_mappings', properties, - add_uuid=False, confidential_data=True) + self.db.new_row('wim_port_mappings', properties, + add_uuid=False, confidential_data=True) except DbBaseException as old_exception: self.logger.exception(old_exception) ex = InvalidParameters( @@ -622,7 +614,7 @@ class WimPersistence(object): return [ {'wim_id': key[0], 'datacenter_id': key[1], - 'wan_pop_port_mappings': [ + 'pop_wan_mappings': [ filter_out_dict_keys(mapping, ( 'id', 'wim_id', 'datacenter_id', 'created_at', 'modified_at')) @@ -631,9 +623,7 @@ class WimPersistence(object): ] def delete_wim_port_mappings(self, wim_id): - with self.lock: - self.db.delete_row(FROM='wim_port_mappings', - WHERE={"wim_id": wim_id}) + self.db.delete_row(FROM='wim_port_mappings', WHERE={"wim_id": wim_id}) return "port mapping for wim {} deleted.".format(wim_id) def update_wim_port_mapping(self, id, properties): @@ -647,9 +637,8 @@ class WimPersistence(object): merge_dicts(original, remove_none_items(properties), wan_service_mapping_info=mapping_info)) - with self.lock: - num_changes = self.db.update_rows( - 'wim_port_mappings', UPDATE=updates, WHERE={'id': id}) + num_changes = self.db.update_rows('wim_port_mappings', + UPDATE=updates, WHERE={'id': id}) if num_changes is None: raise UnexpectedDatabaseError( @@ -692,13 +681,11 @@ class WimPersistence(object): 'LIMIT {:d},{:d}').format( self.safe_str(wim_account_id), ','.join(type_options), - group_offset, group_limit - ) + group_offset, group_limit) join = 'vim_wim_actions NATURAL JOIN ({}) AS items'.format(items) - with self.lock: - db_results = self.db.get_rows( - FROM=join, ORDER_BY=('item', 'item_id', 'created_at')) + db_results = self.db.get_rows( + FROM=join, ORDER_BY=('item', 'item_id', 'created_at')) results = (_postprocess_action(r) for r in db_results) criteria = itemgetter('item', 'item_id') @@ -707,7 +694,13 @@ class WimPersistence(object): def update_action(self, instance_action_id, task_index, properties): condition = {'instance_action_id': instance_action_id, 'task_index': task_index} - action = self.query_one('vim_wim_actions', WHERE=condition) + try: + action = self.query_one('vim_wim_actions', WHERE=condition) + except Exception: + actions = self.query('vim_wim_actions', WHERE=condition) + self.logger.error('More then one action found:\n%s', + json.dumps(actions, indent=4)) + action = actions[0] extra = remove_none_items(merge_dicts( action.get('extra') or {}, @@ -716,9 +709,7 @@ class WimPersistence(object): updates = preprocess_record( merge_dicts(action, properties, extra=extra)) - with self.lock: - num_changes = self.db.update_rows('vim_wim_actions', - UPDATE=updates, WHERE=condition) + num_changes = self.db.update_rows('vim_wim_actions', UPDATE=updates, WHERE=condition) if num_changes is None: raise UnexpectedDatabaseError( @@ -755,10 +746,9 @@ class WimPersistence(object): merge_dicts(wan_link, properties, wim_info=wim_info)) self.logger.debug({'UPDATE': updates}) - with self.lock: - num_changes = self.db.update_rows( - 'instance_wim_nets', UPDATE=updates, - WHERE={'uuid': wan_link['uuid']}) + num_changes = self.db.update_rows( + 'instance_wim_nets', UPDATE=updates, + WHERE={'uuid': wan_link['uuid']}) if num_changes is None: raise UnexpectedDatabaseError( @@ -790,9 +780,7 @@ class WimPersistence(object): if not changes: return 0 - with self.lock: - return self.db.update_rows('instance_actions', - WHERE={'uuid': uuid}, UPDATE=changes) + return self.db.update_rows('instance_actions', WHERE={'uuid': uuid}, UPDATE=changes) def get_only_vm_with_external_net(self, instance_net_id, **kwargs): """Return an instance VM if that is the only VM connected to an @@ -817,6 +805,9 @@ class WimPersistence(object): """Return a SQL safe string""" return self.db.escape_string(string) + def reconnect(self): + self.db.reconnect() + def _generate_port_mapping_id(self, mapping_info): """Given a port mapping represented by a dict with a 'type' field, generate a unique string, in a injective way.