X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FRO.git;a=blobdiff_plain;f=RO%2Fosm_ro%2Fvim_thread.py;h=3039bab5901132102173c5abf47dea055353d711;hp=1e1e6d202a5be78cb37dca90319c8c2aab4d9e2d;hb=396e8135b7201bd35ced4abcd8514bf98a1e9bb9;hpb=7d782eff123e5b44d41437377ccca66ad1e8b21b diff --git a/RO/osm_ro/vim_thread.py b/RO/osm_ro/vim_thread.py index 1e1e6d20..3039bab5 100644 --- a/RO/osm_ro/vim_thread.py +++ b/RO/osm_ro/vim_thread.py @@ -81,12 +81,10 @@ import time import queue import logging from osm_ro import vimconn +from osm_ro.wim.sdnconn import SdnConnectorError import yaml from osm_ro.db_base import db_base_Exception -# TODO py3 BEGIN -class ovimException(Exception): - pass -# TODO py3 END +from http import HTTPStatus from copy import deepcopy __author__ = "Alfonso Tierno, Pablo Montes" @@ -111,8 +109,7 @@ class vim_thread(threading.Thread): REFRESH_ERROR = 600 REFRESH_DELETE = 3600 * 10 - def __init__(self, task_lock, plugins, name=None, datacenter_name=None, datacenter_tenant_id=None, - db=None, db_lock=None, ovim=None): + def __init__(self, task_lock, plugins, name=None, wim_account_id=None, datacenter_tenant_id=None, db=None): """Init a thread. Arguments: 'id' number of thead @@ -122,60 +119,114 @@ class vim_thread(threading.Thread): """ threading.Thread.__init__(self) self.plugins = plugins + self.plugin_name = "unknown" self.vim = None + self.sdnconnector = None + self.sdnconn_config = None self.error_status = None - self.datacenter_name = datacenter_name + self.wim_account_id = wim_account_id self.datacenter_tenant_id = datacenter_tenant_id - self.ovim = ovim + self.port_mappings = None + if self.wim_account_id: + self.target_k = "wim_account_id" + self.target_v = self.wim_account_id + else: + self.target_k = "datacenter_vim_id" + self.target_v = self.datacenter_tenant_id if not name: - self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"] + self.name = wim_account_id or str(datacenter_tenant_id) else: self.name = name self.vim_persistent_info = {} self.my_id = self.name[:64] - self.logger = logging.getLogger('openmano.vim.' + self.name) + self.logger = logging.getLogger('openmano.{}.{}'.format("vim" if self.datacenter_tenant_id else "sdn", + self.name)) self.db = db - self.db_lock = db_lock self.task_lock = task_lock self.task_queue = queue.Queue(2000) - def get_vimconnector(self): - try: - from_ = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid" - select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin', - 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id', - 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id', - 'user', 'passwd', 'dt.config as dt_config') - where_ = {"dt.uuid": self.datacenter_tenant_id} - vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_) - vim = vims[0] - vim_config = {} - if vim["config"]: - vim_config.update(yaml.load(vim["config"], Loader=yaml.Loader)) - if vim["dt_config"]: - vim_config.update(yaml.load(vim["dt_config"], Loader=yaml.Loader)) - vim_config['datacenter_tenant_id'] = vim.get('datacenter_tenant_id') - vim_config['datacenter_id'] = vim.get('datacenter_id') - - # get port_mapping - with self.db_lock: - vim_config["wim_external_ports"] = self.ovim.get_of_port_mappings( - db_filter={"region": vim_config['datacenter_id'], "pci": None}) - - self.vim = self.plugins["rovim_" + vim["type"]].vimconnector( - uuid=vim['datacenter_id'], name=vim['datacenter_name'], - tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'], - url=vim['vim_url'], url_admin=vim['vim_url_admin'], - user=vim['user'], passwd=vim['passwd'], - config=vim_config, persistent_info=self.vim_persistent_info - ) - self.error_status = None - except Exception as e: - self.logger.error("Cannot load vimconnector for vim_account {}: {}".format(self.datacenter_tenant_id, e)) - self.vim = None - self.error_status = "Error loading vimconnector: {}".format(e) + def _proccess_sdn_exception(self, exc): + if isinstance(exc, SdnConnectorError): + raise + else: + self.logger.error("plugin={} throws a non SdnConnectorError exception {}".format(self.plugin_name, exc), + exc_info=True) + raise SdnConnectorError(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc + + def _proccess_vim_exception(self, exc): + if isinstance(exc, vimconn.vimconnException): + raise + else: + self.logger.error("plugin={} throws a non vimconnException exception {}".format(self.plugin_name, exc), + exc_info=True) + raise vimconn.vimconnException(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc + + def get_vim_sdn_connector(self): + if self.datacenter_tenant_id: + try: + from_ = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid" + select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin', + 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id', + 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id', + 'user', 'passwd', 'dt.config as dt_config') + where_ = {"dt.uuid": self.datacenter_tenant_id} + vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_) + vim = vims[0] + vim_config = {} + if vim["config"]: + vim_config.update(yaml.load(vim["config"], Loader=yaml.Loader)) + if vim["dt_config"]: + vim_config.update(yaml.load(vim["dt_config"], Loader=yaml.Loader)) + vim_config['datacenter_tenant_id'] = vim.get('datacenter_tenant_id') + vim_config['datacenter_id'] = vim.get('datacenter_id') + + # get port_mapping + # vim_port_mappings = self.ovim.get_of_port_mappings( + # db_filter={"datacenter_id": vim_config['datacenter_id']}) + # vim_config["wim_external_ports"] = [x for x in vim_port_mappings + # if x["service_mapping_info"].get("wim")] + self.plugin_name = "rovim_" + vim["type"] + self.vim = self.plugins[self.plugin_name].vimconnector( + uuid=vim['datacenter_id'], name=vim['datacenter_name'], + tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'], + url=vim['vim_url'], url_admin=vim['vim_url_admin'], + user=vim['user'], passwd=vim['passwd'], + config=vim_config, persistent_info=self.vim_persistent_info + ) + self.error_status = None + self.logger.info("Vim Connector loaded for vim_account={}, plugin={}".format( + self.datacenter_tenant_id, self.plugin_name)) + except Exception as e: + self.logger.error("Cannot load vimconnector for vim_account={} plugin={}: {}".format( + self.datacenter_tenant_id, self.plugin_name, e)) + self.vim = None + self.error_status = "Error loading vimconnector: {}".format(e) + else: + try: + wim_account = self.db.get_rows(FROM="wim_accounts", WHERE={"uuid": self.wim_account_id})[0] + wim = self.db.get_rows(FROM="wims", WHERE={"uuid": wim_account["wim_id"]})[0] + if wim["config"]: + self.sdnconn_config = yaml.load(wim["config"], Loader=yaml.Loader) + else: + self.sdnconn_config = {} + if wim_account["config"]: + self.sdnconn_config.update(yaml.load(wim_account["config"], Loader=yaml.Loader)) + self.port_mappings = self.db.get_rows(FROM="wim_port_mappings", WHERE={"wim_id": wim_account["wim_id"]}) + if self.port_mappings: + self.sdnconn_config["service_endpoint_mapping"] = self.port_mappings + self.plugin_name = "rosdn_" + wim["type"] + self.sdnconnector = self.plugins[self.plugin_name]( + wim, wim_account, config=self.sdnconn_config) + self.error_status = None + self.logger.info("Sdn Connector loaded for wim_account={}, plugin={}".format( + self.wim_account_id, self.plugin_name)) + except Exception as e: + self.logger.error("Cannot load sdn connector for wim_account={}, plugin={}: {}".format( + self.wim_account_id, self.plugin_name, e)) + self.sdnconnector = None + self.error_status = "Error loading sdn connector: {}".format(e) def _get_db_task(self): """ @@ -189,7 +240,7 @@ class vim_thread(threading.Thread): while True: # get 20 (database_limit) entries each time vim_actions = self.db.get_rows(FROM="vim_wim_actions", - WHERE={"datacenter_vim_id": self.datacenter_tenant_id, + WHERE={self.target_k: self.target_v, "status": ['SCHEDULED', 'BUILD', 'DONE'], "worker": [None, self.my_id], "modified_at<=": now }, @@ -206,7 +257,7 @@ class vim_thread(threading.Thread): task_related = task["related"] # lock ... self.db.update_rows("vim_wim_actions", UPDATE={"worker": self.my_id}, modified_time=0, - WHERE={"datacenter_vim_id": self.datacenter_tenant_id, + WHERE={self.target_k: self.target_v, "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], "worker": [None, self.my_id], "related": task_related, @@ -214,7 +265,7 @@ class vim_thread(threading.Thread): }) # ... and read all related and check if locked related_tasks = self.db.get_rows(FROM="vim_wim_actions", - WHERE={"datacenter_vim_id": self.datacenter_tenant_id, + WHERE={self.target_k: self.target_v, "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], "related": task_related, "item": task["item"], @@ -235,16 +286,13 @@ class vim_thread(threading.Thread): if some_tasks_not_locked: if some_tasks_locked: # unlock self.db.update_rows("vim_wim_actions", UPDATE={"worker": None}, modified_time=0, - WHERE={"datacenter_vim_id": self.datacenter_tenant_id, + WHERE={self.target_k: self.target_v, "worker": self.my_id, "related": task_related, "item": task["item"], }) continue - # task of creation must be the first in the list of related_task - assert(related_tasks[0]["action"] in ("CREATE", "FIND")) - task["params"] = None if task["extra"]: extra = yaml.load(task["extra"], Loader=yaml.Loader) @@ -272,6 +320,8 @@ class vim_thread(threading.Thread): copy_to["sdn_net_id"] = copy_from["sdn_net_id"] if copy_from.get("interfaces"): copy_to["interfaces"] = copy_from["interfaces"] + if copy_from.get("sdn-ports"): + copy_to["sdn-ports"] = copy_from["sdn-ports"] if copy_from.get("created_items"): if not copy_to.get("created_items"): copy_to["created_items"] = {} @@ -279,13 +329,15 @@ class vim_thread(threading.Thread): task_create = None dependency_task = None - deletion_needed = False + deletion_needed = task["extra"].get("created", False) if task["status"] == "FAILED": return # TODO need to be retry?? try: - # get all related tasks + # get all related tasks. task of creation must be the first in the list of related_task, + # unless the deletion fails and it is pendingit fails + # TODO this should be removed, passing related_tasks related_tasks = self.db.get_rows(FROM="vim_wim_actions", - WHERE={"datacenter_vim_id": self.datacenter_tenant_id, + WHERE={self.target_k: self.target_v, "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'], "action": ["FIND", "CREATE"], "related": task["related"], @@ -307,13 +359,14 @@ class vim_thread(threading.Thread): break # mark task_create as FINISHED - self.db.update_rows("vim_wim_actions", UPDATE={"status": "FINISHED"}, - WHERE={"datacenter_vim_id": self.datacenter_tenant_id, - "instance_action_id": task_create["instance_action_id"], - "task_index": task_create["task_index"] - }) + if task_create: + self.db.update_rows("vim_wim_actions", UPDATE={"status": "FINISHED"}, + WHERE={self.target_k: self.target_v, + "instance_action_id": task_create["instance_action_id"], + "task_index": task_create["task_index"] + }) if not deletion_needed: - return + return False elif dependency_task: # move create information from task_create to relate_task extra_new_created = yaml.load(dependency_task["extra"], Loader=yaml.Loader) or {} @@ -324,15 +377,24 @@ class vim_thread(threading.Thread): UPDATE={"extra": yaml.safe_dump(extra_new_created, default_flow_style=True, width=256), "vim_id": task_create.get("vim_id")}, - WHERE={"datacenter_vim_id": self.datacenter_tenant_id, + WHERE={self.target_k: self.target_v, "instance_action_id": dependency_task["instance_action_id"], "task_index": dependency_task["task_index"] }) return False - else: + elif task_create: task["vim_id"] = task_create["vim_id"] copy_extra_created(copy_to=task["extra"], copy_from=task_create["extra"]) + # Ensure this task extra information is stored at database + self.db.update_rows("vim_wim_actions", + UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True, + width=256)}, + WHERE={self.target_k: self.target_v, + "instance_action_id": task["instance_action_id"], + "task_index": task["task_index"], + }) return True + return deletion_needed except Exception as e: self.logger.critical("Unexpected exception at _delete_task: " + str(e), exc_info=True) @@ -366,40 +428,38 @@ class vim_thread(threading.Thread): task_vim_interface = task_interface.get("vim_info") if task_vim_interface != interface: # delete old port - if task_interface.get("sdn_port_id"): - try: - with self.db_lock: - self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True) - task_interface["sdn_port_id"] = None - except ovimException as e: - error_text = "ovimException deleting external_port={}: {}".format( - task_interface["sdn_port_id"], e) - self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True) - task_warning_msg += error_text - # TODO Set error_msg at instance_nets instead of instance VMs + # if task_interface.get("sdn_port_id"): + # try: + # self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True) + # task_interface["sdn_port_id"] = None + # except ovimException as e: + # error_text = "ovimException deleting external_port={}: {}".format( + # task_interface["sdn_port_id"], e) + # self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True) + # task_warning_msg += error_text + # # TODO Set error_msg at instance_nets instead of instance VMs # Create SDN port - sdn_net_id = task_interface.get("sdn_net_id") - if sdn_net_id and interface.get("compute_node") and interface.get("pci"): - sdn_port_name = sdn_net_id + "." + task["vim_id"] - sdn_port_name = sdn_port_name[:63] - try: - with self.db_lock: - sdn_port_id = self.ovim.new_external_port( - {"compute_node": interface["compute_node"], - "pci": interface["pci"], - "vlan": interface.get("vlan"), - "net_id": sdn_net_id, - "region": self.vim["config"]["datacenter_id"], - "name": sdn_port_name, - "mac": interface.get("mac_address")}) - task_interface["sdn_port_id"] = sdn_port_id - except (ovimException, Exception) as e: - error_text = "ovimException creating new_external_port compute_node={} pci={} vlan={} {}".\ - format(interface["compute_node"], interface["pci"], interface.get("vlan"), e) - self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True) - task_warning_msg += error_text - # TODO Set error_msg at instance_nets instead of instance VMs + # sdn_net_id = task_interface.get("sdn_net_id") + # if sdn_net_id and interface.get("compute_node") and interface.get("pci"): + # sdn_port_name = sdn_net_id + "." + task["vim_id"] + # sdn_port_name = sdn_port_name[:63] + # try: + # sdn_port_id = self.ovim.new_external_port( + # {"compute_node": interface["compute_node"], + # "pci": interface["pci"], + # "vlan": interface.get("vlan"), + # "net_id": sdn_net_id, + # "region": self.vim["config"]["datacenter_id"], + # "name": sdn_port_name, + # "mac": interface.get("mac_address")}) + # task_interface["sdn_port_id"] = sdn_port_id + # except (ovimException, Exception) as e: + # error_text = "ovimException creating new_external_port compute_node={} pci={} vlan={} {}".\ + # format(interface["compute_node"], interface["pci"], interface.get("vlan"), e) + # self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True) + # task_warning_msg += error_text + # # TODO Set error_msg at instance_nets instead of instance VMs self.db.update_rows('instance_interfaces', UPDATE={"mac_address": interface.get("mac_address"), @@ -412,6 +472,8 @@ class vim_thread(threading.Thread): "vlan": interface.get("vlan")}, WHERE={'uuid': task_interface["iface_id"]}) task_interface["vim_info"] = interface + # if sdn_net_id and interface.get("compute_node") and interface.get("pci"): + # # TODO Send message to task SDN to update # check and update task and instance_vms database vim_info_error_msg = None @@ -455,30 +517,29 @@ class vim_thread(threading.Thread): task_vim_info = task["extra"].get("vim_info") task_vim_status = task["extra"].get("vim_status") task_error_msg = task.get("error_msg") - task_sdn_net_id = task["extra"].get("sdn_net_id") + # task_sdn_net_id = task["extra"].get("sdn_net_id") vim_info_status = vim_info["status"] vim_info_error_msg = vim_info.get("error_msg") # get ovim status - if task_sdn_net_id: - try: - with self.db_lock: - sdn_net = self.ovim.show_network(task_sdn_net_id) - except (ovimException, Exception) as e: - text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e) - self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True) - sdn_net = {"status": "ERROR", "last_error": text_error} - if sdn_net["status"] == "ERROR": - if not vim_info_error_msg: - vim_info_error_msg = str(sdn_net.get("last_error")) - else: - vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format( - self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14), - self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14)) - vim_info_status = "ERROR" - elif sdn_net["status"] == "BUILD": - if vim_info_status == "ACTIVE": - vim_info_status = "BUILD" + # if task_sdn_net_id: + # try: + # sdn_net = self.ovim.show_network(task_sdn_net_id) + # except (ovimException, Exception) as e: + # text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e) + # self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True) + # sdn_net = {"status": "ERROR", "last_error": text_error} + # if sdn_net["status"] == "ERROR": + # if not vim_info_error_msg: + # vim_info_error_msg = str(sdn_net.get("last_error")) + # else: + # vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format( + # self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14), + # self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14)) + # vim_info_status = "ERROR" + # elif sdn_net["status"] == "BUILD": + # if vim_info_status == "ACTIVE": + # vim_info_status = "BUILD" # update database if vim_info_error_msg: @@ -498,6 +559,7 @@ class vim_thread(threading.Thread): old_task_status = task["status"] create_or_find = False # if as result of processing this task something is created or found next_refresh = 0 + task_id = task["instance_action_id"] + "." + str(task["task_index"]) try: if task["status"] == "SCHEDULED": @@ -530,7 +592,7 @@ class vim_thread(threading.Thread): # Move this task to the time dependency is going to be modified plus 10 seconds. self.db.update_rows("vim_wim_actions", modified_time=dependency_modified_at + 10, UPDATE={"worker": None}, - WHERE={"datacenter_vim_id": self.datacenter_tenant_id, "worker": self.my_id, + WHERE={self.target_k: self.target_v, "worker": self.my_id, "related": task["related"], }) # task["extra"]["tries"] = task["extra"].get("tries", 0) + 1 @@ -553,12 +615,13 @@ class vim_thread(threading.Thread): if task["status"] == "SUPERSEDED": # not needed to do anything but update database with the new status database_update = None - elif not self.vim: + elif not self.vim and not self.sdnconnector: task["status"] = "FAILED" task["error_msg"] = self.error_status - database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]} + database_update = {"status": "VIM_ERROR" if self.datacenter_tenant_id else "WIM_ERROR", + "error_msg": task["error_msg"]} elif task["item_id"] != related_tasks[0]["item_id"] and task["action"] in ("FIND", "CREATE"): - # Do nothing, just copy values from one to another and updata database + # Do nothing, just copy values from one to another and update database task["status"] = related_tasks[0]["status"] task["error_msg"] = related_tasks[0]["error_msg"] task["vim_id"] = related_tasks[0]["vim_id"] @@ -595,8 +658,24 @@ class vim_thread(threading.Thread): database_update = self.get_net(task) else: raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + elif task["item"] == 'instance_wim_nets': + if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"): + database_update = self.new_or_update_sdn_net(task) + create_or_find = True + elif task["action"] == "CREATE": + create_or_find = True + database_update = self.new_or_update_sdn_net(task) + elif task["action"] == "DELETE": + self.del_sdn_net(task) + elif task["action"] == "FIND": + database_update = self.get_sdn_net(task) + else: + raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) elif task["item"] == 'instance_sfis': - if task["action"] == "CREATE": + if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"): + database_update = self._refres_sfis(task) + create_or_find = True + elif task["action"] == "CREATE": create_or_find = True database_update = self.new_sfi(task) elif task["action"] == "DELETE": @@ -604,7 +683,10 @@ class vim_thread(threading.Thread): else: raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) elif task["item"] == 'instance_sfs': - if task["action"] == "CREATE": + if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"): + database_update = self._refres_sfs(task) + create_or_find = True + elif task["action"] == "CREATE": create_or_find = True database_update = self.new_sf(task) elif task["action"] == "DELETE": @@ -612,7 +694,10 @@ class vim_thread(threading.Thread): else: raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) elif task["item"] == 'instance_classifications': - if task["action"] == "CREATE": + if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"): + database_update = self._refres_classifications(task) + create_or_find = True + elif task["action"] == "CREATE": create_or_find = True database_update = self.new_classification(task) elif task["action"] == "DELETE": @@ -620,7 +705,10 @@ class vim_thread(threading.Thread): else: raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) elif task["item"] == 'instance_sfps': - if task["action"] == "CREATE": + if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"): + database_update = self._refres_sfps(task) + create_or_find = True + elif task["action"] == "CREATE": create_or_find = True database_update = self.new_sfp(task) elif task["action"] == "DELETE": @@ -630,16 +718,17 @@ class vim_thread(threading.Thread): else: raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"])) # TODO - except VimThreadException as e: + except Exception as e: + if not isinstance(e, VimThreadException): + self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True) task["error_msg"] = str(e) task["status"] = "FAILED" database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]} - if task["item"] == 'instance_vms': - database_update["vim_vm_id"] = None - elif task["item"] == 'instance_nets': - database_update["vim_net_id"] = None + # if task["item"] == 'instance_vms': + # database_update["vim_vm_id"] = None + # elif task["item"] == 'instance_nets': + # database_update["vim_net_id"] = None - task_id = task["instance_action_id"] + "." + str(task["task_index"]) self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format( task_id, task["item"], task["action"], task["status"], task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"])) @@ -649,7 +738,7 @@ class vim_thread(threading.Thread): next_refresh = time.time() if task["extra"].get("vim_status") == "BUILD": next_refresh += self.REFRESH_BUILD - elif task["extra"].get("vim_status") in ("ERROR", "VIM_ERROR"): + elif task["extra"].get("vim_status") in ("ERROR", "VIM_ERROR", "WIM_ERROR"): next_refresh += self.REFRESH_ERROR elif task["extra"].get("vim_status") == "DELETED": next_refresh += self.REFRESH_DELETE @@ -666,7 +755,7 @@ class vim_thread(threading.Thread): "error_msg": task["error_msg"], }, - WHERE={"datacenter_vim_id": self.datacenter_tenant_id, + WHERE={self.target_k: self.target_v, "worker": self.my_id, "action": ["FIND", "CREATE"], "related": task["related"], @@ -683,7 +772,7 @@ class vim_thread(threading.Thread): self.db.update_rows( table="vim_wim_actions", modified_time=0, UPDATE={"worker": None}, - WHERE={"datacenter_vim_id": self.datacenter_tenant_id, + WHERE={self.target_k: self.target_v, "worker": self.my_id, "related": task["related"], }) @@ -697,7 +786,7 @@ class vim_thread(threading.Thread): if database_update: where_filter = {"related": task["related"]} if task["item"] == "instance_nets" and task["datacenter_vim_id"]: - where_filter["datacenter_tenant_id"] = task["datacenter_vim_id"] + where_filter["datacenter_tenant_id"] = task["datacenter_vim_id"] self.db.update_rows(table=task["item"], UPDATE=database_update, WHERE=where_filter) @@ -723,7 +812,7 @@ class vim_thread(threading.Thread): def run(self): self.logger.debug("Starting") while True: - self.get_vimconnector() + self.get_vim_sdn_connector() self.logger.debug("Vimconnector loaded") reload_thread = False @@ -848,19 +937,18 @@ class vim_thread(threading.Thread): return instance_element_update def del_vm(self, task): - task_id = task["instance_action_id"] + "." + str(task["task_index"]) + # task_id = task["instance_action_id"] + "." + str(task["task_index"]) vm_vim_id = task["vim_id"] - interfaces = task["extra"].get("interfaces", ()) + # interfaces = task["extra"].get("interfaces", ()) try: - for iface in interfaces.values(): - if iface.get("sdn_port_id"): - try: - with self.db_lock: - self.ovim.delete_port(iface["sdn_port_id"], idempotent=True) - except ovimException as e: - self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format( - task_id, iface["sdn_port_id"], e), exc_info=True) - # TODO Set error_msg at instance_nets + # for iface in interfaces.values(): + # if iface.get("sdn_port_id"): + # try: + # self.ovim.delete_port(iface["sdn_port_id"], idempotent=True) + # except ovimException as e: + # self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format( + # task_id, iface["sdn_port_id"], e), exc_info=True) + # # TODO Set error_msg at instance_nets self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items")) task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing @@ -929,7 +1017,6 @@ class vim_thread(threading.Thread): def new_net(self, task): vim_net_id = None - sdn_net_id = None task_id = task["instance_action_id"] + "." + str(task["task_index"]) action_text = "" try: @@ -945,91 +1032,73 @@ class vim_thread(threading.Thread): # CREATE params = task["params"] action_text = "creating VIM" - vim_net_id, created_items = self.vim.new_network(*params[0:3]) - - net_name = params[0] - net_type = params[1] - wim_account_name = None - if len(params) >= 4: - wim_account_name = params[3] - - sdn_controller = self.vim.config.get('sdn-controller') - if sdn_controller and (net_type == "data" or net_type == "ptp"): - network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]} - - vim_net = self.vim.get_network(vim_net_id) - if vim_net.get('encapsulation') != 'vlan': - raise vimconn.vimconnException( - "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format( - net_name, net_type, vim_net['encapsulation'])) - network["vlan"] = vim_net.get('segmentation_id') - action_text = "creating SDN" - with self.db_lock: - sdn_net_id = self.ovim.new_network(network) - - if wim_account_name and self.vim.config["wim_external_ports"]: - # add external port to connect WIM. Try with compute node __WIM:wim_name and __WIM - action_text = "attaching external port to ovim network" - sdn_port_name = "external_port" - sdn_port_data = { - "compute_node": "__WIM:" + wim_account_name[0:58], - "pci": None, - "vlan": network["vlan"], - "net_id": sdn_net_id, - "region": self.vim["config"]["datacenter_id"], - "name": sdn_port_name, - } - try: - with self.db_lock: - sdn_external_port_id = self.ovim.new_external_port(sdn_port_data) - except ovimException: - sdn_port_data["compute_node"] = "__WIM" - with self.db_lock: - sdn_external_port_id = self.ovim.new_external_port(sdn_port_data) - self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id, - sdn_net_id)) + + vim_net_id, created_items = self.vim.new_network(*params[0:5]) + + # net_name = params[0] + # net_type = params[1] + # wim_account_name = None + # if len(params) >= 6: + # wim_account_name = params[5] + + # TODO fix at nfvo adding external port + # if wim_account_name and self.vim.config["wim_external_ports"]: + # # add external port to connect WIM. Try with compute node __WIM:wim_name and __WIM + # action_text = "attaching external port to ovim network" + # sdn_port_name = "external_port" + # sdn_port_data = { + # "compute_node": "__WIM:" + wim_account_name[0:58], + # "pci": None, + # "vlan": network["vlan"], + # "net_id": sdn_net_id, + # "region": self.vim["config"]["datacenter_id"], + # "name": sdn_port_name, + # } + # try: + # sdn_external_port_id = self.ovim.new_external_port(sdn_port_data) + # except ovimException: + # sdn_port_data["compute_node"] = "__WIM" + # sdn_external_port_id = self.ovim.new_external_port(sdn_port_data) + # self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id, + # sdn_net_id)) task["status"] = "DONE" task["extra"]["vim_info"] = {} - task["extra"]["sdn_net_id"] = sdn_net_id + # task["extra"]["sdn_net_id"] = sdn_net_id task["extra"]["vim_status"] = "BUILD" task["extra"]["created"] = True task["extra"]["created_items"] = created_items task["error_msg"] = None task["vim_id"] = vim_net_id - instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD", + instance_element_update = {"vim_net_id": vim_net_id, "status": "BUILD", "created": True, "error_msg": None} return instance_element_update - except (vimconn.vimconnException, ovimException) as e: + except vimconn.vimconnException as e: self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e)) task["status"] = "FAILED" task["vim_id"] = vim_net_id task["error_msg"] = self._format_vim_error_msg(str(e)) - task["extra"]["sdn_net_id"] = sdn_net_id - instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "VIM_ERROR", + # task["extra"]["sdn_net_id"] = sdn_net_id + instance_element_update = {"vim_net_id": vim_net_id, "status": "VIM_ERROR", "error_msg": task["error_msg"]} return instance_element_update def del_net(self, task): net_vim_id = task["vim_id"] - sdn_net_id = task["extra"].get("sdn_net_id") + # sdn_net_id = task["extra"].get("sdn_net_id") try: if net_vim_id: self.vim.delete_network(net_vim_id, task["extra"].get("created_items")) - if sdn_net_id: - # Delete any attached port to this sdn network. There can be ports associated to this network in case - # it was manually done using 'openmano vim-net-sdn-attach' - with self.db_lock: - port_list = self.ovim.get_ports(columns={'uuid'}, - filter={'name': 'external_port', 'net_id': sdn_net_id}) - for port in port_list: - self.ovim.delete_port(port['uuid'], idempotent=True) - self.ovim.delete_network(sdn_net_id, idempotent=True) + # if sdn_net_id: + # # Delete any attached port to this sdn network. There can be ports associated to this network in case + # # it was manually done using 'openmano vim-net-sdn-attach' + # port_list = self.ovim.get_ports(columns={'uuid'}, + # filter={'name': 'external_port', 'net_id': sdn_net_id}) + # for port in port_list: + # self.ovim.delete_port(port['uuid'], idempotent=True) + # self.ovim.delete_network(sdn_net_id, idempotent=True) task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing task["error_msg"] = None return None - except ovimException as e: - task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external " - "ports for net {}: {}".format(sdn_net_id, str(e))) except vimconn.vimconnException as e: task["error_msg"] = self._format_vim_error_msg(str(e)) if isinstance(e, vimconn.vimconnNotFoundException): @@ -1039,6 +1108,183 @@ class vim_thread(threading.Thread): task["status"] = "FAILED" return None + def new_or_update_sdn_net(self, task): + wimconn_net_id = task["vim_id"] + created_items = task["extra"].get("created_items") + connected_ports = task["extra"].get("connected_ports", []) + new_connected_ports = [] + last_update = task["extra"].get("last_update", 0) + sdn_status = "BUILD" + sdn_info = None + + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + error_list = [] + try: + # FIND + if task["extra"].get("find"): + wimconn_id = task["extra"]["find"][0] + try: + instance_element_update = self.sdnconnector.get_connectivity_service_status(wimconn_id) + wimconn_net_id = wimconn_id + instance_element_update = {"wim_internal_id": wimconn_net_id, "created": False, "status": "BUILD", + "error_msg": None, } + return instance_element_update + except Exception as e: + if isinstance(e, SdnConnectorError) and e.http_error == HTTPStatus.NOT_FOUND.value: + pass + else: + self._proccess_sdn_exception(e) + + params = task["params"] + # CREATE + # look for ports + sdn_ports = [] + pending_ports = 0 + vlan_used = None + + ports = self.db.get_rows(FROM='instance_interfaces', WHERE={'instance_wim_net_id': task["item_id"]}) + sdn_need_update = False + for port in ports: + vlan_used = port.get("vlan") or vlan_used + # TODO. Do not connect if already done + if port.get("compute_node") and port.get("pci"): + for pmap in self.port_mappings: + if pmap.get("device_id") == port["compute_node"] and \ + pmap.get("device_interface_id") == port["pci"]: + break + else: + if self.sdnconn_config.get("mapping_not_needed"): + pmap = { + "service_endpoint_id": "{}:{}".format(port["compute_node"], port["pci"]), + "service_endpoint_encapsulation_info": { + "vlan": port["vlan"], + "mac": port["mac_address"], + "device_id": port["compute_node"], + "device_interface_id": port["pci"] + } + } + else: + pmap = None + error_list.append("Port mapping not found for compute_node={} pci={}".format( + port["compute_node"], port["pci"])) + + if pmap: + if port["modified_at"] > last_update: + sdn_need_update = True + new_connected_ports.append(port["uuid"]) + sdn_ports.append({ + "service_endpoint_id": pmap["service_endpoint_id"], + "service_endpoint_encapsulation_type": "dot1q" if port["model"] == "SR-IOV" else None, + "service_endpoint_encapsulation_info": { + "vlan": port["vlan"], + "mac": port["mac_address"], + "device_id": pmap.get("device_id"), + "device_interface_id": pmap.get("device_interface_id"), + "switch_dpid": pmap.get("switch_dpid"), + "switch_port": pmap.get("switch_port"), + "service_mapping_info": pmap.get("service_mapping_info"), + } + }) + + else: + pending_ports += 1 + if pending_ports: + error_list.append("Waiting for getting interfaces location from VIM. Obtained '{}' of {}" + .format(len(ports)-pending_ports, len(ports))) + + # connect external ports + for index, external_port in enumerate(task["extra"].get("sdn-ports") or ()): + external_port_id = external_port.get("service_endpoint_id") or str(index) + sdn_ports.append({ + "service_endpoint_id": external_port_id, + "service_endpoint_encapsulation_type": external_port.get("service_endpoint_encapsulation_type", + "dot1q"), + "service_endpoint_encapsulation_info": { + "vlan": external_port.get("vlan") or vlan_used, + "mac": external_port.get("mac_address"), + "device_id": external_port.get("device_id"), + "device_interface_id": external_port.get("device_interface_id"), + "switch_dpid": external_port.get("switch_dpid") or external_port.get("switch_id"), + "switch_port": external_port.get("switch_port"), + "service_mapping_info": external_port.get("service_mapping_info"), + }}) + new_connected_ports.append(external_port_id) + + # if there are more ports to connect or they have been modified, call create/update + try: + if (set(connected_ports) != set(new_connected_ports) or sdn_need_update) and len(sdn_ports) >= 2: + last_update = time.time() + if not wimconn_net_id: + if params[0] == "data": + net_type = "ELAN" + elif params[0] == "ptp": + net_type = "ELINE" + else: + net_type = "L3" + + wimconn_net_id, created_items = self.sdnconnector.create_connectivity_service( + net_type, sdn_ports) + else: + created_items = self.sdnconnector.edit_connectivity_service( + wimconn_net_id, conn_info=created_items, connection_points=sdn_ports) + connected_ports = new_connected_ports + elif wimconn_net_id: + wim_status_dict = self.sdnconnector.get_connectivity_service_status(wimconn_net_id, + conn_info=created_items) + sdn_status = wim_status_dict["sdn_status"] + if wim_status_dict.get("error_msg"): + error_list.append(wim_status_dict.get("error_msg")) + if wim_status_dict.get("sdn_info"): + sdn_info = str(wim_status_dict.get("sdn_info")) + except Exception as e: + self._proccess_sdn_exception(e) + + task["status"] = "DONE" + task["extra"]["vim_info"] = {} + # task["extra"]["sdn_net_id"] = sdn_net_id + task["extra"]["vim_status"] = sdn_status + task["extra"]["created"] = True + task["extra"]["created_items"] = created_items + task["extra"]["connected_ports"] = connected_ports + task["extra"]["last_update"] = last_update + task["error_msg"] = self._format_vim_error_msg(" ; ".join(error_list)) + task["vim_id"] = wimconn_net_id + instance_element_update = {"wim_internal_id": wimconn_net_id, "status": sdn_status, + "created": True, "error_msg": task["error_msg"] or None} + except (vimconn.vimconnException, SdnConnectorError) as e: + self.logger.error("task={} new-sdn-net: Error: {}".format(task_id, e)) + task["status"] = "FAILED" + task["vim_id"] = wimconn_net_id + task["error_msg"] = self._format_vim_error_msg(str(e)) + # task["extra"]["sdn_net_id"] = sdn_net_id + instance_element_update = {"wim_internal_id": wimconn_net_id, "status": "WIM_ERROR", + "error_msg": task["error_msg"]} + + if sdn_info: + instance_element_update["wim_info"] = sdn_info + return instance_element_update + + def del_sdn_net(self, task): + wimconn_net_id = task["vim_id"] + try: + try: + if wimconn_net_id: + self.sdnconnector.delete_connectivity_service(wimconn_net_id, task["extra"].get("created_items")) + task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing + task["error_msg"] = None + return None + except Exception as e: + self._proccess_sdn_exception(e) + except SdnConnectorError as e: + task["error_msg"] = self._format_vim_error_msg(str(e)) + if e.http_code == HTTPStatus.NOT_FOUND.value: + # If not found mark as Done and fill error_msg + task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing + task["error_msg"] = None + return None + task["status"] = "FAILED" + return None + # Service Function Instances def new_sfi(self, task): vim_sfi_id = None @@ -1085,7 +1331,7 @@ class vim_thread(threading.Thread): task["extra"]["created"] = True task["extra"]["vim_status"] = "ACTIVE" task["error_msg"] = None - task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing + task["status"] = "DONE" task["vim_id"] = vim_sfi_id instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None} return instance_element_update @@ -1134,7 +1380,7 @@ class vim_thread(threading.Thread): task["extra"]["created"] = True task["extra"]["vim_status"] = "ACTIVE" task["error_msg"] = None - task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing + task["status"] = "DONE" task["vim_id"] = vim_sf_id instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None} return instance_element_update @@ -1172,12 +1418,27 @@ class vim_thread(threading.Thread): task_id = task["instance_action_id"] + "." + str(task["task_index"]) dep_id = "TASK-" + str(task["extra"]["depends_on"][0]) error_text = "" - interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces").keys() + interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces") # Bear in mind that different VIM connectors might support Classifications differently. # In the case of OpenStack, only the first VNF attached to the classifier will be used # to create the Classification(s) (the "logical source port" of the "Flow Classifier"). # Since the VNFFG classifier match lacks the ethertype, classification defaults to # using the IPv4 flow classifier. + logical_source_port_vim_id = None + logical_source_port_id = params.get("logical_source_port") + for vim_interface, interface_data in interfaces.items(): + if interface_data.get("interface_id") == logical_source_port_id: + logical_source_port_vim_id = vim_interface + break + if not logical_source_port_vim_id: + error_text = "Error creating Flow Classifier, Logical Source Port id {}".format( + logical_source_port_id) + self.logger.error(error_text) + task["error_msg"] = error_text + task["status"] = "FAILED" + task["vim_id"] = None + return None + name = "c-{}".format(task["item_id"][:8]) # if not CIDR is given for the IP addresses, add /32: ip_proto = int(params.get("ip_proto")) @@ -1185,7 +1446,7 @@ class vim_thread(threading.Thread): destination_ip = params.get("destination_ip") source_port = params.get("source_port") destination_port = params.get("destination_port") - definition = {"logical_source_port": interfaces[0]} + definition = {"logical_source_port": logical_source_port_vim_id} if ip_proto: if ip_proto == 1: ip_proto = 'icmp' @@ -1215,7 +1476,7 @@ class vim_thread(threading.Thread): task["extra"]["created"] = True task["extra"]["vim_status"] = "ACTIVE" task["error_msg"] = None - task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing + task["status"] = "DONE" task["vim_id"] = vim_classification_id instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id, "error_msg": None} @@ -1271,7 +1532,7 @@ class vim_thread(threading.Thread): task["extra"]["created"] = True task["extra"]["vim_status"] = "ACTIVE" task["error_msg"] = None - task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing + task["status"] = "DONE" task["vim_id"] = vim_sfp_id instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None} return instance_element_update @@ -1301,3 +1562,149 @@ class vim_thread(threading.Thread): return None task["status"] = "FAILED" return None + + def _refres_sfps(self, task): + """Call VIM to get SFPs status""" + database_update = None + + vim_id = task["vim_id"] + sfp_to_refresh_list = [vim_id] + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + try: + vim_dict = self.vim.refresh_sfps_status(sfp_to_refresh_list) + vim_info = vim_dict[vim_id] + except vimconn.vimconnException as e: + # Mark all tasks at VIM_ERROR status + self.logger.error("task={} get-sfp: vimconnException when trying to refresh sfps {}".format(task_id, e)) + vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} + + self.logger.debug("task={} get-sfp: vim_sfp_id={} result={}".format(task_id, task["vim_id"], vim_info)) + #TODO: Revise this part + vim_info_error_msg = None + if vim_info.get("error_msg"): + vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"]) + task_vim_info = task["extra"].get("vim_info") + task_error_msg = task.get("error_msg") + task_vim_status = task["extra"].get("vim_status") + if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \ + (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]): + database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg} + if vim_info.get("vim_info"): + database_update["vim_info"] = vim_info["vim_info"] + + task["extra"]["vim_status"] = vim_info["status"] + task["error_msg"] = vim_info_error_msg + if vim_info.get("vim_info"): + task["extra"]["vim_info"] = vim_info["vim_info"] + + return database_update + + def _refres_sfis(self, task): + """Call VIM to get sfis status""" + database_update = None + + vim_id = task["vim_id"] + sfi_to_refresh_list = [vim_id] + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + try: + vim_dict = self.vim.refresh_sfis_status(sfi_to_refresh_list) + vim_info = vim_dict[vim_id] + except vimconn.vimconnException as e: + # Mark all tasks at VIM_ERROR status + self.logger.error("task={} get-sfi: vimconnException when trying to refresh sfis {}".format(task_id, e)) + vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} + + self.logger.debug("task={} get-sfi: vim_sfi_id={} result={}".format(task_id, task["vim_id"], vim_info)) + #TODO: Revise this part + vim_info_error_msg = None + if vim_info.get("error_msg"): + vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"]) + task_vim_info = task["extra"].get("vim_info") + task_error_msg = task.get("error_msg") + task_vim_status = task["extra"].get("vim_status") + if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \ + (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]): + database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg} + if vim_info.get("vim_info"): + database_update["vim_info"] = vim_info["vim_info"] + + task["extra"]["vim_status"] = vim_info["status"] + task["error_msg"] = vim_info_error_msg + if vim_info.get("vim_info"): + task["extra"]["vim_info"] = vim_info["vim_info"] + + return database_update + + def _refres_sfs(self, task): + """Call VIM to get sfs status""" + database_update = None + + vim_id = task["vim_id"] + sf_to_refresh_list = [vim_id] + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + try: + vim_dict = self.vim.refresh_sfs_status(sf_to_refresh_list) + vim_info = vim_dict[vim_id] + except vimconn.vimconnException as e: + # Mark all tasks at VIM_ERROR status + self.logger.error("task={} get-sf: vimconnException when trying to refresh sfs {}".format(task_id, e)) + vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} + + self.logger.debug("task={} get-sf: vim_sf_id={} result={}".format(task_id, task["vim_id"], vim_info)) + #TODO: Revise this part + vim_info_error_msg = None + if vim_info.get("error_msg"): + vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"]) + task_vim_info = task["extra"].get("vim_info") + task_error_msg = task.get("error_msg") + task_vim_status = task["extra"].get("vim_status") + if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \ + (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]): + database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg} + if vim_info.get("vim_info"): + database_update["vim_info"] = vim_info["vim_info"] + + task["extra"]["vim_status"] = vim_info["status"] + task["error_msg"] = vim_info_error_msg + if vim_info.get("vim_info"): + task["extra"]["vim_info"] = vim_info["vim_info"] + + return database_update + + def _refres_classifications(self, task): + """Call VIM to get classifications status""" + database_update = None + + vim_id = task["vim_id"] + classification_to_refresh_list = [vim_id] + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + try: + vim_dict = self.vim.refresh_classifications_status(classification_to_refresh_list) + vim_info = vim_dict[vim_id] + except vimconn.vimconnException as e: + # Mark all tasks at VIM_ERROR status + self.logger.error("task={} get-classification: vimconnException when trying to refresh classifications {}" + .format(task_id, e)) + vim_info = {"status": "VIM_ERROR", "error_msg": str(e)} + + self.logger.debug("task={} get-classification: vim_classification_id={} result={}".format(task_id, + task["vim_id"], vim_info)) + #TODO: Revise this part + vim_info_error_msg = None + if vim_info.get("error_msg"): + vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"]) + task_vim_info = task["extra"].get("vim_info") + task_error_msg = task.get("error_msg") + task_vim_status = task["extra"].get("vim_status") + if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \ + (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]): + database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg} + if vim_info.get("vim_info"): + database_update["vim_info"] = vim_info["vim_info"] + + task["extra"]["vim_status"] = vim_info["status"] + task["error_msg"] = vim_info_error_msg + if vim_info.get("vim_info"): + task["extra"]["vim_info"] = vim_info["vim_info"] + + return database_update