Fix race conditions and improve MySQL reconnection
This commit aims to provide better synchronization between all the
different threads in RO, especially regarding DB usage and internal state
consistency.
The following improvements were done:
1. Centralize database retry logic into a single function
This way the retry procedure and its rules can be changed in a single
place, and the change is reflected in every function that uses it,
avoiding manual copy-and-paste (and the risk of forgetting to update
one of the copies).
2. Minor fixes/improvements related to database connection loss.
Previously `db_base` was already able to detect when the connection
to MySQL was lost, but in a few edge cases the automatic reconnection
apparently did not happen.
3. Implement a transaction method
This method replaces the old context manager API of the connection
object, which was removed from MySQLdb in version 1.4.
In addition, transactions can also be used as a decorator
(not only as a context manager), which is sometimes handy.
4. Add lock mechanism directly to db_base
This helps to improve synchronization between threads.
Some extra synchronization was also added to functions where it
seemed to be needed.
Moreover, the cursor object was previously part of the internal state
of the db_base object, and it was changed/used without thread
synchronization (error-prone). Wrapping the changes to the class's
cursor property in the lock avoids these problems.
5. Add option to fork connection
Useful when independent threading is needed (as long as different
threads don't access the same database table, having separate
connections and locks should work fine).
Change-Id: I3ab34df5e8c2857d96ed14a70e7f65bd0b5189a0
Signed-off-by: Anderson Bravalheri <a.bravalheri@bristol.ac.uk>
diff --git a/osm_ro/vim_thread.py b/osm_ro/vim_thread.py
index d5301f1..8002436 100644
--- a/osm_ro/vim_thread.py
+++ b/osm_ro/vim_thread.py
@@ -149,8 +149,7 @@
'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
'user', 'passwd', 'dt.config as dt_config')
where_ = {"dt.uuid": self.datacenter_tenant_id}
- with self.db_lock:
- vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_)
+ vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_)
vim = vims[0]
vim_config = {}
if vim["config"]:
@@ -161,8 +160,9 @@
vim_config['datacenter_id'] = vim.get('datacenter_id')
# get port_mapping
- vim_config["wim_external_ports"] = self.ovim.get_of_port_mappings(
- db_filter={"region": vim_config['datacenter_id'], "pci": None})
+ with self.db_lock:
+ vim_config["wim_external_ports"] = self.ovim.get_of_port_mappings(
+ db_filter={"region": vim_config['datacenter_id'], "pci": None})
self.vim = vim_module[vim["type"]].vimconnector(
uuid=vim['datacenter_id'], name=vim['datacenter_name'],
@@ -193,12 +193,11 @@
database_limit = 200
while True:
# get 200 (database_limit) entries each time
- with self.db_lock:
- vim_actions = self.db.get_rows(FROM="vim_wim_actions",
- WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
- "item_id>=": old_item_id},
- ORDER_BY=("item_id", "item", "created_at",),
- LIMIT=database_limit)
+ vim_actions = self.db.get_rows(FROM="vim_wim_actions",
+ WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
+ "item_id>=": old_item_id},
+ ORDER_BY=("item_id", "item", "created_at",),
+ LIMIT=database_limit)
for task in vim_actions:
item = task["item"]
item_id = task["item_id"]
@@ -342,12 +341,12 @@
with self.db_lock:
sdn_port_id = self.ovim.new_external_port(
{"compute_node": interface["compute_node"],
- "pci": interface["pci"],
- "vlan": interface.get("vlan"),
- "net_id": sdn_net_id,
- "region": self.vim["config"]["datacenter_id"],
- "name": sdn_port_name,
- "mac": interface.get("mac_address")})
+ "pci": interface["pci"],
+ "vlan": interface.get("vlan"),
+ "net_id": sdn_net_id,
+ "region": self.vim["config"]["datacenter_id"],
+ "name": sdn_port_name,
+ "mac": interface.get("mac_address")})
task_interface["sdn_port_id"] = sdn_port_id
task_need_update = True
except (ovimException, Exception) as e:
@@ -360,18 +359,17 @@
task_warning_msg += error_text
# TODO Set error_msg at instance_nets instead of instance VMs
- with self.db_lock:
- self.db.update_rows(
- 'instance_interfaces',
- UPDATE={"mac_address": interface.get("mac_address"),
- "ip_address": interface.get("ip_address"),
- "vim_interface_id": interface.get("vim_interface_id"),
- "vim_info": interface.get("vim_info"),
- "sdn_port_id": task_interface.get("sdn_port_id"),
- "compute_node": interface.get("compute_node"),
- "pci": interface.get("pci"),
- "vlan": interface.get("vlan")},
- WHERE={'uuid': task_interface["iface_id"]})
+ self.db.update_rows(
+ 'instance_interfaces',
+ UPDATE={"mac_address": interface.get("mac_address"),
+ "ip_address": interface.get("ip_address"),
+ "vim_interface_id": interface.get("vim_interface_id"),
+ "vim_info": interface.get("vim_info"),
+ "sdn_port_id": task_interface.get("sdn_port_id"),
+ "compute_node": interface.get("compute_node"),
+ "pci": interface.get("pci"),
+ "vlan": interface.get("vlan")},
+ WHERE={'uuid': task_interface["iface_id"]})
task["vim_interfaces"][vim_interface_id] = interface
# check and update task and instance_vms database
@@ -388,8 +386,7 @@
temp_dict = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
if vim_info.get("vim_info"):
temp_dict["vim_info"] = vim_info["vim_info"]
- with self.db_lock:
- self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
+ self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
task["extra"]["vim_status"] = vim_info["status"]
task["error_msg"] = vim_info_error_msg
if vim_info.get("vim_info"):
@@ -397,13 +394,12 @@
task_need_update = True
if task_need_update:
- with self.db_lock:
- self.db.update_rows(
- 'vim_wim_actions',
- UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
- "error_msg": task.get("error_msg"), "modified_at": now},
- WHERE={'instance_action_id': task['instance_action_id'],
- 'task_index': task['task_index']})
+ self.db.update_rows(
+ 'vim_wim_actions',
+ UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
+ "error_msg": task.get("error_msg"), "modified_at": now},
+ WHERE={'instance_action_id': task['instance_action_id'],
+ 'task_index': task['task_index']})
if task["extra"].get("vim_status") == "BUILD":
self._insert_refresh(task, now + self.REFRESH_BUILD)
else:
@@ -466,14 +462,13 @@
temp_dict = {"status": vim_info_status, "error_msg": vim_info_error_msg}
if vim_info.get("vim_info"):
temp_dict["vim_info"] = vim_info["vim_info"]
- with self.db_lock:
- self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
- self.db.update_rows(
- 'vim_wim_actions',
- UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
- "error_msg": task.get("error_msg"), "modified_at": now},
- WHERE={'instance_action_id': task['instance_action_id'],
- 'task_index': task['task_index']})
+ self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
+ self.db.update_rows(
+ 'vim_wim_actions',
+ UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
+ "error_msg": task.get("error_msg"), "modified_at": now},
+ WHERE={'instance_action_id': task['instance_action_id'],
+ 'task_index': task['task_index']})
if task["extra"].get("vim_status") == "BUILD":
self._insert_refresh(task, now + self.REFRESH_BUILD)
else:
@@ -648,23 +643,22 @@
task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
try:
now = time.time()
- with self.db_lock:
+ self.db.update_rows(
+ table="vim_wim_actions",
+ UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now,
+ "error_msg": task["error_msg"],
+ "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
+ WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
+ if result is not None:
self.db.update_rows(
- table="vim_wim_actions",
- UPDATE={"status": task["status"], "vim_id": task.get("vim_id"), "modified_at": now,
- "error_msg": task["error_msg"],
- "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
- WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
- if result is not None:
- self.db.update_rows(
- table="instance_actions",
- UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
- "modified_at": now},
- WHERE={"uuid": task["instance_action_id"]})
- if database_update:
- self.db.update_rows(table=task["item"],
- UPDATE=database_update,
- WHERE={"uuid": task["item_id"]})
+ table="instance_actions",
+ UPDATE={("number_done" if result else "number_failed"): {"INCREMENT": 1},
+ "modified_at": now},
+ WHERE={"uuid": task["instance_action_id"]})
+ if database_update:
+ self.db.update_rows(table=task["item"],
+ UPDATE=database_update,
+ WHERE={"uuid": task["item_id"]})
except db_base_Exception as e:
self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)
@@ -816,9 +810,8 @@
if ins_action_id:
instance_action_id = ins_action_id
- with self.db_lock:
- tasks = self.db.get_rows(FROM="vim_wim_actions", WHERE={"instance_action_id": instance_action_id,
- "task_index": task_index})
+ tasks = self.db.get_rows(FROM="vim_wim_actions", WHERE={"instance_action_id": instance_action_id,
+ "task_index": task_index})
if not tasks:
return None
task = tasks[0]
@@ -868,11 +861,10 @@
task_interfaces = {}
for iface in params_copy[5]:
task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
- with self.db_lock:
- result = self.db.get_rows(
- SELECT=('sdn_net_id', 'interface_id'),
- FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
- WHERE={'ii.uuid': iface["uuid"]})
+ result = self.db.get_rows(
+ SELECT=('sdn_net_id', 'interface_id'),
+ FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
+ WHERE={'ii.uuid': iface["uuid"]})
if result:
task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
task_interfaces[iface["vim_id"]]["interface_id"] = result[0]['interface_id']
@@ -946,11 +938,10 @@
# Discover if this network is managed by a sdn controller
sdn_net_id = None
- with self.db_lock:
- result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
- WHERE={'vim_net_id': vim_net_id,
- 'datacenter_tenant_id': self.datacenter_tenant_id},
- ORDER="instance_scenario_id")
+ result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
+ WHERE={'vim_net_id': vim_net_id,
+ 'datacenter_tenant_id': self.datacenter_tenant_id},
+ ORDER="instance_scenario_id")
if result:
sdn_net_id = result[0]['sdn_net_id']
@@ -1035,10 +1026,12 @@
"name": sdn_port_name,
}
try:
- sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
+ with self.db_lock:
+ sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
except ovimException:
sdn_port_data["compute_node"] = "__WIM"
- sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
+ with self.db_lock:
+ sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id,
sdn_net_id))