+ sdn_port_id = self.ovim.new_external_port(
+ {"compute_node": interface["compute_node"],
+ "pci": interface["pci"],
+ "vlan": interface.get("vlan"),
+ "net_id": sdn_net_id,
+ "region": self.vim["config"]["datacenter_id"],
+ "name": sdn_port_name,
+ "mac": interface.get("mac_address")})
+ task_interface["sdn_port_id"] = sdn_port_id
+ except (ovimException, Exception) as e:
+ error_text = "ovimException creating new_external_port compute_node={} pci={} vlan={} {}".\
+ format(interface["compute_node"], interface["pci"], interface.get("vlan"), e)
+ self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
+ task_warning_msg += error_text
+ # TODO Set error_msg at instance_nets instead of instance VMs
+
+ self.db.update_rows('instance_interfaces',
+ UPDATE={"mac_address": interface.get("mac_address"),
+ "ip_address": interface.get("ip_address"),
+ "vim_interface_id": interface.get("vim_interface_id"),
+ "vim_info": interface.get("vim_info"),
+ "sdn_port_id": task_interface.get("sdn_port_id"),
+ "compute_node": interface.get("compute_node"),
+ "pci": interface.get("pci"),
+ "vlan": interface.get("vlan")},
+ WHERE={'uuid': task_interface["iface_id"]})
+ task_interface["vim_info"] = interface
+
+ # check and update task and instance_vms database
+ vim_info_error_msg = None
+ if vim_info.get("error_msg"):
+ vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
+ elif task_warning_msg:
+ vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
+ task_vim_info = task["extra"].get("vim_info")
+ task_error_msg = task.get("error_msg")
+ task_vim_status = task["extra"].get("vim_status")
+ if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
+ (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
+ database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
+ if vim_info.get("vim_info"):
+ database_update["vim_info"] = vim_info["vim_info"]
+
+ task["extra"]["vim_status"] = vim_info["status"]
+ task["error_msg"] = vim_info_error_msg
+ if vim_info.get("vim_info"):
+ task["extra"]["vim_info"] = vim_info["vim_info"]
+
+ return database_update
+
+ def _refres_net(self, task):
+ """Call VIM to get network status"""
+ database_update = None
+
+ vim_id = task["vim_id"]
+ net_to_refresh_list = [vim_id]
+ try:
+ vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
+ vim_info = vim_dict[vim_id]
+ except vimconn.vimconnException as e:
+ # Mark all tasks at VIM_ERROR status
+ self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
+ vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
+
+ task_id = task["instance_action_id"] + "." + str(task["task_index"])
+ self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))
+
+ task_vim_info = task["extra"].get("vim_info")
+ task_vim_status = task["extra"].get("vim_status")
+ task_error_msg = task.get("error_msg")
+ task_sdn_net_id = task["extra"].get("sdn_net_id")
+
+ vim_info_status = vim_info["status"]
+ vim_info_error_msg = vim_info.get("error_msg")
+ # get ovim status
+ if task_sdn_net_id:
+ try:
+ with self.db_lock:
+ sdn_net = self.ovim.show_network(task_sdn_net_id)
+ except (ovimException, Exception) as e:
+ text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
+ self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
+ sdn_net = {"status": "ERROR", "last_error": text_error}
+ if sdn_net["status"] == "ERROR":
+ if not vim_info_error_msg:
+ vim_info_error_msg = str(sdn_net.get("last_error"))
+ else:
+ vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
+ self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14),
+ self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14))
+ vim_info_status = "ERROR"
+ elif sdn_net["status"] == "BUILD":
+ if vim_info_status == "ACTIVE":
+ vim_info_status = "BUILD"
+
+ # update database
+ if vim_info_error_msg:
+ vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
+ if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
+ (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
+ task["extra"]["vim_status"] = vim_info_status
+ task["error_msg"] = vim_info_error_msg
+ if vim_info.get("vim_info"):
+ task["extra"]["vim_info"] = vim_info["vim_info"]
+ database_update = {"status": vim_info_status, "error_msg": vim_info_error_msg}
+ if vim_info.get("vim_info"):
+ database_update["vim_info"] = vim_info["vim_info"]
+ return database_update
+
+ def _proccess_pending_tasks(self, task, related_tasks):
+ old_task_status = task["status"]
+ create_or_find = False # if as result of processing this task something is created or found
+ next_refresh = 0
+
+ try:
+ if task["status"] == "SCHEDULED":
+ # check if tasks that this depends on have been completed
+ dependency_not_completed = False
+ dependency_modified_at = 0
+ for task_index in task["extra"].get("depends_on", ()):
+ task_dependency = self._look_for_task(task["instance_action_id"], task_index)
+ if not task_dependency:
+ raise VimThreadException(
+ "Cannot get depending net task trying to get depending task {}.{}".format(
+ task["instance_action_id"], task_index))
+ # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so
+ # database must be look again
+ if task_dependency["status"] == "SCHEDULED":
+ dependency_not_completed = True
+ dependency_modified_at = task_dependency["modified_at"]
+ break
+ elif task_dependency["status"] == "FAILED":
+ raise VimThreadException(
+ "Cannot {} {}, (task {}.{}) because depends on failed {}.{}, (task{}.{}): {}".format(
+ task["action"], task["item"],
+ task["instance_action_id"], task["task_index"],
+ task_dependency["instance_action_id"], task_dependency["task_index"],
+ task_dependency["action"], task_dependency["item"], task_dependency.get("error_msg")))
+
+ task["depends"]["TASK-"+str(task_index)] = task_dependency
+ task["depends"]["TASK-{}.{}".format(task["instance_action_id"], task_index)] = task_dependency
+ if dependency_not_completed:
+ # Move this task to the time dependency is going to be modified plus 10 seconds.
+ self.db.update_rows("vim_wim_actions", modified_time=dependency_modified_at + 10,
+ UPDATE={"worker": None},
+ WHERE={"datacenter_vim_id": self.datacenter_tenant_id, "worker": self.my_id,
+ "related": task["related"],
+ })
+ # task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
+ # if task["extra"]["tries"] > 3:
+ # raise VimThreadException(
+ # "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
+ # "(task {}.{})".format(task["action"], task["item"],
+ # task["instance_action_id"], task["task_index"],
+ # task_dependency["instance_action_id"], task_dependency["task_index"]
+ # task_dependency["action"], task_dependency["item"]))
+ return
+
+ database_update = None
+ if task["action"] == "DELETE":
+ deleted_needed = self._delete_task(task)
+ if not deleted_needed:
+ task["status"] = "SUPERSEDED" # with FINISHED instead of DONE it will not be refreshing
+ task["error_msg"] = None
+
+ if task["status"] == "SUPERSEDED":
+ # not needed to do anything but update database with the new status
+ database_update = None
+ elif not self.vim:
+ task["status"] = "FAILED"
+ task["error_msg"] = self.error_status
+ database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
+ elif task["item_id"] != related_tasks[0]["item_id"] and task["action"] in ("FIND", "CREATE"):
+ # Do nothing, just copy values from one to another and updata database
+ task["status"] = related_tasks[0]["status"]
+ task["error_msg"] = related_tasks[0]["error_msg"]
+ task["vim_id"] = related_tasks[0]["vim_id"]
+ extra = yaml.load(related_tasks[0]["extra"])
+ task["extra"]["vim_status"] = extra.get("vim_status")
+ next_refresh = related_tasks[0]["modified_at"] + 0.001
+ database_update = {"status": task["extra"].get("vim_status", "VIM_ERROR"),
+ "error_msg": task["error_msg"]}
+ if task["item"] == 'instance_vms':
+ database_update["vim_vm_id"] = task["vim_id"]
+ elif task["item"] == 'instance_nets':
+ database_update["vim_net_id"] = task["vim_id"]
+ elif task["item"] == 'instance_vms':
+ if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
+ database_update = self._refres_vm(task)
+ create_or_find = True
+ elif task["action"] == "CREATE":
+ create_or_find = True
+ database_update = self.new_vm(task)
+ elif task["action"] == "DELETE":
+ self.del_vm(task)
+ else:
+ raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+ elif task["item"] == 'instance_nets':
+ if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
+ database_update = self._refres_net(task)
+ create_or_find = True
+ elif task["action"] == "CREATE":
+ create_or_find = True
+ database_update = self.new_net(task)
+ elif task["action"] == "DELETE":
+ self.del_net(task)
+ elif task["action"] == "FIND":
+ database_update = self.get_net(task)
+ else:
+ raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+ elif task["item"] == 'instance_sfis':
+ if task["action"] == "CREATE":
+ create_or_find = True
+ database_update = self.new_sfi(task)
+ elif task["action"] == "DELETE":
+ self.del_sfi(task)
+ else:
+ raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+ elif task["item"] == 'instance_sfs':
+ if task["action"] == "CREATE":
+ create_or_find = True
+ database_update = self.new_sf(task)
+ elif task["action"] == "DELETE":
+ self.del_sf(task)
+ else:
+ raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+ elif task["item"] == 'instance_classifications':
+ if task["action"] == "CREATE":
+ create_or_find = True
+ database_update = self.new_classification(task)
+ elif task["action"] == "DELETE":
+ self.del_classification(task)
+ else:
+ raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+ elif task["item"] == 'instance_sfps':
+ if task["action"] == "CREATE":
+ create_or_find = True
+ database_update = self.new_sfp(task)
+ elif task["action"] == "DELETE":
+ self.del_sfp(task)
+ else:
+ raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+ else:
+ raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
+ # TODO
+ except VimThreadException as e:
+ task["error_msg"] = str(e)
+ task["status"] = "FAILED"
+ database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
+ if task["item"] == 'instance_vms':
+ database_update["vim_vm_id"] = None
+ elif task["item"] == 'instance_nets':
+ database_update["vim_net_id"] = None
+
+ task_id = task["instance_action_id"] + "." + str(task["task_index"])
+ self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
+ task_id, task["item"], task["action"], task["status"],
+ task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
+ try:
+ if not next_refresh:
+ if task["status"] == "DONE":
+ next_refresh = time.time()
+ if task["extra"].get("vim_status") == "BUILD":
+ next_refresh += self.REFRESH_BUILD
+ elif task["extra"].get("vim_status") in ("ERROR", "VIM_ERROR"):
+ next_refresh += self.REFRESH_ERROR
+ elif task["extra"].get("vim_status") == "DELETED":
+ next_refresh += self.REFRESH_DELETE