X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FRO.git;a=blobdiff_plain;f=RO%2Fosm_ro%2Fvim_thread.py;h=7174fe67beae74bb63430108ec3ead7eed21f323;hp=3039bab5901132102173c5abf47dea055353d711;hb=187b52ae9922b3b3170080ec78958d7933bde813;hpb=396e8135b7201bd35ced4abcd8514bf98a1e9bb9 diff --git a/RO/osm_ro/vim_thread.py b/RO/osm_ro/vim_thread.py index 3039bab5..7174fe67 100644 --- a/RO/osm_ro/vim_thread.py +++ b/RO/osm_ro/vim_thread.py @@ -80,8 +80,8 @@ import threading import time import queue import logging -from osm_ro import vimconn -from osm_ro.wim.sdnconn import SdnConnectorError +from osm_ro_plugin import vimconn +from osm_ro_plugin.sdnconn import SdnConnectorError import yaml from osm_ro.db_base import db_base_Exception from http import HTTPStatus @@ -156,12 +156,12 @@ class vim_thread(threading.Thread): raise SdnConnectorError(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc def _proccess_vim_exception(self, exc): - if isinstance(exc, vimconn.vimconnException): + if isinstance(exc, vimconn.VimConnException): raise else: self.logger.error("plugin={} throws a non vimconnException exception {}".format(self.plugin_name, exc), exc_info=True) - raise vimconn.vimconnException(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc + raise vimconn.VimConnException(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc def get_vim_sdn_connector(self): if self.datacenter_tenant_id: @@ -188,7 +188,7 @@ class vim_thread(threading.Thread): # vim_config["wim_external_ports"] = [x for x in vim_port_mappings # if x["service_mapping_info"].get("wim")] self.plugin_name = "rovim_" + vim["type"] - self.vim = self.plugins[self.plugin_name].vimconnector( + self.vim = self.plugins[self.plugin_name]( uuid=vim['datacenter_id'], name=vim['datacenter_name'], tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'], url=vim['vim_url'], url_admin=vim['vim_url_admin'], @@ -224,9 +224,9 @@ class vim_thread(threading.Thread): self.wim_account_id, self.plugin_name)) except Exception as e: self.logger.error("Cannot load sdn connector for wim_account={}, plugin={}: {}".format( - self.wim_account_id, self.plugin_name, e)) + self.wim_account_id, self.plugin_name, e), exc_info=True) self.sdnconnector = None - self.error_status = "Error loading sdn connector: {}".format(e) + self.error_status = self._format_vim_error_msg("Error loading sdn connector: {}".format(e)) def _get_db_task(self): """ @@ -408,7 +408,7 @@ class vim_thread(threading.Thread): try: vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list) vim_info = vim_dict[vim_id] - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: # Mark all tasks at VIM_ERROR status self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e)) vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} @@ -506,7 +506,7 @@ class vim_thread(threading.Thread): try: vim_dict = self.vim.refresh_nets_status(net_to_refresh_list) vim_info = vim_dict[vim_id] - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: # Mark all tasks at VIM_ERROR status self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e)) vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} @@ -644,7 +644,7 @@ class vim_thread(threading.Thread): elif task["action"] == "DELETE": self.del_vm(task) else: - raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"])) elif task["item"] == 'instance_nets': if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"): database_update = self._refres_net(task) @@ -657,7 +657,7 @@ class vim_thread(threading.Thread): elif task["action"] == "FIND": database_update = self.get_net(task) else: - raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"])) elif task["item"] == 'instance_wim_nets': if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"): database_update = self.new_or_update_sdn_net(task) @@ -670,7 +670,7 @@ class vim_thread(threading.Thread): elif task["action"] == "FIND": database_update = self.get_sdn_net(task) else: - raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"])) elif task["item"] == 'instance_sfis': if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"): database_update = self._refres_sfis(task) @@ -681,7 +681,7 @@ class vim_thread(threading.Thread): elif task["action"] == "DELETE": self.del_sfi(task) else: - raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"])) elif task["item"] == 'instance_sfs': if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"): database_update = self._refres_sfs(task) @@ -692,7 +692,7 @@ class vim_thread(threading.Thread): elif task["action"] == "DELETE": self.del_sf(task) else: - raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"])) elif task["item"] == 'instance_classifications': if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"): database_update = self._refres_classifications(task) @@ -703,7 +703,7 @@ class vim_thread(threading.Thread): elif task["action"] == "DELETE": self.del_classification(task) else: - raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"])) elif task["item"] == 'instance_sfps': if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"): database_update = self._refres_sfps(task) @@ -714,16 +714,17 @@ class vim_thread(threading.Thread): elif task["action"] == "DELETE": self.del_sfp(task) else: - raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"])) else: - raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"])) + raise vimconn.VimConnException(self.name + "unknown task item {}".format(task["item"])) # TODO except Exception as e: if not isinstance(e, VimThreadException): self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True) task["error_msg"] = str(e) task["status"] = "FAILED" - database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]} + database_update = {"status": "VIM_ERROR" if task["item"] != "instance_wim_nets" else "WIM_ERROR", + "error_msg": task["error_msg"]} # if task["item"] == 'instance_vms': # database_update["vim_vm_id"] = None # elif task["item"] == 'instance_nets': @@ -798,7 +799,7 @@ class vim_thread(threading.Thread): self.task_queue.put(task, False) return None except queue.Full: - raise vimconn.vimconnException(self.name + ": timeout inserting a task") + raise vimconn.VimConnException(self.name + ": timeout inserting a task") def del_task(self, task): with self.task_lock: @@ -810,10 +811,9 @@ class vim_thread(threading.Thread): return False def run(self): - self.logger.debug("Starting") + self.logger.info("Starting") while True: self.get_vim_sdn_connector() - self.logger.debug("Vimconnector loaded") reload_thread = False while True: @@ -927,7 +927,7 @@ class vim_thread(threading.Thread): instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None} return instance_element_update - except (vimconn.vimconnException, VimThreadException) as e: + except (vimconn.VimConnException, VimThreadException) as e: self.logger.error("task={} new-VM: {}".format(task_id, e)) error_text = self._format_vim_error_msg(str(e)) task["error_msg"] = error_text @@ -955,9 +955,9 @@ class vim_thread(threading.Thread): task["error_msg"] = None return None - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: task["error_msg"] = self._format_vim_error_msg(str(e)) - if isinstance(e, vimconn.vimconnNotFoundException): + if isinstance(e, vimconn.VimConnNotFoundException): # If not found mark as Done and fill error_msg task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing return None @@ -1006,7 +1006,7 @@ class vim_thread(threading.Thread): instance_element_update = self._get_net_internal(task, filter_param) return instance_element_update - except (vimconn.vimconnException, VimThreadException) as e: + except (vimconn.VimConnException, VimThreadException) as e: self.logger.error("task={} get-net: {}".format(task_id, e)) task["status"] = "FAILED" task["vim_id"] = None @@ -1072,7 +1072,7 @@ class vim_thread(threading.Thread): instance_element_update = {"vim_net_id": vim_net_id, "status": "BUILD", "created": True, "error_msg": None} return instance_element_update - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e)) task["status"] = "FAILED" task["vim_id"] = vim_net_id @@ -1099,9 +1099,9 @@ class vim_thread(threading.Thread): task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing task["error_msg"] = None return None - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: task["error_msg"] = self._format_vim_error_msg(str(e)) - if isinstance(e, vimconn.vimconnNotFoundException): + if isinstance(e, vimconn.VimConnNotFoundException): # If not found mark as Done and fill error_msg task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing return None @@ -1114,7 +1114,7 @@ class vim_thread(threading.Thread): connected_ports = task["extra"].get("connected_ports", []) new_connected_ports = [] last_update = task["extra"].get("last_update", 0) - sdn_status = "BUILD" + sdn_status = task["extra"].get("vim_status", "BUILD") sdn_info = None task_id = task["instance_action_id"] + "." + str(task["task_index"]) @@ -1212,18 +1212,21 @@ class vim_thread(threading.Thread): # if there are more ports to connect or they have been modified, call create/update try: - if (set(connected_ports) != set(new_connected_ports) or sdn_need_update) and len(sdn_ports) >= 2: + if set(connected_ports) != set(new_connected_ports) or sdn_need_update: last_update = time.time() if not wimconn_net_id: - if params[0] == "data": - net_type = "ELAN" - elif params[0] == "ptp": - net_type = "ELINE" + if len(sdn_ports) < 2: + if not pending_ports: + sdn_status = "ACTIVE" else: - net_type = "L3" - - wimconn_net_id, created_items = self.sdnconnector.create_connectivity_service( - net_type, sdn_ports) + if params[0] == "data": + net_type = "ELAN" + elif params[0] == "ptp": + net_type = "ELINE" + else: + net_type = "L3" + wimconn_net_id, created_items = self.sdnconnector.create_connectivity_service( + net_type, sdn_ports) else: created_items = self.sdnconnector.edit_connectivity_service( wimconn_net_id, conn_info=created_items, connection_points=sdn_ports) @@ -1251,7 +1254,7 @@ class vim_thread(threading.Thread): task["vim_id"] = wimconn_net_id instance_element_update = {"wim_internal_id": wimconn_net_id, "status": sdn_status, "created": True, "error_msg": task["error_msg"] or None} - except (vimconn.vimconnException, SdnConnectorError) as e: + except (vimconn.VimConnException, SdnConnectorError) as e: self.logger.error("task={} new-sdn-net: Error: {}".format(task_id, e)) task["status"] = "FAILED" task["vim_id"] = wimconn_net_id @@ -1336,7 +1339,7 @@ class vim_thread(threading.Thread): instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None} return instance_element_update - except (vimconn.vimconnException, VimThreadException) as e: + except (vimconn.VimConnException, VimThreadException) as e: self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e)) error_text = self._format_vim_error_msg(str(e)) task["error_msg"] = error_text @@ -1353,9 +1356,9 @@ class vim_thread(threading.Thread): task["error_msg"] = None return None - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: task["error_msg"] = self._format_vim_error_msg(str(e)) - if isinstance(e, vimconn.vimconnNotFoundException): + if isinstance(e, vimconn.VimConnNotFoundException): # If not found mark as Done and fill error_msg task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing return None @@ -1385,7 +1388,7 @@ class vim_thread(threading.Thread): instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None} return instance_element_update - except (vimconn.vimconnException, VimThreadException) as e: + except (vimconn.VimConnException, VimThreadException) as e: self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e)) error_text = self._format_vim_error_msg(str(e)) task["error_msg"] = error_text @@ -1402,9 +1405,9 @@ class vim_thread(threading.Thread): task["error_msg"] = None return None - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: task["error_msg"] = self._format_vim_error_msg(str(e)) - if isinstance(e, vimconn.vimconnNotFoundException): + if isinstance(e, vimconn.VimConnNotFoundException): # If not found mark as Done and fill error_msg task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing return None @@ -1482,7 +1485,7 @@ class vim_thread(threading.Thread): "error_msg": None} return instance_element_update - except (vimconn.vimconnException, VimThreadException) as e: + except (vimconn.VimConnException, VimThreadException) as e: self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e)) error_text = self._format_vim_error_msg(str(e)) task["error_msg"] = error_text @@ -1499,9 +1502,9 @@ class vim_thread(threading.Thread): task["error_msg"] = None return None - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: task["error_msg"] = self._format_vim_error_msg(str(e)) - if isinstance(e, vimconn.vimconnNotFoundException): + if isinstance(e, vimconn.VimConnNotFoundException): # If not found mark as Done and fill error_msg task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing return None @@ -1537,7 +1540,7 @@ class vim_thread(threading.Thread): instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None} return instance_element_update - except (vimconn.vimconnException, VimThreadException) as e: + except (vimconn.VimConnException, VimThreadException) as e: self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e)) error_text = self._format_vim_error_msg(str(e)) task["error_msg"] = error_text @@ -1554,9 +1557,9 @@ class vim_thread(threading.Thread): task["error_msg"] = None return None - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: task["error_msg"] = self._format_vim_error_msg(str(e)) - if isinstance(e, vimconn.vimconnNotFoundException): + if isinstance(e, vimconn.VimConnNotFoundException): # If not found mark as Done and fill error_msg task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing return None @@ -1573,7 +1576,7 @@ class vim_thread(threading.Thread): try: vim_dict = self.vim.refresh_sfps_status(sfp_to_refresh_list) vim_info = vim_dict[vim_id] - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: # Mark all tasks at VIM_ERROR status self.logger.error("task={} get-sfp: vimconnException when trying to refresh sfps {}".format(task_id, e)) vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} @@ -1609,7 +1612,7 @@ class vim_thread(threading.Thread): try: vim_dict = self.vim.refresh_sfis_status(sfi_to_refresh_list) vim_info = vim_dict[vim_id] - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: # Mark all tasks at VIM_ERROR status self.logger.error("task={} get-sfi: vimconnException when trying to refresh sfis {}".format(task_id, e)) vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} @@ -1645,7 +1648,7 @@ class vim_thread(threading.Thread): try: vim_dict = self.vim.refresh_sfs_status(sf_to_refresh_list) vim_info = vim_dict[vim_id] - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: # Mark all tasks at VIM_ERROR status self.logger.error("task={} get-sf: vimconnException when trying to refresh sfs {}".format(task_id, e)) vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} @@ -1681,7 +1684,7 @@ class vim_thread(threading.Thread): try: vim_dict = self.vim.refresh_classifications_status(classification_to_refresh_list) vim_info = vim_dict[vim_id] - except vimconn.vimconnException as e: + except vimconn.VimConnException as e: # Mark all tasks at VIM_ERROR status self.logger.error("task={} get-classification: vimconnException when trying to refresh classifications {}" .format(task_id, e))