X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FRO.git;a=blobdiff_plain;f=RO%2Fosm_ro%2Fvim_thread.py;h=7174fe67beae74bb63430108ec3ead7eed21f323;hp=587c0600c899d8f497e66beaf8fcb1f78f7a1306;hb=187b52ae9922b3b3170080ec78958d7933bde813;hpb=4126d05e24ada55226bb13a9d556655811cedadc diff --git a/RO/osm_ro/vim_thread.py b/RO/osm_ro/vim_thread.py index 587c0600..7174fe67 100644 --- a/RO/osm_ro/vim_thread.py +++ b/RO/osm_ro/vim_thread.py @@ -80,8 +80,8 @@ import threading import time import queue import logging -from osm_ro import vimconn -from osm_ro.wim.sdnconn import SdnConnectorError +from osm_ro_plugin import vimconn +from osm_ro_plugin.sdnconn import SdnConnectorError import yaml from osm_ro.db_base import db_base_Exception from http import HTTPStatus @@ -126,7 +126,7 @@ class vim_thread(threading.Thread): self.error_status = None self.wim_account_id = wim_account_id self.datacenter_tenant_id = datacenter_tenant_id - self.port_mapping = None + self.port_mappings = None if self.wim_account_id: self.target_k = "wim_account_id" self.target_v = self.wim_account_id @@ -156,12 +156,12 @@ class vim_thread(threading.Thread): raise SdnConnectorError(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc def _proccess_vim_exception(self, exc): - if isinstance(exc, vimconn.vimconnException): + if isinstance(exc, vimconn.VimConnException): raise else: self.logger.error("plugin={} throws a non vimconnException exception {}".format(self.plugin_name, exc), exc_info=True) - raise vimconn.vimconnException(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc + raise vimconn.VimConnException(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc def get_vim_sdn_connector(self): if self.datacenter_tenant_id: @@ -188,7 +188,7 @@ class vim_thread(threading.Thread): # vim_config["wim_external_ports"] = [x for x in vim_port_mappings # if x["service_mapping_info"].get("wim")] self.plugin_name = "rovim_" + vim["type"] - self.vim = self.plugins[self.plugin_name].vimconnector( + self.vim = self.plugins[self.plugin_name]( uuid=vim['datacenter_id'], name=vim['datacenter_name'], tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'], url=vim['vim_url'], url_admin=vim['vim_url_admin'], @@ -224,9 +224,9 @@ class vim_thread(threading.Thread): self.wim_account_id, self.plugin_name)) except Exception as e: self.logger.error("Cannot load sdn connector for wim_account={}, plugin={}: {}".format( - self.wim_account_id, self.plugin_name, e)) + self.wim_account_id, self.plugin_name, e), exc_info=True) self.sdnconnector = None - self.error_status = "Error loading sdn connector: {}".format(e) + self.error_status = self._format_vim_error_msg("Error loading sdn connector: {}".format(e)) def _get_db_task(self): """ @@ -293,9 +293,6 @@ class vim_thread(threading.Thread): }) continue - # task of creation must be the first in the list of related_task - assert(related_tasks[0]["action"] in ("CREATE", "FIND")) - task["params"] = None if task["extra"]: extra = yaml.load(task["extra"], Loader=yaml.Loader) @@ -323,6 +320,8 @@ class vim_thread(threading.Thread): copy_to["sdn_net_id"] = copy_from["sdn_net_id"] if copy_from.get("interfaces"): copy_to["interfaces"] = copy_from["interfaces"] + if copy_from.get("sdn-ports"): + copy_to["sdn-ports"] = copy_from["sdn-ports"] if copy_from.get("created_items"): if not copy_to.get("created_items"): copy_to["created_items"] = {} @@ -330,11 +329,13 @@ class vim_thread(threading.Thread): task_create = None dependency_task = None - deletion_needed = False + deletion_needed = task["extra"].get("created", False) if task["status"] == "FAILED": return # TODO need to be retry?? try: - # get all related tasks + # get all related tasks. task of creation must be the first in the list of related_task, + # unless the deletion fails and it is pendingit fails + # TODO this should be removed, passing related_tasks related_tasks = self.db.get_rows(FROM="vim_wim_actions", WHERE={self.target_k: self.target_v, "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], @@ -358,13 +359,14 @@ class vim_thread(threading.Thread): break # mark task_create as FINISHED - self.db.update_rows("vim_wim_actions", UPDATE={"status": "FINISHED"}, - WHERE={self.target_k: self.target_v, - "instance_action_id": task_create["instance_action_id"], - "task_index": task_create["task_index"] - }) + if task_create: + self.db.update_rows("vim_wim_actions", UPDATE={"status": "FINISHED"}, + WHERE={self.target_k: self.target_v, + "instance_action_id": task_create["instance_action_id"], + "task_index": task_create["task_index"] + }) if not deletion_needed: - return + return False elif dependency_task: # move create information from task_create to relate_task extra_new_created = yaml.load(dependency_task["extra"], Loader=yaml.Loader) or {} @@ -380,10 +382,19 @@ class vim_thread(threading.Thread): "task_index": dependency_task["task_index"] }) return False - else: + elif task_create: task["vim_id"] = task_create["vim_id"] copy_extra_created(copy_to=task["extra"], copy_from=task_create["extra"]) + # Ensure this task extra information is stored at database + self.db.update_rows("vim_wim_actions", + UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, + width=256)}, + WHERE={self.target_k: self.target_v, + "instance_action_id": task["instance_action_id"], + "task_index": task["task_index"], + }) return True + return deletion_needed except Exception as e: self.logger.critical("Unexpected exception at _delete_task: " + str(e), exc_info=True) @@ -397,7 +408,7 @@ class vim_thread(threading.Thread): try: vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list) vim_info = vim_dict[vim_id] - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: # Mark all tasks at VIM_ERROR status self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e)) vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} @@ -495,7 +506,7 @@ class vim_thread(threading.Thread): try: vim_dict = self.vim.refresh_nets_status(net_to_refresh_list) vim_info = vim_dict[vim_id] - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: # Mark all tasks at VIM_ERROR status self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e)) vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} @@ -548,6 +559,7 @@ class vim_thread(threading.Thread): old_task_status = task["status"] create_or_find = False # if as result of processing this task something is created or found next_refresh = 0 + task_id = task["instance_action_id"] + "." + str(task["task_index"]) try: if task["status"] == "SCHEDULED": @@ -632,7 +644,7 @@ class vim_thread(threading.Thread): elif task["action"] == "DELETE": self.del_vm(task) else: - raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"])) elif task["item"] == 'instance_nets': if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"): database_update = self._refres_net(task) @@ -645,7 +657,7 @@ class vim_thread(threading.Thread): elif task["action"] == "FIND": database_update = self.get_net(task) else: - raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"])) elif task["item"] == 'instance_wim_nets': if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"): database_update = self.new_or_update_sdn_net(task) @@ -658,52 +670,66 @@ class vim_thread(threading.Thread): elif task["action"] == "FIND": database_update = self.get_sdn_net(task) else: - raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"])) elif task["item"] == 'instance_sfis': - if task["action"] == "CREATE": + if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"): + database_update = self._refres_sfis(task) + create_or_find = True + elif task["action"] == "CREATE": create_or_find = True database_update = self.new_sfi(task) elif task["action"] == "DELETE": self.del_sfi(task) else: - raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"])) elif task["item"] == 'instance_sfs': - if task["action"] == "CREATE": + if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"): + database_update = self._refres_sfs(task) + create_or_find = True + elif task["action"] == "CREATE": create_or_find = True database_update = self.new_sf(task) elif task["action"] == "DELETE": self.del_sf(task) else: - raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"])) elif task["item"] == 'instance_classifications': - if task["action"] == "CREATE": + if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"): + database_update = self._refres_classifications(task) + create_or_find = True + elif task["action"] == "CREATE": create_or_find = True database_update = self.new_classification(task) elif task["action"] == "DELETE": self.del_classification(task) else: - raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"])) elif task["item"] == 'instance_sfps': - if task["action"] == "CREATE": + if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"): + database_update = self._refres_sfps(task) + create_or_find = True + elif task["action"] == "CREATE": create_or_find = True database_update = self.new_sfp(task) elif task["action"] == "DELETE": self.del_sfp(task) else: - raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"])) else: - raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"])) + raise vimconn.VimConnException(self.name + "unknown task item {}".format(task["item"])) # TODO - except VimThreadException as e: + except Exception as e: + if not isinstance(e, VimThreadException): + self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True) task["error_msg"] = str(e) task["status"] = "FAILED" - database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]} - if task["item"] == 'instance_vms': - database_update["vim_vm_id"] = None - elif task["item"] == 'instance_nets': - database_update["vim_net_id"] = None + database_update = {"status": "VIM_ERROR" if task["item"] != "instance_wim_nets" else "WIM_ERROR", + "error_msg": task["error_msg"]} + # if task["item"] == 'instance_vms': + # database_update["vim_vm_id"] = None + # elif task["item"] == 'instance_nets': + # database_update["vim_net_id"] = None - task_id = task["instance_action_id"] + "." + str(task["task_index"]) self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format( task_id, task["item"], task["action"], task["status"], task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"])) @@ -761,7 +787,7 @@ class vim_thread(threading.Thread): if database_update: where_filter = {"related": task["related"]} if task["item"] == "instance_nets" and task["datacenter_vim_id"]: - where_filter["datacenter_tenant_id"] = task["datacenter_vim_id"] + where_filter["datacenter_tenant_id"] = task["datacenter_vim_id"] self.db.update_rows(table=task["item"], UPDATE=database_update, WHERE=where_filter) @@ -773,7 +799,7 @@ class vim_thread(threading.Thread): self.task_queue.put(task, False) return None except queue.Full: - raise vimconn.vimconnException(self.name + ": timeout inserting a task") + raise vimconn.VimConnException(self.name + ": timeout inserting a task") def del_task(self, task): with self.task_lock: @@ -785,10 +811,9 @@ class vim_thread(threading.Thread): return False def run(self): - self.logger.debug("Starting") + self.logger.info("Starting") while True: self.get_vim_sdn_connector() - self.logger.debug("Vimconnector loaded") reload_thread = False while True: @@ -902,7 +927,7 @@ class vim_thread(threading.Thread): instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None} return instance_element_update - except (vimconn.vimconnException, VimThreadException) as e: + except (vimconn.VimConnException, VimThreadException) as e: self.logger.error("task={} new-VM: {}".format(task_id, e)) error_text = self._format_vim_error_msg(str(e)) task["error_msg"] = error_text @@ -930,9 +955,9 @@ class vim_thread(threading.Thread): task["error_msg"] = None return None - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: task["error_msg"] = self._format_vim_error_msg(str(e)) - if isinstance(e, vimconn.vimconnNotFoundException): + if isinstance(e, vimconn.VimConnNotFoundException): # If not found mark as Done and fill error_msg task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing return None @@ -981,7 +1006,7 @@ class vim_thread(threading.Thread): instance_element_update = self._get_net_internal(task, filter_param) return instance_element_update - except (vimconn.vimconnException, VimThreadException) as e: + except (vimconn.VimConnException, VimThreadException) as e: self.logger.error("task={} get-net: {}".format(task_id, e)) task["status"] = "FAILED" task["vim_id"] = None @@ -1047,7 +1072,7 @@ class vim_thread(threading.Thread): instance_element_update = {"vim_net_id": vim_net_id, "status": "BUILD", "created": True, "error_msg": None} return instance_element_update - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e)) task["status"] = "FAILED" task["vim_id"] = vim_net_id @@ -1074,9 +1099,9 @@ class vim_thread(threading.Thread): task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing task["error_msg"] = None return None - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: task["error_msg"] = self._format_vim_error_msg(str(e)) - if isinstance(e, vimconn.vimconnNotFoundException): + if isinstance(e, vimconn.VimConnNotFoundException): # If not found mark as Done and fill error_msg task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing return None @@ -1089,7 +1114,7 @@ class vim_thread(threading.Thread): connected_ports = task["extra"].get("connected_ports", []) new_connected_ports = [] last_update = task["extra"].get("last_update", 0) - sdn_status = "BUILD" + sdn_status = task["extra"].get("vim_status", "BUILD") sdn_info = None task_id = task["instance_action_id"] + "." + str(task["task_index"]) @@ -1107,7 +1132,7 @@ class vim_thread(threading.Thread): except Exception as e: if isinstance(e, SdnConnectorError) and e.http_error == HTTPStatus.NOT_FOUND.value: pass - else: + else: self._proccess_sdn_exception(e) params = task["params"] @@ -1115,19 +1140,21 @@ class vim_thread(threading.Thread): # look for ports sdn_ports = [] pending_ports = 0 + vlan_used = None ports = self.db.get_rows(FROM='instance_interfaces', WHERE={'instance_wim_net_id': task["item_id"]}) sdn_need_update = False for port in ports: + vlan_used = port.get("vlan") or vlan_used # TODO. Do not connect if already done if port.get("compute_node") and port.get("pci"): - for map in self.port_mappings: - if map.get("device_id") == port["compute_node"] and \ - map.get("device_interface_id") == port["pci"]: + for pmap in self.port_mappings: + if pmap.get("device_id") == port["compute_node"] and \ + pmap.get("device_interface_id") == port["pci"]: break else: if self.sdnconn_config.get("mapping_not_needed"): - map = { + pmap = { "service_endpoint_id": "{}:{}".format(port["compute_node"], port["pci"]), "service_endpoint_encapsulation_info": { "vlan": port["vlan"], @@ -1137,25 +1164,25 @@ class vim_thread(threading.Thread): } } else: - map = None + pmap = None error_list.append("Port mapping not found for compute_node={} pci={}".format( port["compute_node"], port["pci"])) - if map: - if port["uuid"] not in connected_ports or port["modified_at"] > last_update: + if pmap: + if port["modified_at"] > last_update: sdn_need_update = True new_connected_ports.append(port["uuid"]) sdn_ports.append({ - "service_endpoint_id": map["service_endpoint_id"], + "service_endpoint_id": pmap["service_endpoint_id"], "service_endpoint_encapsulation_type": "dot1q" if port["model"] == "SR-IOV" else None, "service_endpoint_encapsulation_info": { "vlan": port["vlan"], "mac": port["mac_address"], - "device_id": map.get("device_id"), - "device_interface_id": map.get("device_interface_id"), - "switch_dpid": map.get("switch_dpid"), - "switch_port": map.get("switch_port"), - "service_mapping_info": map.get("service_mapping_info"), + "device_id": pmap.get("device_id"), + "device_interface_id": pmap.get("device_interface_id"), + "switch_dpid": pmap.get("switch_dpid"), + "switch_port": pmap.get("switch_port"), + "service_mapping_info": pmap.get("service_mapping_info"), } }) @@ -1164,24 +1191,47 @@ class vim_thread(threading.Thread): if pending_ports: error_list.append("Waiting for getting interfaces location from VIM. Obtained '{}' of {}" .format(len(ports)-pending_ports, len(ports))) + + # connect external ports + for index, external_port in enumerate(task["extra"].get("sdn-ports") or ()): + external_port_id = external_port.get("service_endpoint_id") or str(index) + sdn_ports.append({ + "service_endpoint_id": external_port_id, + "service_endpoint_encapsulation_type": external_port.get("service_endpoint_encapsulation_type", + "dot1q"), + "service_endpoint_encapsulation_info": { + "vlan": external_port.get("vlan") or vlan_used, + "mac": external_port.get("mac_address"), + "device_id": external_port.get("device_id"), + "device_interface_id": external_port.get("device_interface_id"), + "switch_dpid": external_port.get("switch_dpid") or external_port.get("switch_id"), + "switch_port": external_port.get("switch_port"), + "service_mapping_info": external_port.get("service_mapping_info"), + }}) + new_connected_ports.append(external_port_id) + # if there are more ports to connect or they have been modified, call create/update - if sdn_need_update and len(sdn_ports) >= 2: - if not wimconn_net_id: - if params[0] == "data": - net_type = "ELAN" - elif params[0] == "ptp": - net_type = "ELINE" + try: + if set(connected_ports) != set(new_connected_ports) or sdn_need_update: + last_update = time.time() + if not wimconn_net_id: + if len(sdn_ports) < 2: + if not pending_ports: + sdn_status = "ACTIVE" + else: + if params[0] == "data": + net_type = "ELAN" + elif params[0] == "ptp": + net_type = "ELINE" + else: + net_type = "L3" + wimconn_net_id, created_items = self.sdnconnector.create_connectivity_service( + net_type, sdn_ports) else: - net_type = "L3" - - wimconn_net_id, created_items = self.sdnconnector.create_connectivity_service(net_type, sdn_ports) - else: - created_items = self.sdnconnector.edit_connectivity_service(wimconn_net_id, conn_info=created_items, - connection_points=sdn_ports) - last_update = time.time() - connected_ports = new_connected_ports - elif wimconn_net_id: - try: + created_items = self.sdnconnector.edit_connectivity_service( + wimconn_net_id, conn_info=created_items, connection_points=sdn_ports) + connected_ports = new_connected_ports + elif wimconn_net_id: wim_status_dict = self.sdnconnector.get_connectivity_service_status(wimconn_net_id, conn_info=created_items) sdn_status = wim_status_dict["sdn_status"] @@ -1189,8 +1239,8 @@ class vim_thread(threading.Thread): error_list.append(wim_status_dict.get("error_msg")) if wim_status_dict.get("sdn_info"): sdn_info = str(wim_status_dict.get("sdn_info")) - except Exception as e: - self._proccess_sdn_exception(e) + except Exception as e: + self._proccess_sdn_exception(e) task["status"] = "DONE" task["extra"]["vim_info"] = {} @@ -1204,7 +1254,7 @@ class vim_thread(threading.Thread): task["vim_id"] = wimconn_net_id instance_element_update = {"wim_internal_id": wimconn_net_id, "status": sdn_status, "created": True, "error_msg": task["error_msg"] or None} - except (vimconn.vimconnException, SdnConnectorError) as e: + except (vimconn.VimConnException, SdnConnectorError) as e: self.logger.error("task={} new-sdn-net: Error: {}".format(task_id, e)) task["status"] = "FAILED" task["vim_id"] = wimconn_net_id @@ -1212,6 +1262,7 @@ class vim_thread(threading.Thread): # task["extra"]["sdn_net_id"] = sdn_net_id instance_element_update = {"wim_internal_id": wimconn_net_id, "status": "WIM_ERROR", "error_msg": task["error_msg"]} + if sdn_info: instance_element_update["wim_info"] = sdn_info return instance_element_update @@ -1283,12 +1334,12 @@ class vim_thread(threading.Thread): task["extra"]["created"] = True task["extra"]["vim_status"] = "ACTIVE" task["error_msg"] = None - task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing + task["status"] = "DONE" task["vim_id"] = vim_sfi_id instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None} return instance_element_update - except (vimconn.vimconnException, VimThreadException) as e: + except (vimconn.VimConnException, VimThreadException) as e: self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e)) error_text = self._format_vim_error_msg(str(e)) task["error_msg"] = error_text @@ -1305,9 +1356,9 @@ class vim_thread(threading.Thread): task["error_msg"] = None return None - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: task["error_msg"] = self._format_vim_error_msg(str(e)) - if isinstance(e, vimconn.vimconnNotFoundException): + if isinstance(e, vimconn.VimConnNotFoundException): # If not found mark as Done and fill error_msg task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing return None @@ -1332,12 +1383,12 @@ class vim_thread(threading.Thread): task["extra"]["created"] = True task["extra"]["vim_status"] = "ACTIVE" task["error_msg"] = None - task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing + task["status"] = "DONE" task["vim_id"] = vim_sf_id instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None} return instance_element_update - except (vimconn.vimconnException, VimThreadException) as e: + except (vimconn.VimConnException, VimThreadException) as e: self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e)) error_text = self._format_vim_error_msg(str(e)) task["error_msg"] = error_text @@ -1354,9 +1405,9 @@ class vim_thread(threading.Thread): task["error_msg"] = None return None - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: task["error_msg"] = self._format_vim_error_msg(str(e)) - if isinstance(e, vimconn.vimconnNotFoundException): + if isinstance(e, vimconn.VimConnNotFoundException): # If not found mark as Done and fill error_msg task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing return None @@ -1428,13 +1479,13 @@ class vim_thread(threading.Thread): task["extra"]["created"] = True task["extra"]["vim_status"] = "ACTIVE" task["error_msg"] = None - task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing + task["status"] = "DONE" task["vim_id"] = vim_classification_id instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id, "error_msg": None} return instance_element_update - except (vimconn.vimconnException, VimThreadException) as e: + except (vimconn.VimConnException, VimThreadException) as e: self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e)) error_text = self._format_vim_error_msg(str(e)) task["error_msg"] = error_text @@ -1451,9 +1502,9 @@ class vim_thread(threading.Thread): task["error_msg"] = None return None - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: task["error_msg"] = self._format_vim_error_msg(str(e)) - if isinstance(e, vimconn.vimconnNotFoundException): + if isinstance(e, vimconn.VimConnNotFoundException): # If not found mark as Done and fill error_msg task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing return None @@ -1484,12 +1535,12 @@ class vim_thread(threading.Thread): task["extra"]["created"] = True task["extra"]["vim_status"] = "ACTIVE" task["error_msg"] = None - task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing + task["status"] = "DONE" task["vim_id"] = vim_sfp_id instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None} return instance_element_update - except (vimconn.vimconnException, VimThreadException) as e: + except (vimconn.VimConnException, VimThreadException) as e: self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e)) error_text = self._format_vim_error_msg(str(e)) task["error_msg"] = error_text @@ -1506,11 +1557,157 @@ class vim_thread(threading.Thread): task["error_msg"] = None return None - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: task["error_msg"] = self._format_vim_error_msg(str(e)) - if isinstance(e, vimconn.vimconnNotFoundException): + if isinstance(e, vimconn.VimConnNotFoundException): # If not found mark as Done and fill error_msg task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing return None task["status"] = "FAILED" return None + + def _refres_sfps(self, task): + """Call VIM to get SFPs status""" + database_update = None + + vim_id = task["vim_id"] + sfp_to_refresh_list = [vim_id] + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + try: + vim_dict = self.vim.refresh_sfps_status(sfp_to_refresh_list) + vim_info = vim_dict[vim_id] + except vimconn.VimConnException as e: + # Mark all tasks at VIM_ERROR status + self.logger.error("task={} get-sfp: vimconnException when trying to refresh sfps {}".format(task_id, e)) + vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} + + self.logger.debug("task={} get-sfp: vim_sfp_id={} result={}".format(task_id, task["vim_id"], vim_info)) + #TODO: Revise this part + vim_info_error_msg = None + if vim_info.get("error_msg"): + vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"]) + task_vim_info = task["extra"].get("vim_info") + task_error_msg = task.get("error_msg") + task_vim_status = task["extra"].get("vim_status") + if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \ + (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]): + database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg} + if vim_info.get("vim_info"): + database_update["vim_info"] = vim_info["vim_info"] + + task["extra"]["vim_status"] = vim_info["status"] + task["error_msg"] = vim_info_error_msg + if vim_info.get("vim_info"): + task["extra"]["vim_info"] = vim_info["vim_info"] + + return database_update + + def _refres_sfis(self, task): + """Call VIM to get sfis status""" + database_update = None + + vim_id = task["vim_id"] + sfi_to_refresh_list = [vim_id] + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + try: + vim_dict = self.vim.refresh_sfis_status(sfi_to_refresh_list) + vim_info = vim_dict[vim_id] + except vimconn.VimConnException as e: + # Mark all tasks at VIM_ERROR status + self.logger.error("task={} get-sfi: vimconnException when trying to refresh sfis {}".format(task_id, e)) + vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} + + self.logger.debug("task={} get-sfi: vim_sfi_id={} result={}".format(task_id, task["vim_id"], vim_info)) + #TODO: Revise this part + vim_info_error_msg = None + if vim_info.get("error_msg"): + vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"]) + task_vim_info = task["extra"].get("vim_info") + task_error_msg = task.get("error_msg") + task_vim_status = task["extra"].get("vim_status") + if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \ + (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]): + database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg} + if vim_info.get("vim_info"): + database_update["vim_info"] = vim_info["vim_info"] + + task["extra"]["vim_status"] = vim_info["status"] + task["error_msg"] = vim_info_error_msg + if vim_info.get("vim_info"): + task["extra"]["vim_info"] = vim_info["vim_info"] + + return database_update + + def _refres_sfs(self, task): + """Call VIM to get sfs status""" + database_update = None + + vim_id = task["vim_id"] + sf_to_refresh_list = [vim_id] + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + try: + vim_dict = self.vim.refresh_sfs_status(sf_to_refresh_list) + vim_info = vim_dict[vim_id] + except vimconn.VimConnException as e: + # Mark all tasks at VIM_ERROR status + self.logger.error("task={} get-sf: vimconnException when trying to refresh sfs {}".format(task_id, e)) + vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} + + self.logger.debug("task={} get-sf: vim_sf_id={} result={}".format(task_id, task["vim_id"], vim_info)) + #TODO: Revise this part + vim_info_error_msg = None + if vim_info.get("error_msg"): + vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"]) + task_vim_info = task["extra"].get("vim_info") + task_error_msg = task.get("error_msg") + task_vim_status = task["extra"].get("vim_status") + if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \ + (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]): + database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg} + if vim_info.get("vim_info"): + database_update["vim_info"] = vim_info["vim_info"] + + task["extra"]["vim_status"] = vim_info["status"] + task["error_msg"] = vim_info_error_msg + if vim_info.get("vim_info"): + task["extra"]["vim_info"] = vim_info["vim_info"] + + return database_update + + def _refres_classifications(self, task): + """Call VIM to get classifications status""" + database_update = None + + vim_id = task["vim_id"] + classification_to_refresh_list = [vim_id] + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + try: + vim_dict = self.vim.refresh_classifications_status(classification_to_refresh_list) + vim_info = vim_dict[vim_id] + except vimconn.VimConnException as e: + # Mark all tasks at VIM_ERROR status + self.logger.error("task={} get-classification: vimconnException when trying to refresh classifications {}" + .format(task_id, e)) + vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} + + self.logger.debug("task={} get-classification: vim_classification_id={} result={}".format(task_id, + task["vim_id"], vim_info)) + #TODO: Revise this part + vim_info_error_msg = None + if vim_info.get("error_msg"): + vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"]) + task_vim_info = task["extra"].get("vim_info") + task_error_msg = task.get("error_msg") + task_vim_status = task["extra"].get("vim_status") + if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \ + (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]): + database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg} + if vim_info.get("vim_info"): + database_update["vim_info"] = vim_info["vim_info"] + + task["extra"]["vim_status"] = vim_info["status"] + task["error_msg"] = vim_info_error_msg + if vim_info.get("vim_info"): + task["extra"]["vim_info"] = vim_info["vim_info"] + + return database_update