+ def get_vimconnector(self):
+ try:
+ from_ = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
+ select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
+ 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
+ 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
+ 'user', 'passwd', 'dt.config as dt_config')
+ where_ = {"dt.uuid": self.datacenter_tenant_id}
+ vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_)
+ vim = vims[0]
+ vim_config = {}
+ if vim["config"]:
+ vim_config.update(yaml.load(vim["config"]))
+ if vim["dt_config"]:
+ vim_config.update(yaml.load(vim["dt_config"]))
+ vim_config['datacenter_tenant_id'] = vim.get('datacenter_tenant_id')
+ vim_config['datacenter_id'] = vim.get('datacenter_id')
+
+ # get port_mapping
+ with self.db_lock:
+ vim_config["wim_external_ports"] = self.ovim.get_of_port_mappings(
+ db_filter={"region": vim_config['datacenter_id'], "pci": None})
+
+ self.vim = vim_module[vim["type"]].vimconnector(
+ uuid=vim['datacenter_id'], name=vim['datacenter_name'],
+ tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'],
+ url=vim['vim_url'], url_admin=vim['vim_url_admin'],
+ user=vim['user'], passwd=vim['passwd'],
+ config=vim_config, persistent_info=self.vim_persistent_info
+ )
+ self.error_status = None
+ except Exception as e:
+ self.logger.error("Cannot load vimconnector for vim_account {}: {}".format(self.datacenter_tenant_id, e))
+ self.vim = None
+ self.error_status = "Error loading vimconnector: {}".format(e)
+
+ def _get_db_task(self):
+ """
+ Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
+ :return: None
+ """
+ now = time.time()
+ try:
+ database_limit = 20
+ task_related = None
+ while True:
+ # get 20 (database_limit) entries each time
+ vim_actions = self.db.get_rows(FROM="vim_wim_actions",
+ WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
+ "status": ['SCHEDULED', 'BUILD', 'DONE'],
+ "worker": [None, self.my_id], "modified_at<=": now
+ },
+ ORDER_BY=("modified_at", "created_at",),
+ LIMIT=database_limit)
+ if not vim_actions:
+ return None, None
+ # if vim_actions[0]["modified_at"] > now:
+ # return int(vim_actions[0] - now)
+ for task in vim_actions:
+ # block related task
+ if task_related == task["related"]:
+ continue # ignore if a locking has already tried for these task set
+ task_related = task["related"]
+ # lock ...
+ self.db.update_rows("vim_wim_actions", UPDATE={"worker": self.my_id}, modified_time=0,
+ WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
+ "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+ "worker": [None, self.my_id],
+ "related": task_related,
+ "item": task["item"],
+ })
+ # ... and read all related and check if locked
+ related_tasks = self.db.get_rows(FROM="vim_wim_actions",
+ WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
+ "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+ "related": task_related,
+ "item": task["item"],
+ },
+ ORDER_BY=("created_at",))
+ # check that all related tasks have been locked. If not release and try again. It can happen
+ # for race conditions if a new related task has been inserted by nfvo in the process
+ some_tasks_locked = False
+ some_tasks_not_locked = False
+ creation_task = None
+ for relate_task in related_tasks:
+ if relate_task["worker"] != self.my_id:
+ some_tasks_not_locked = True
+ else:
+ some_tasks_locked = True
+ if not creation_task and relate_task["action"] in ("CREATE", "FIND"):
+ creation_task = relate_task
+ if some_tasks_not_locked:
+ if some_tasks_locked: # unlock
+ self.db.update_rows("vim_wim_actions", UPDATE={"worker": None}, modified_time=0,
+ WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
+ "worker": self.my_id,
+ "related": task_related,
+ "item": task["item"],
+ })
+ continue
+
+ # task of creation must be the first in the list of related_task
+ assert(related_tasks[0]["action"] in ("CREATE", "FIND"))
+
+ if task["extra"]:
+ extra = yaml.load(task["extra"])
+ else:
+ extra = {}
+ task["extra"] = extra
+ if extra.get("depends_on"):
+ task["depends"] = {}
+ if extra.get("params"):
+ task["params"] = deepcopy(extra["params"])
+ return task, related_tasks
+ except Exception as e:
+ self.logger.critical("Unexpected exception at _get_db_task: " + str(e), exc_info=True)
+ return None, None
+
+ def _delete_task(self, task):
+ """
+ Determine if this task need to be done or superseded
+ :return: None
+ """
+
+ def copy_extra_created(copy_to, copy_from):
+ copy_to["created"] = copy_from["created"]
+ if copy_from.get("sdn_net_id"):
+ copy_to["sdn_net_id"] = copy_from["sdn_net_id"]
+ if copy_from.get("interfaces"):
+ copy_to["interfaces"] = copy_from["interfaces"]
+ if copy_from.get("created_items"):
+ if not copy_to.get("created_items"):
+ copy_to["created_items"] = {}
+ copy_to["created_items"].update(copy_from["created_items"])
+
+ task_create = None
+ dependency_task = None
+ deletion_needed = False
+ if task["status"] == "FAILED":
+ return # TODO need to be retry??
+ try:
+ # get all related tasks
+ related_tasks = self.db.get_rows(FROM="vim_wim_actions",
+ WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
+ "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
+ "action": ["FIND", "CREATE"],
+ "related": task["related"],
+ },
+ ORDER_BY=("created_at",),
+ )
+ for related_task in related_tasks:
+ if related_task["item"] == task["item"] and related_task["item_id"] == task["item_id"]:
+ task_create = related_task
+ # TASK_CREATE
+ if related_task["extra"]:
+ extra_created = yaml.load(related_task["extra"])
+ if extra_created.get("created"):
+ deletion_needed = True
+ related_task["extra"] = extra_created
+ elif not dependency_task:
+ dependency_task = related_task
+ if task_create and dependency_task:
+ break
+
+ # mark task_create as FINISHED
+ self.db.update_rows("vim_wim_actions", UPDATE={"status": "FINISHED"},
+ WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
+ "instance_action_id": task_create["instance_action_id"],
+ "task_index": task_create["task_index"]
+ })
+ if not deletion_needed:
+ return
+ elif dependency_task:
+ # move create information from task_create to relate_task
+ extra_new_created = yaml.load(dependency_task["extra"]) or {}
+ extra_new_created["created"] = extra_created["created"]
+ copy_extra_created(copy_to=extra_new_created, copy_from=extra_created)
+
+ self.db.update_rows("vim_wim_actions",
+ UPDATE={"extra": yaml.safe_dump(extra_new_created, default_flow_style=True,
+ width=256),
+ "vim_id": task_create.get("vim_id")},
+ WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
+ "instance_action_id": dependency_task["instance_action_id"],
+ "task_index": dependency_task["task_index"]
+ })
+ return False
+ else:
+ task["vim_id"] = task_create["vim_id"]
+ copy_extra_created(copy_to=task["extra"], copy_from=task_create["extra"])
+ return True
+
+ except Exception as e:
+ self.logger.critical("Unexpected exception at _delete_task: " + str(e), exc_info=True)
+
+ def _refres_vm(self, task):
+ """Call VIM to get VMs status"""
+ database_update = None
+
+ vim_id = task["vim_id"]
+ vm_to_refresh_list = [vim_id]
+ try:
+ vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
+ vim_info = vim_dict[vim_id]
+ except vimconn.vimconnException as e:
+ # Mark all tasks at VIM_ERROR status
+ self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
+ vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
+
+ task_id = task["instance_action_id"] + "." + str(task["task_index"])
+ self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))
+
+ # check and update interfaces
+ task_warning_msg = ""
+ for interface in vim_info.get("interfaces", ()):
+ vim_interface_id = interface["vim_interface_id"]
+ if vim_interface_id not in task["extra"]["interfaces"]:
+ self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
+ task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
+ continue
+ task_interface = task["extra"]["interfaces"][vim_interface_id]
+ task_vim_interface = task_interface.get("vim_info")
+ if task_vim_interface != interface:
+ # delete old port
+ if task_interface.get("sdn_port_id"):
+ try:
+ with self.db_lock:
+ self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True)
+ task_interface["sdn_port_id"] = None
+ except ovimException as e:
+ error_text = "ovimException deleting external_port={}: {}".format(
+ task_interface["sdn_port_id"], e)
+ self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
+ task_warning_msg += error_text
+ # TODO Set error_msg at instance_nets instead of instance VMs
+
+ # Create SDN port
+ sdn_net_id = task_interface.get("sdn_net_id")
+ if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
+ sdn_port_name = sdn_net_id + "." + task["vim_id"]
+ sdn_port_name = sdn_port_name[:63]
+ try:
+ with self.db_lock:
+ sdn_port_id = self.ovim.new_external_port(
+ {"compute_node": interface["compute_node"],
+ "pci": interface["pci"],
+ "vlan": interface.get("vlan"),
+ "net_id": sdn_net_id,
+ "region": self.vim["config"]["datacenter_id"],
+ "name": sdn_port_name,
+ "mac": interface.get("mac_address")})
+ task_interface["sdn_port_id"] = sdn_port_id
+ except (ovimException, Exception) as e:
+ error_text = "ovimException creating new_external_port compute_node={} pci={} vlan={} {}".\
+ format(interface["compute_node"], interface["pci"], interface.get("vlan"), e)
+ self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
+ task_warning_msg += error_text
+ # TODO Set error_msg at instance_nets instead of instance VMs
+
+ self.db.update_rows('instance_interfaces',
+ UPDATE={"mac_address": interface.get("mac_address"),
+ "ip_address": interface.get("ip_address"),
+ "vim_interface_id": interface.get("vim_interface_id"),
+ "vim_info": interface.get("vim_info"),
+ "sdn_port_id": task_interface.get("sdn_port_id"),
+ "compute_node": interface.get("compute_node"),
+ "pci": interface.get("pci"),
+ "vlan": interface.get("vlan")},
+ WHERE={'uuid': task_interface["iface_id"]})
+ task_interface["vim_info"] = interface
+
+ # check and update task and instance_vms database
+ vim_info_error_msg = None
+ if vim_info.get("error_msg"):
+ vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
+ elif task_warning_msg:
+ vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
+ task_vim_info = task["extra"].get("vim_info")
+ task_error_msg = task.get("error_msg")
+ task_vim_status = task["extra"].get("vim_status")
+ if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
+ (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
+ database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
+ if vim_info.get("vim_info"):
+ database_update["vim_info"] = vim_info["vim_info"]
+
+ task["extra"]["vim_status"] = vim_info["status"]
+ task["error_msg"] = vim_info_error_msg
+ if vim_info.get("vim_info"):
+ task["extra"]["vim_info"] = vim_info["vim_info"]
+
+ return database_update
+
+ def _refres_net(self, task):
+ """Call VIM to get network status"""
+ database_update = None
+
+ vim_id = task["vim_id"]
+ net_to_refresh_list = [vim_id]
+ try:
+ vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
+ vim_info = vim_dict[vim_id]
+ except vimconn.vimconnException as e:
+ # Mark all tasks at VIM_ERROR status
+ self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
+ vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
+
+ task_id = task["instance_action_id"] + "." + str(task["task_index"])
+ self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))
+
+ task_vim_info = task["extra"].get("vim_info")
+ task_vim_status = task["extra"].get("vim_status")
+ task_error_msg = task.get("error_msg")
+ task_sdn_net_id = task["extra"].get("sdn_net_id")
+
+ vim_info_status = vim_info["status"]
+ vim_info_error_msg = vim_info.get("error_msg")
+ # get ovim status
+ if task_sdn_net_id:
+ try:
+ with self.db_lock:
+ sdn_net = self.ovim.show_network(task_sdn_net_id)
+ except (ovimException, Exception) as e:
+ text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
+ self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
+ sdn_net = {"status": "ERROR", "last_error": text_error}
+ if sdn_net["status"] == "ERROR":
+ if not vim_info_error_msg:
+ vim_info_error_msg = str(sdn_net.get("last_error"))
+ else:
+ vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
+ self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14),
+ self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14))
+ vim_info_status = "ERROR"
+ elif sdn_net["status"] == "BUILD":
+ if vim_info_status == "ACTIVE":
+ vim_info_status = "BUILD"
+
+ # update database
+ if vim_info_error_msg:
+ vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
+ if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
+ (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
+ task["extra"]["vim_status"] = vim_info_status
+ task["error_msg"] = vim_info_error_msg
+ if vim_info.get("vim_info"):
+ task["extra"]["vim_info"] = vim_info["vim_info"]
+ database_update = {"status": vim_info_status, "error_msg": vim_info_error_msg}
+ if vim_info.get("vim_info"):
+ database_update["vim_info"] = vim_info["vim_info"]
+ return database_update
+
+ def _proccess_pending_tasks(self, task, related_tasks):
+ old_task_status = task["status"]
+ create_or_find = False # if as result of processing this task something is created or found
+ next_refresh = 0
+
+ try:
+ if task["status"] == "SCHEDULED":
+ # check if tasks that this depends on have been completed
+ dependency_not_completed = False
+ dependency_modified_at = 0
+ for task_index in task["extra"].get("depends_on", ()):
+ task_dependency = self._look_for_task(task["instance_action_id"], task_index)
+ if not task_dependency:
+ raise VimThreadException(
+ "Cannot get depending net task trying to get depending task {}.{}".format(
+ task["instance_action_id"], task_index))
+ # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so
+ # database must be look again
+ if task_dependency["status"] == "SCHEDULED":
+ dependency_not_completed = True
+ dependency_modified_at = task_dependency["modified_at"]
+ break
+ elif task_dependency["status"] == "FAILED":
+ raise VimThreadException(
+ "Cannot {} {}, (task {}.{}) because depends on failed {}.{}, (task{}.{}): {}".format(
+ task["action"], task["item"],
+ task["instance_action_id"], task["task_index"],
+ task_dependency["instance_action_id"], task_dependency["task_index"],
+ task_dependency["action"], task_dependency["item"], task_dependency.get("error_msg")))
+
+ task["depends"]["TASK-"+str(task_index)] = task_dependency
+ task["depends"]["TASK-{}.{}".format(task["instance_action_id"], task_index)] = task_dependency
+ if dependency_not_completed:
+ # Move this task to the time dependency is going to be modified plus 10 seconds.
+ self.db.update_rows("vim_wim_actions", modified_time=dependency_modified_at + 10,
+ UPDATE={"worker": None},
+ WHERE={"datacenter_vim_id": self.datacenter_tenant_id, "worker": self.my_id,
+ "related": task["related"],
+ })
+ # task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
+ # if task["extra"]["tries"] > 3:
+ # raise VimThreadException(
+ # "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
+ # "(task {}.{})".format(task["action"], task["item"],
+ # task["instance_action_id"], task["task_index"],
+ # task_dependency["instance_action_id"], task_dependency["task_index"]
+ # task_dependency["action"], task_dependency["item"]))
+ return
+
+ database_update = None
+ if task["action"] == "DELETE":
+ deleted_needed = self._delete_task(task)
+ if not deleted_needed:
+ task["status"] = "SUPERSEDED" # with FINISHED instead of DONE it will not be refreshing
+ task["error_msg"] = None
+
+ if task["status"] == "SUPERSEDED":
+ # not needed to do anything but update database with the new status
+ database_update = None
+ elif not self.vim:
+ task["status"] = "FAILED"
+ task["error_msg"] = self.error_status
+ database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
+ elif task["item_id"] != related_tasks[0]["item_id"] and task["action"] in ("FIND", "CREATE"):
+ # Do nothing, just copy values from one to another and updata database
+ task["status"] = related_tasks[0]["status"]
+ task["error_msg"] = related_tasks[0]["error_msg"]
+ task["vim_id"] = related_tasks[0]["vim_id"]
+ extra = yaml.load(related_tasks[0]["extra"])
+ task["extra"]["vim_status"] = extra["vim_status"]
+ next_refresh = related_tasks[0]["modified_at"] + 0.001
+ database_update = {"status": task["extra"].get("vim_status", "VIM_ERROR"),
+ "error_msg": task["error_msg"]}
+ if task["item"] == 'instance_vms':
+ database_update["vim_vm_id"] = task["vim_id"]
+ elif task["item"] == 'instance_nets':
+ database_update["vim_net_id"] = task["vim_id"]
+ elif task["item"] == 'instance_vms':
+ if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
+ database_update = self._refres_vm(task)
+ create_or_find = True
+ elif task["action"] == "CREATE":
+ create_or_find = True
+ database_update = self.new_vm(task)
+ elif task["action"] == "DELETE":
+ self.del_vm(task)
+ else:
+ raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+ elif task["item"] == 'instance_nets':
+ if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
+ database_update = self._refres_net(task)
+ create_or_find = True
+ elif task["action"] == "CREATE":
+ create_or_find = True
+ database_update = self.new_net(task)
+ elif task["action"] == "DELETE":
+ self.del_net(task)
+ elif task["action"] == "FIND":
+ database_update = self.get_net(task)
+ else:
+ raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+ elif task["item"] == 'instance_sfis':
+ if task["action"] == "CREATE":
+ create_or_find = True
+ database_update = self.new_sfi(task)
+ elif task["action"] == "DELETE":
+ self.del_sfi(task)
+ else:
+ raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+ elif task["item"] == 'instance_sfs':
+ if task["action"] == "CREATE":
+ create_or_find = True
+ database_update = self.new_sf(task)
+ elif task["action"] == "DELETE":
+ self.del_sf(task)
+ else:
+ raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+ elif task["item"] == 'instance_classifications':
+ if task["action"] == "CREATE":
+ create_or_find = True
+ database_update = self.new_classification(task)
+ elif task["action"] == "DELETE":
+ self.del_classification(task)
+ else:
+ raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+ elif task["item"] == 'instance_sfps':
+ if task["action"] == "CREATE":
+ create_or_find = True
+ database_update = self.new_sfp(task)
+ elif task["action"] == "DELETE":
+ self.del_sfp(task)
+ else:
+ raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+ else:
+ raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
+ # TODO
+ except VimThreadException as e:
+ task["error_msg"] = str(e)
+ task["status"] = "FAILED"
+ database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
+ if task["item"] == 'instance_vms':
+ database_update["vim_vm_id"] = None
+ elif task["item"] == 'instance_nets':
+ database_update["vim_net_id"] = None
+
+ task_id = task["instance_action_id"] + "." + str(task["task_index"])
+ self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
+ task_id, task["item"], task["action"], task["status"],
+ task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
+ try:
+ if not next_refresh:
+ if task["status"] == "DONE":
+ next_refresh = time.time()
+ if task["extra"].get("vim_status") == "BUILD":
+ next_refresh += self.REFRESH_BUILD
+ elif task["extra"].get("vim_status") in ("ERROR", "VIM_ERROR"):
+ next_refresh += self.REFRESH_ERROR
+ elif task["extra"].get("vim_status") == "DELETED":
+ next_refresh += self.REFRESH_DELETE
+ else:
+ next_refresh += self.REFRESH_ACTIVE
+ elif task["status"] == "FAILED":
+ next_refresh = time.time() + self.REFRESH_DELETE
+
+ if create_or_find:
+ # modify all related task with action FIND/CREATED non SCHEDULED
+ self.db.update_rows(
+ table="vim_wim_actions", modified_time=next_refresh + 0.001,
+ UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
+ "error_msg": task["error_msg"],
+ },
+
+ WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
+ "worker": self.my_id,
+ "action": ["FIND", "CREATE"],
+ "related": task["related"],
+ "status<>": "SCHEDULED",
+ })
+ # modify own task
+ self.db.update_rows(
+ table="vim_wim_actions", modified_time=next_refresh,
+ UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
+ "error_msg": task["error_msg"],
+ "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
+ WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
+ # Unlock tasks
+ self.db.update_rows(
+ table="vim_wim_actions", modified_time=0,
+ UPDATE={"worker": None},
+ WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
+ "worker": self.my_id,
+ "related": task["related"],
+ })
+
+ # Update table instance_actions
+ if old_task_status == "SCHEDULED" and task["status"] != old_task_status:
+ self.db.update_rows(
+ table="instance_actions",
+ UPDATE={("number_failed" if task["status"] == "FAILED" else "number_done"): {"INCREMENT": 1}},
+ WHERE={"uuid": task["instance_action_id"]})
+ if database_update:
+ self.db.update_rows(table=task["item"],
+ UPDATE=database_update,
+ WHERE={"related": task["related"]})
+ except db_base_Exception as e:
+ self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)
+