nb_processed += 1
self.refresh_tasks.pop(0)
if task["item"] == 'instance_vms':
- vm_to_refresh_list.append(task["vim_id"])
- vm_to_refresh_dict[task["vim_id"]] = task
+ if task["vim_id"] not in vm_to_refresh_dict:
+ vm_to_refresh_dict[task["vim_id"]] = [task]
+ vm_to_refresh_list.append(task["vim_id"])
+ else:
+ vm_to_refresh_dict[task["vim_id"]].append(task)
elif task["item"] == 'instance_nets':
- net_to_refresh_list.append(task["vim_id"])
- net_to_refresh_dict[task["vim_id"]] = task
+ if task["vim_id"] not in net_to_refresh_dict:
+ net_to_refresh_dict[task["vim_id"]] = [task]
+ net_to_refresh_list.append(task["vim_id"])
+ else:
+ net_to_refresh_dict[task["vim_id"]].append(task)
else:
task_id = task["instance_action_id"] + "." + str(task["task_index"])
self.logger.critical("task={}: unknown task {}".format(task_id, task["item"]), exc_info=True)
vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)}
for vim_id, vim_info in vim_dict.items():
- # look for task
- task_need_update = False
- task = vm_to_refresh_dict[vim_id]
- task_id = task["instance_action_id"] + "." + str(task["task_index"])
- self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))
-
- # check and update interfaces
- task_warning_msg = ""
- for interface in vim_info.get("interfaces", ()):
- vim_interface_id = interface["vim_interface_id"]
- if vim_interface_id not in task["extra"]["interfaces"]:
- self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
- task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
- continue
- task_interface = task["extra"]["interfaces"][vim_interface_id]
- task_vim_interface = task["vim_interfaces"].get(vim_interface_id)
- if task_vim_interface != interface:
- # delete old port
- if task_interface.get("sdn_port_id"):
- try:
- with self.db_lock:
- self.ovim.delete_port(task_interface["sdn_port_id"])
- task_interface["sdn_port_id"] = None
- task_need_update = True
- except ovimException as e:
- error_text = "ovimException deleting external_port={}: {}".format(
- task_interface["sdn_port_id"], e)
- self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
- task_warning_msg += error_text
- # TODO Set error_msg at instance_nets instead of instance VMs
-
- # Create SDN port
- sdn_net_id = task_interface.get("sdn_net_id")
- if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
- sdn_port_name = sdn_net_id + "." + task["vim_id"]
- sdn_port_name = sdn_port_name[:63]
- try:
- with self.db_lock:
- sdn_port_id = self.ovim.new_external_port(
- {"compute_node": interface["compute_node"],
- "pci": interface["pci"],
- "vlan": interface.get("vlan"),
- "net_id": sdn_net_id,
- "region": self.vim["config"]["datacenter_id"],
- "name": sdn_port_name,
- "mac": interface.get("mac_address")})
- task_interface["sdn_port_id"] = sdn_port_id
- task_need_update = True
- except (ovimException, Exception) as e:
- error_text = "ovimException creating new_external_port compute_node={}"\
- " pci={} vlan={} {}".format(
- interface["compute_node"],
- interface["pci"],
- interface.get("vlan"), e)
- self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
- task_warning_msg += error_text
- # TODO Set error_msg at instance_nets instead of instance VMs
+ # look for task
+ for task in vm_to_refresh_dict[vim_id]:
+ task_need_update = False
+ task_id = task["instance_action_id"] + "." + str(task["task_index"])
+ self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))
+
+ # check and update interfaces
+ task_warning_msg = ""
+ for interface in vim_info.get("interfaces", ()):
+ vim_interface_id = interface["vim_interface_id"]
+ if vim_interface_id not in task["extra"]["interfaces"]:
+ self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
+ task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
+ continue
+ task_interface = task["extra"]["interfaces"][vim_interface_id]
+ task_vim_interface = task["vim_interfaces"].get(vim_interface_id)
+ if task_vim_interface != interface:
+ # delete old port
+ if task_interface.get("sdn_port_id"):
+ try:
+ with self.db_lock:
+ self.ovim.delete_port(task_interface["sdn_port_id"])
+ task_interface["sdn_port_id"] = None
+ task_need_update = True
+ except ovimException as e:
+ error_text = "ovimException deleting external_port={}: {}".format(
+ task_interface["sdn_port_id"], e)
+ self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
+ task_warning_msg += error_text
+ # TODO Set error_msg at instance_nets instead of instance VMs
+
+ # Create SDN port
+ sdn_net_id = task_interface.get("sdn_net_id")
+ if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
+ sdn_port_name = sdn_net_id + "." + task["vim_id"]
+ sdn_port_name = sdn_port_name[:63]
+ try:
+ with self.db_lock:
+ sdn_port_id = self.ovim.new_external_port(
+ {"compute_node": interface["compute_node"],
+ "pci": interface["pci"],
+ "vlan": interface.get("vlan"),
+ "net_id": sdn_net_id,
+ "region": self.vim["config"]["datacenter_id"],
+ "name": sdn_port_name,
+ "mac": interface.get("mac_address")})
+ task_interface["sdn_port_id"] = sdn_port_id
+ task_need_update = True
+ except (ovimException, Exception) as e:
+ error_text = "ovimException creating new_external_port compute_node={}"\
+ " pci={} vlan={} {}".format(
+ interface["compute_node"],
+ interface["pci"],
+ interface.get("vlan"), e)
+ self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
+ task_warning_msg += error_text
+ # TODO Set error_msg at instance_nets instead of instance VMs
+
+ with self.db_lock:
+ self.db.update_rows(
+ 'instance_interfaces',
+ UPDATE={"mac_address": interface.get("mac_address"),
+ "ip_address": interface.get("ip_address"),
+ "vim_info": interface.get("vim_info"),
+ "sdn_port_id": task_interface.get("sdn_port_id"),
+ "compute_node": interface.get("compute_node"),
+ "pci": interface.get("pci"),
+ "vlan": interface.get("vlan")},
+ WHERE={'uuid': task_interface["iface_id"]})
+ task["vim_interfaces"][vim_interface_id] = interface
+
+ # check and update task and instance_vms database
+ vim_info_error_msg = None
+ if vim_info.get("error_msg"):
+ vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
+ elif task_warning_msg:
+ vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
+ task_vim_info = task.get("vim_info")
+ task_error_msg = task.get("error_msg")
+ task_vim_status = task["extra"].get("vim_status")
+ if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
+ (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
+ temp_dict = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
+ if vim_info.get("vim_info"):
+ temp_dict["vim_info"] = vim_info["vim_info"]
+ with self.db_lock:
+ self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
+ task["extra"]["vim_status"] = vim_info["status"]
+ task["error_msg"] = vim_info_error_msg
+ if vim_info.get("vim_info"):
+ task["vim_info"] = vim_info["vim_info"]
+ task_need_update = True
+
+ if task_need_update:
with self.db_lock:
self.db.update_rows(
- 'instance_interfaces',
- UPDATE={"mac_address": interface.get("mac_address"),
- "ip_address": interface.get("ip_address"),
- "vim_info": interface.get("vim_info"),
- "sdn_port_id": task_interface.get("sdn_port_id"),
- "compute_node": interface.get("compute_node"),
- "pci": interface.get("pci"),
- "vlan": interface.get("vlan")},
- WHERE={'uuid': task_interface["iface_id"]})
- task["vim_interfaces"][vim_interface_id] = interface
-
- # check and update task and instance_vms database
- if vim_info.get("error_msg"):
- vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
- elif task_warning_msg:
- vim_info["error_msg"] = self._format_vim_error_msg(task_warning_msg)
-
- task_vim_info = task.get("vim_info")
- task_error_msg = task.get("error_msg")
- task_vim_status = task["extra"].get("vim_status")
- if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
- (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
- temp_dict = {"status": vim_info["status"], "error_msg": vim_info.get("error_msg")}
- if vim_info.get("vim_info"):
- temp_dict["vim_info"] = vim_info["vim_info"]
- with self.db_lock:
- self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
- task["extra"]["vim_status"] = vim_info["status"]
- task["error_msg"] = vim_info.get("error_msg")
- if vim_info.get("vim_info"):
- task["vim_info"] = vim_info["vim_info"]
- task_need_update = True
-
- if task_need_update:
- with self.db_lock:
- self.db.update_rows(
- 'vim_actions',
- UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
- "error_msg": task.get("error_msg"), "modified_at": now},
- WHERE={'instance_action_id': task['instance_action_id'],
- 'task_index': task['task_index']})
- if task["extra"].get("vim_status") == "BUILD":
- self._insert_refresh(task, now + self.REFRESH_BUILD)
- else:
- self._insert_refresh(task, now + self.REFRESH_ACTIVE)
+ 'vim_actions',
+ UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
+ "error_msg": task.get("error_msg"), "modified_at": now},
+ WHERE={'instance_action_id': task['instance_action_id'],
+ 'task_index': task['task_index']})
+ if task["extra"].get("vim_status") == "BUILD":
+ self._insert_refresh(task, now + self.REFRESH_BUILD)
+ else:
+ self._insert_refresh(task, now + self.REFRESH_ACTIVE)
if net_to_refresh_list:
now = time.time()
for vim_id, vim_info in vim_dict.items():
# look for task
- task = net_to_refresh_dict[vim_id]
- task_id = task["instance_action_id"] + "." + str(task["task_index"])
- self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))
-
- task_vim_info = task.get("vim_info")
- task_vim_status = task["extra"].get("vim_status")
- task_error_msg = task.get("error_msg")
- task_sdn_net_id = task["extra"].get("sdn_net_id")
-
- # get ovim status
- if task_sdn_net_id:
- try:
+ for task in net_to_refresh_dict[vim_id]:
+ task_id = task["instance_action_id"] + "." + str(task["task_index"])
+ self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))
+
+ task_vim_info = task.get("vim_info")
+ task_vim_status = task["extra"].get("vim_status")
+ task_error_msg = task.get("error_msg")
+ task_sdn_net_id = task["extra"].get("sdn_net_id")
+
+ vim_info_status = vim_info["status"]
+ vim_info_error_msg = vim_info.get("error_msg")
+ # get ovim status
+ if task_sdn_net_id:
+ try:
+ with self.db_lock:
+ sdn_net = self.ovim.show_network(task_sdn_net_id)
+ except (ovimException, Exception) as e:
+ text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
+ self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
+ sdn_net = {"status": "ERROR", "error_msg": text_error}
+ if sdn_net["status"] == "ERROR":
+ if not vim_info_error_msg:
+ vim_info_error_msg = sdn_net["error_msg"]
+ else:
+ vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
+ self._format_vim_error_msg(vim_info_error_msg, 1024//2-14),
+ self._format_vim_error_msg(sdn_net["error_msg"], 1024//2-14))
+ if vim_info_status == "VIM_ERROR":
+ vim_info_status = "VIM_SDN_ERROR"
+ else:
+ vim_info_status = "SDN_ERROR"
+
+ # update database
+ if vim_info_error_msg:
+ vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
+ if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
+ (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
+ task["extra"]["vim_status"] = vim_info_status
+ task["error_msg"] = vim_info_error_msg
+ if vim_info.get("vim_info"):
+ task["vim_info"] = vim_info["vim_info"]
+ temp_dict = {"status": vim_info_status, "error_msg": vim_info_error_msg}
+ if vim_info.get("vim_info"):
+ temp_dict["vim_info"] = vim_info["vim_info"]
with self.db_lock:
- sdn_net = self.ovim.show_network(task_sdn_net_id)
- except (ovimException, Exception) as e:
- text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
- self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
- sdn_net = {"status": "ERROR", "error_msg": text_error}
- if sdn_net["status"] == "ERROR":
- if not vim_info.get("error_msg"):
- vim_info["error_msg"] = sdn_net["error_msg"]
- else:
- vim_info["error_msg"] = "VIM_ERROR: {} && SDN_ERROR: {}".format(
- self._format_vim_error_msg(vim_info["error_msg"], 1024//2-14),
- self._format_vim_error_msg(sdn_net["error_msg"], 1024//2-14))
- if vim_info["status"] == "VIM_ERROR":
- vim_info["status"] = "VIM_SDN_ERROR"
- else:
- vim_info["status"] = "SDN_ERROR"
-
- # update database
- if vim_info.get("error_msg"):
- vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"])
- if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \
- (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
- task["extra"]["vim_status"] = vim_info["status"]
- task["error_msg"] = vim_info.get("error_msg")
- if vim_info.get("vim_info"):
- task["vim_info"] = vim_info["vim_info"]
- temp_dict = {"status": vim_info["status"], "error_msg": vim_info.get("error_msg")}
- if vim_info.get("vim_info"):
- temp_dict["vim_info"] = vim_info["vim_info"]
- with self.db_lock:
- self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
- self.db.update_rows(
- 'vim_actions',
- UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
- "error_msg": task.get("error_msg"), "modified_at": now},
- WHERE={'instance_action_id': task['instance_action_id'],
- 'task_index': task['task_index']})
- if task["extra"].get("vim_status") == "BUILD":
- self._insert_refresh(task, now + self.REFRESH_BUILD)
- else:
- self._insert_refresh(task, now + self.REFRESH_ACTIVE)
+ self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]})
+ self.db.update_rows(
+ 'vim_actions',
+ UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256),
+ "error_msg": task.get("error_msg"), "modified_at": now},
+ WHERE={'instance_action_id': task['instance_action_id'],
+ 'task_index': task['task_index']})
+ if task["extra"].get("vim_status") == "BUILD":
+ self._insert_refresh(task, now + self.REFRESH_BUILD)
+ else:
+ self._insert_refresh(task, now + self.REFRESH_ACTIVE)
return nb_processed