X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_ro%2Fvim_thread.py;h=e82e37e8183f655e43f7d4fc8dd3ca2bae98e1ab;hb=1beea8613e1b0d20024da57d29aa3144f4ec2c10;hp=cbcd31f7d6b7675241efdccf941f051214ccd0ca;hpb=a6eff0a68fb60108545b81215f9551097f4bfd40;p=osm%2FRO.git diff --git a/osm_ro/vim_thread.py b/osm_ro/vim_thread.py index cbcd31f7..e82e37e8 100644 --- a/osm_ro/vim_thread.py +++ b/osm_ro/vim_thread.py @@ -37,6 +37,7 @@ The task content is (M: stored at memory, D: stored at database): params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken from other related tasks find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains the FIND params depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends on a net creation + can contain an int (single index on the same instance-action) or str (compete action ID) sdn_net_id: used for net. tries: interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database @@ -64,6 +65,7 @@ import vimconn import yaml from db_base import db_base_Exception from lib_osm_openvim.ovim import ovimException +from copy import deepcopy __author__ = "Alfonso Tierno, Pablo Montes" __date__ = "$28-Sep-2017 12:07:15$" @@ -223,11 +225,17 @@ class vim_thread(threading.Thread): nb_processed += 1 self.refresh_tasks.pop(0) if task["item"] == 'instance_vms': - vm_to_refresh_list.append(task["vim_id"]) - vm_to_refresh_dict[task["vim_id"]] = task + if task["vim_id"] not in vm_to_refresh_dict: + vm_to_refresh_dict[task["vim_id"]] = [task] + vm_to_refresh_list.append(task["vim_id"]) + else: + vm_to_refresh_dict[task["vim_id"]].append(task) elif task["item"] == 'instance_nets': - net_to_refresh_list.append(task["vim_id"]) - net_to_refresh_dict[task["vim_id"]] = task + if task["vim_id"] not in net_to_refresh_dict: + net_to_refresh_dict[task["vim_id"]] = [task] + net_to_refresh_list.append(task["vim_id"]) + else: + net_to_refresh_dict[task["vim_id"]].append(task) else: task_id = task["instance_action_id"] + "." + str(task["task_index"]) self.logger.critical("task={}: unknown task {}".format(task_id, task["item"]), exc_info=True) @@ -247,111 +255,112 @@ class vim_thread(threading.Thread): vim_dict[vim_id] = {"status": "VIM_ERROR", "error_msg": str(e)} for vim_id, vim_info in vim_dict.items(): - # look for task - task_need_update = False - task = vm_to_refresh_dict[vim_id] - task_id = task["instance_action_id"] + "." + str(task["task_index"]) - self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info)) - - # check and update interfaces - task_warning_msg = "" - for interface in vim_info.get("interfaces", ()): - vim_interface_id = interface["vim_interface_id"] - if vim_interface_id not in task["extra"]["interfaces"]: - self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format( - task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True) - continue - task_interface = task["extra"]["interfaces"][vim_interface_id] - task_vim_interface = task["vim_interfaces"].get(vim_interface_id) - if task_vim_interface != interface: - # delete old port - if task_interface.get("sdn_port_id"): - try: - with self.db_lock: - self.ovim.delete_port(task_interface["sdn_port_id"]) - task_interface["sdn_port_id"] = None - task_need_update = True - except ovimException as e: - error_text = "ovimException deleting external_port={}: {}".format( - task_interface["sdn_port_id"], e) - self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True) - task_warning_msg += error_text - # TODO Set error_msg at instance_nets instead of instance VMs - - # Create SDN port - sdn_net_id = task_interface.get("sdn_net_id") - if sdn_net_id and interface.get("compute_node") and interface.get("pci"): - sdn_port_name = sdn_net_id + "." + task["vim_id"] - sdn_port_name = sdn_port_name[:63] - try: - with self.db_lock: - sdn_port_id = self.ovim.new_external_port( - {"compute_node": interface["compute_node"], - "pci": interface["pci"], - "vlan": interface.get("vlan"), - "net_id": sdn_net_id, - "region": self.vim["config"]["datacenter_id"], - "name": sdn_port_name, - "mac": interface.get("mac_address")}) - task_interface["sdn_port_id"] = sdn_port_id - task_need_update = True - except (ovimException, Exception) as e: - error_text = "ovimException creating new_external_port compute_node={}"\ - " pci={} vlan={} {}".format( - interface["compute_node"], - interface["pci"], - interface.get("vlan"), e) - self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True) - task_warning_msg += error_text - # TODO Set error_msg at instance_nets instead of instance VMs + # look for task + for task in vm_to_refresh_dict[vim_id]: + task_need_update = False + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info)) + + # check and update interfaces + task_warning_msg = "" + for interface in vim_info.get("interfaces", ()): + vim_interface_id = interface["vim_interface_id"] + if vim_interface_id not in task["extra"]["interfaces"]: + self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format( + task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True) + continue + task_interface = task["extra"]["interfaces"][vim_interface_id] + task_vim_interface = task["vim_interfaces"].get(vim_interface_id) + if task_vim_interface != interface: + # delete old port + if task_interface.get("sdn_port_id"): + try: + with self.db_lock: + self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True) + task_interface["sdn_port_id"] = None + task_need_update = True + except ovimException as e: + error_text = "ovimException deleting external_port={}: {}".format( + task_interface["sdn_port_id"], e) + self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True) + task_warning_msg += error_text + # TODO Set error_msg at instance_nets instead of instance VMs + + # Create SDN port + sdn_net_id = task_interface.get("sdn_net_id") + if sdn_net_id and interface.get("compute_node") and interface.get("pci"): + sdn_port_name = sdn_net_id + "." + task["vim_id"] + sdn_port_name = sdn_port_name[:63] + try: + with self.db_lock: + sdn_port_id = self.ovim.new_external_port( + {"compute_node": interface["compute_node"], + "pci": interface["pci"], + "vlan": interface.get("vlan"), + "net_id": sdn_net_id, + "region": self.vim["config"]["datacenter_id"], + "name": sdn_port_name, + "mac": interface.get("mac_address")}) + task_interface["sdn_port_id"] = sdn_port_id + task_need_update = True + except (ovimException, Exception) as e: + error_text = "ovimException creating new_external_port compute_node={}"\ + " pci={} vlan={} {}".format( + interface["compute_node"], + interface["pci"], + interface.get("vlan"), e) + self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True) + task_warning_msg += error_text + # TODO Set error_msg at instance_nets instead of instance VMs + + with self.db_lock: + self.db.update_rows( + 'instance_interfaces', + UPDATE={"mac_address": interface.get("mac_address"), + "ip_address": interface.get("ip_address"), + "vim_info": interface.get("vim_info"), + "sdn_port_id": task_interface.get("sdn_port_id"), + "compute_node": interface.get("compute_node"), + "pci": interface.get("pci"), + "vlan": interface.get("vlan")}, + WHERE={'uuid': task_interface["iface_id"]}) + task["vim_interfaces"][vim_interface_id] = interface + + # check and update task and instance_vms database + vim_info_error_msg = None + if vim_info.get("error_msg"): + vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg) + elif task_warning_msg: + vim_info_error_msg = self._format_vim_error_msg(task_warning_msg) + task_vim_info = task.get("vim_info") + task_error_msg = task.get("error_msg") + task_vim_status = task["extra"].get("vim_status") + if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \ + (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]): + temp_dict = {"status": vim_info["status"], "error_msg": vim_info_error_msg} + if vim_info.get("vim_info"): + temp_dict["vim_info"] = vim_info["vim_info"] + with self.db_lock: + self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]}) + task["extra"]["vim_status"] = vim_info["status"] + task["error_msg"] = vim_info_error_msg + if vim_info.get("vim_info"): + task["vim_info"] = vim_info["vim_info"] + task_need_update = True + + if task_need_update: with self.db_lock: self.db.update_rows( - 'instance_interfaces', - UPDATE={"mac_address": interface.get("mac_address"), - "ip_address": interface.get("ip_address"), - "vim_info": interface.get("vim_info"), - "sdn_port_id": task_interface.get("sdn_port_id"), - "compute_node": interface.get("compute_node"), - "pci": interface.get("pci"), - "vlan": interface.get("vlan")}, - WHERE={'uuid': task_interface["iface_id"]}) - task["vim_interfaces"][vim_interface_id] = interface - - # check and update task and instance_vms database - if vim_info.get("error_msg"): - vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg) - elif task_warning_msg: - vim_info["error_msg"] = self._format_vim_error_msg(task_warning_msg) - - task_vim_info = task.get("vim_info") - task_error_msg = task.get("error_msg") - task_vim_status = task["extra"].get("vim_status") - if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \ - (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]): - temp_dict = {"status": vim_info["status"], "error_msg": vim_info.get("error_msg")} - if vim_info.get("vim_info"): - temp_dict["vim_info"] = vim_info["vim_info"] - with self.db_lock: - self.db.update_rows('instance_vms', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]}) - task["extra"]["vim_status"] = vim_info["status"] - task["error_msg"] = vim_info.get("error_msg") - if vim_info.get("vim_info"): - task["vim_info"] = vim_info["vim_info"] - task_need_update = True - - if task_need_update: - with self.db_lock: - self.db.update_rows( - 'vim_actions', - UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256), - "error_msg": task.get("error_msg"), "modified_at": now}, - WHERE={'instance_action_id': task['instance_action_id'], - 'task_index': task['task_index']}) - if task["extra"].get("vim_status") == "BUILD": - self._insert_refresh(task, now + self.REFRESH_BUILD) - else: - self._insert_refresh(task, now + self.REFRESH_ACTIVE) + 'vim_actions', + UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256), + "error_msg": task.get("error_msg"), "modified_at": now}, + WHERE={'instance_action_id': task['instance_action_id'], + 'task_index': task['task_index']}) + if task["extra"].get("vim_status") == "BUILD": + self._insert_refresh(task, now + self.REFRESH_BUILD) + else: + self._insert_refresh(task, now + self.REFRESH_ACTIVE) if net_to_refresh_list: now = time.time() @@ -366,60 +375,62 @@ class vim_thread(threading.Thread): for vim_id, vim_info in vim_dict.items(): # look for task - task = net_to_refresh_dict[vim_id] - task_id = task["instance_action_id"] + "." + str(task["task_index"]) - self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info)) - - task_vim_info = task.get("vim_info") - task_vim_status = task["extra"].get("vim_status") - task_error_msg = task.get("error_msg") - task_sdn_net_id = task["extra"].get("sdn_net_id") - - # get ovim status - if task_sdn_net_id: - try: + for task in net_to_refresh_dict[vim_id]: + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info)) + + task_vim_info = task.get("vim_info") + task_vim_status = task["extra"].get("vim_status") + task_error_msg = task.get("error_msg") + task_sdn_net_id = task["extra"].get("sdn_net_id") + + vim_info_status = vim_info["status"] + vim_info_error_msg = vim_info.get("error_msg") + # get ovim status + if task_sdn_net_id: + try: + with self.db_lock: + sdn_net = self.ovim.show_network(task_sdn_net_id) + except (ovimException, Exception) as e: + text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e) + self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True) + sdn_net = {"status": "ERROR", "last_error": text_error} + if sdn_net["status"] == "ERROR": + if not vim_info_error_msg: + vim_info_error_msg = str(sdn_net.get("last_error")) + else: + vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format( + self._format_vim_error_msg(vim_info_error_msg, 1024//2-14), + self._format_vim_error_msg(sdn_net["last_error"], 1024//2-14)) + vim_info_status = "ERROR" + elif sdn_net["status"] == "BUILD": + if vim_info_status == "ACTIVE": + vim_info_status = "BUILD" + + # update database + if vim_info_error_msg: + vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg) + if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \ + (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]): + task["extra"]["vim_status"] = vim_info_status + task["error_msg"] = vim_info_error_msg + if vim_info.get("vim_info"): + task["vim_info"] = vim_info["vim_info"] + temp_dict = {"status": vim_info_status, "error_msg": vim_info_error_msg} + if vim_info.get("vim_info"): + temp_dict["vim_info"] = vim_info["vim_info"] with self.db_lock: - sdn_net = self.ovim.show_network(task_sdn_net_id) - except (ovimException, Exception) as e: - text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e) - self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True) - sdn_net = {"status": "ERROR", "error_msg": text_error} - if sdn_net["status"] == "ERROR": - if not vim_info.get("error_msg"): - vim_info["error_msg"] = sdn_net["error_msg"] - else: - vim_info["error_msg"] = "VIM_ERROR: {} && SDN_ERROR: {}".format( - self._format_vim_error_msg(vim_info["error_msg"], 1024//2-14), - self._format_vim_error_msg(sdn_net["error_msg"], 1024//2-14)) - if vim_info["status"] == "VIM_ERROR": - vim_info["status"] = "VIM_SDN_ERROR" - else: - vim_info["status"] = "SDN_ERROR" - - # update database - if vim_info.get("error_msg"): - vim_info["error_msg"] = self._format_vim_error_msg(vim_info["error_msg"]) - if task_vim_status != vim_info["status"] or task_error_msg != vim_info.get("error_msg") or \ - (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]): - task["extra"]["vim_status"] = vim_info["status"] - task["error_msg"] = vim_info.get("error_msg") - if vim_info.get("vim_info"): - task["vim_info"] = vim_info["vim_info"] - temp_dict = {"status": vim_info["status"], "error_msg": vim_info.get("error_msg")} - if vim_info.get("vim_info"): - temp_dict["vim_info"] = vim_info["vim_info"] - with self.db_lock: - self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]}) - self.db.update_rows( - 'vim_actions', - UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256), - "error_msg": task.get("error_msg"), "modified_at": now}, - WHERE={'instance_action_id': task['instance_action_id'], - 'task_index': task['task_index']}) - if task["extra"].get("vim_status") == "BUILD": - self._insert_refresh(task, now + self.REFRESH_BUILD) - else: - self._insert_refresh(task, now + self.REFRESH_ACTIVE) + self.db.update_rows('instance_nets', UPDATE=temp_dict, WHERE={"uuid": task["item_id"]}) + self.db.update_rows( + 'vim_actions', + UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256), + "error_msg": task.get("error_msg"), "modified_at": now}, + WHERE={'instance_action_id': task['instance_action_id'], + 'task_index': task['task_index']}) + if task["extra"].get("vim_status") == "BUILD": + self._insert_refresh(task, now + self.REFRESH_BUILD) + else: + self._insert_refresh(task, now + self.REFRESH_ACTIVE) return nb_processed @@ -473,7 +484,7 @@ class vim_thread(threading.Thread): for task_index in task["extra"].get("depends_on", ()): task_dependency = task["depends"].get("TASK-" + str(task_index)) if not task_dependency: - task_dependency = self._look_for_task(task["instance_action_id"], "TASK-" + str(task_index)) + task_dependency = self._look_for_task(task["instance_action_id"], task_index) if not task_dependency: raise VimThreadException( "Cannot get depending net task trying to get depending task {}.{}".format( @@ -631,10 +642,19 @@ class vim_thread(threading.Thread): task["params"] = extra.get("params") depends_on_list = extra.get("depends_on") if depends_on_list: - for index in depends_on_list: + for dependency_task in depends_on_list: + if isinstance(dependency_task, int): + index = dependency_task + else: + instance_action_id, _, task_id = dependency_task.rpartition(".") + if instance_action_id != task["instance_action_id"]: + continue + index = int(task_id) + if index < len(vim_actions_list) and vim_actions_list[index]["task_index"] == index and\ vim_actions_list[index]["instance_action_id"] == task["instance_action_id"]: - task["depends"]["TASK-" + str(index)] = vim_actions_list[index] + task["depends"]["TASK-" + str(index)] = vim_actions_list[index] + task["depends"]["TASK-{}.{}".format(task["instance_action_id"], index)] = vim_actions_list[index] if extra.get("interfaces"): task["vim_interfaces"] = {} else: @@ -729,7 +749,24 @@ class vim_thread(threading.Thread): self.logger.debug("Finishing") def _look_for_task(self, instance_action_id, task_id): - task_index = task_id.split("-")[-1] + """ + Look for a concrete task at vim_actions database table + :param instance_action_id: The instance_action_id + :param task_id: Can have several formats: + : integer + TASK- :backward compatibility, + [TASK-].: this instance_action_id overrides the one in the parameter + :return: Task dictionary or None if not found + """ + if isinstance(task_id, int): + task_index = task_id + else: + if task_id.startswith("TASK-"): + task_id = task_id[5:] + ins_action_id, _, task_index = task_id.rpartition(".") + if ins_action_id: + instance_action_id = ins_action_id + with self.db_lock: tasks = self.db.get_rows(FROM="vim_actions", WHERE={"instance_action_id": instance_action_id, "task_index": task_index}) @@ -775,11 +812,12 @@ class vim_thread(threading.Thread): "Cannot create VM because depends on a network not created or found: " + str(depends[net["net_id"]]["error_msg"])) net["net_id"] = network_id - vim_vm_id, created_items = self.vim.new_vminstance(*params) + params_copy = deepcopy(params) + vim_vm_id, created_items = self.vim.new_vminstance(*params_copy) # fill task_interfaces. Look for snd_net_id at database for each interface task_interfaces = {} - for iface in net_list: + for iface in params_copy[5]: task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]} with self.db_lock: result = self.db.get_rows( @@ -821,7 +859,7 @@ class vim_thread(threading.Thread): if iface.get("sdn_port_id"): try: with self.db_lock: - self.ovim.delete_port(iface["sdn_port_id"]) + self.ovim.delete_port(iface["sdn_port_id"], idempotent=True) except ovimException as e: self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format( task_id, iface["sdn_port_id"], e), exc_info=True) @@ -958,8 +996,8 @@ class vim_thread(threading.Thread): port_list = self.ovim.get_ports(columns={'uuid'}, filter={'name': 'external_port', 'net_id': sdn_net_id}) for port in port_list: - self.ovim.delete_port(port['uuid']) - self.ovim.delete_network(sdn_net_id) + self.ovim.delete_port(port['uuid'], idempotent=True) + self.ovim.delete_network(sdn_net_id, idempotent=True) if net_vim_id: self.vim.delete_network(net_vim_id) task["status"] = "DONE"