X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_ro%2Fvim_thread.py;h=cbcd31f7d6b7675241efdccf941f051214ccd0ca;hb=59760e72c3e3e19fd262dcae05ff08663c7fbac8;hp=ce0fc7f585d70d283a954d00067d320dd919159f;hpb=56d877dad3d7ce6a36e233a77d1ee7dc8295e233;p=osm%2FRO.git diff --git a/osm_ro/vim_thread.py b/osm_ro/vim_thread.py index ce0fc7f5..cbcd31f7 100644 --- a/osm_ro/vim_thread.py +++ b/osm_ro/vim_thread.py @@ -484,11 +484,11 @@ class vim_thread(threading.Thread): break elif task_dependency["status"] == "FAILED": raise VimThreadException( - "Cannot {} {}, (task {}.{}) because depends on failed {} {}, (task{}.{})".format( + "Cannot {} {}, (task {}.{}) because depends on failed {}.{}, (task{}.{}): {}".format( task["action"], task["item"], task["instance_action_id"], task["task_index"], task_dependency["instance_action_id"], task_dependency["task_index"], - task_dependency["action"], task_dependency["item"])) + task_dependency["action"], task_dependency["item"], task_dependency.get("error_msg"))) if dependency_not_completed: # Move this task to the end. task["extra"]["tries"] = task["extra"].get("tries", 0) + 1 @@ -530,6 +530,38 @@ class vim_thread(threading.Thread): result, database_update = self.get_net(task) else: raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + elif task["item"] == 'instance_sfis': + if task["action"] == "CREATE": + result, database_update = self.new_sfi(task) + nb_created += 1 + elif task["action"] == "DELETE": + result, database_update = self.del_sfi(task) + else: + raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + elif task["item"] == 'instance_sfs': + if task["action"] == "CREATE": + result, database_update = self.new_sf(task) + nb_created += 1 + elif task["action"] == "DELETE": + result, database_update = self.del_sf(task) + else: + raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + elif task["item"] == 'instance_classifications': + if task["action"] == "CREATE": + result, database_update = self.new_classification(task) + nb_created += 1 + elif task["action"] == "DELETE": + result, database_update = self.del_classification(task) + else: + raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) + elif task["item"] == 'instance_sfps': + if task["action"] == "CREATE": + result, database_update = self.new_sfp(task) + nb_created += 1 + elif task["action"] == "DELETE": + result, database_update = self.del_sfp(task) + else: + raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"])) else: raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"])) # TODO @@ -543,11 +575,14 @@ class vim_thread(threading.Thread): elif task["item"] == 'instance_nets': database_update["vim_net_id"] = None + no_refresh_tasks = ['instance_sfis', 'instance_sfs', + 'instance_classifications', 'instance_sfps'] if task["action"] == "DELETE": action_key = task["item"] + task["item_id"] del self.grouped_tasks[action_key] elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"): - self._insert_refresh(task) + if task["item"] not in no_refresh_tasks: + self._insert_refresh(task) task_id = task["instance_action_id"] + "." + str(task["task_index"]) self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format( @@ -941,3 +976,234 @@ class vim_thread(threading.Thread): return True, None task["status"] = "FAILED" return False, None + + ## Service Function Instances + + def new_sfi(self, task): + vim_sfi_id = None + try: + params = task["params"] + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + depends = task.get("depends") + error_text = "" + interfaces = task.get("depends").values()[0].get("extra").get("params")[5] + # At the moment, every port associated with the VM will be used both as ingress and egress ports. + # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack, only the + # first ingress and first egress ports will be used to create the SFI (Port Pair). + port_id_list = [interfaces[0].get("vim_id")] + name = "sfi-%s" % task["item_id"][:8] + # By default no form of IETF SFC Encapsulation will be used + vim_sfi_id = self.vim.new_sfi(name, port_id_list, port_id_list, sfc_encap=False) + + task["extra"]["created"] = True + task["error_msg"] = None + task["status"] = "DONE" + task["vim_id"] = vim_sfi_id + instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None} + return True, instance_element_update + + except (vimconn.vimconnException, VimThreadException) as e: + self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e)) + error_text = self._format_vim_error_msg(str(e)) + task["error_msg"] = error_text + task["status"] = "FAILED" + task["vim_id"] = None + instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text} + return False, instance_element_update + + def del_sfi(self, task): + sfi_vim_id = task["vim_id"] + try: + self.vim.delete_sfi(sfi_vim_id) + task["status"] = "DONE" + task["error_msg"] = None + return True, None + + except vimconn.vimconnException as e: + task["error_msg"] = self._format_vim_error_msg(str(e)) + if isinstance(e, vimconn.vimconnNotFoundException): + # If not found mark as Done and fill error_msg + task["status"] = "DONE" + return True, None + task["status"] = "FAILED" + return False, None + + def new_sf(self, task): + vim_sf_id = None + try: + params = task["params"] + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + depends = task.get("depends") + error_text = "" + #sfis = task.get("depends").values()[0].get("extra").get("params")[5] + sfis = task.get("depends").values() + sfi_id_list = [] + for sfi in sfis: + sfi_id_list.append(sfi.get("vim_id")) + name = "sf-%s" % task["item_id"][:8] + # By default no form of IETF SFC Encapsulation will be used + vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False) + + task["extra"]["created"] = True + task["error_msg"] = None + task["status"] = "DONE" + task["vim_id"] = vim_sf_id + instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None} + return True, instance_element_update + + except (vimconn.vimconnException, VimThreadException) as e: + self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e)) + error_text = self._format_vim_error_msg(str(e)) + task["error_msg"] = error_text + task["status"] = "FAILED" + task["vim_id"] = None + instance_element_update = {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text} + return False, instance_element_update + + def del_sf(self, task): + sf_vim_id = task["vim_id"] + try: + self.vim.delete_sf(sf_vim_id) + task["status"] = "DONE" + task["error_msg"] = None + return True, None + + except vimconn.vimconnException as e: + task["error_msg"] = self._format_vim_error_msg(str(e)) + if isinstance(e, vimconn.vimconnNotFoundException): + # If not found mark as Done and fill error_msg + task["status"] = "DONE" + return True, None + task["status"] = "FAILED" + return False, None + + def new_classification(self, task): + vim_classification_id = None + try: + params = task["params"] + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + depends = task.get("depends") + error_text = "" + interfaces = task.get("depends").values()[0].get("extra").get("params")[5] + # Bear in mind that different VIM connectors might support Classifications differently. + # In the case of OpenStack, only the first VNF attached to the classifier will be used + # to create the Classification(s) (the "logical source port" of the "Flow Classifier"). + # Since the VNFFG classifier match lacks the ethertype, classification defaults to + # using the IPv4 flow classifier. + name = "c-%s" % task["item_id"][:8] + # if not CIDR is given for the IP addresses, add /32: + ip_proto = int(params.get("ip_proto")) + source_ip = params.get("source_ip") + destination_ip = params.get("destination_ip") + if ip_proto == 1: + ip_proto = 'icmp' + elif ip_proto == 6: + ip_proto = 'tcp' + elif ip_proto == 17: + ip_proto = 'udp' + if '/' not in source_ip: + source_ip += '/32' + if '/' not in destination_ip: + destination_ip += '/32' + definition = { + "logical_source_port": interfaces[0].get("vim_id"), + "protocol": ip_proto, + "source_ip_prefix": source_ip, + "destination_ip_prefix": destination_ip, + "source_port_range_min": params.get("source_port"), + "source_port_range_max": params.get("source_port"), + "destination_port_range_min": params.get("destination_port"), + "destination_port_range_max": params.get("destination_port"), + } + + vim_classification_id = self.vim.new_classification( + name, 'legacy_flow_classifier', definition) + + task["extra"]["created"] = True + task["error_msg"] = None + task["status"] = "DONE" + task["vim_id"] = vim_classification_id + instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id, "error_msg": None} + return True, instance_element_update + + except (vimconn.vimconnException, VimThreadException) as e: + self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e)) + error_text = self._format_vim_error_msg(str(e)) + task["error_msg"] = error_text + task["status"] = "FAILED" + task["vim_id"] = None + instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text} + return False, instance_element_update + + def del_classification(self, task): + classification_vim_id = task["vim_id"] + try: + self.vim.delete_classification(classification_vim_id) + task["status"] = "DONE" + task["error_msg"] = None + return True, None + + except vimconn.vimconnException as e: + task["error_msg"] = self._format_vim_error_msg(str(e)) + if isinstance(e, vimconn.vimconnNotFoundException): + # If not found mark as Done and fill error_msg + task["status"] = "DONE" + return True, None + task["status"] = "FAILED" + return False, None + + def new_sfp(self, task): + vim_sfp_id = None + try: + params = task["params"] + task_id = task["instance_action_id"] + "." + str(task["task_index"]) + depends = task.get("depends") + error_text = "" + deps = task.get("depends").values() + sf_id_list = [] + classification_id_list = [] + for dep in deps: + vim_id = dep.get("vim_id") + resource = dep.get("item") + if resource == "instance_sfs": + sf_id_list.append(vim_id) + elif resource == "instance_classifications": + classification_id_list.append(vim_id) + + name = "sfp-%s" % task["item_id"][:8] + # By default no form of IETF SFC Encapsulation will be used + vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False) + + task["extra"]["created"] = True + task["error_msg"] = None + task["status"] = "DONE" + task["vim_id"] = vim_sfp_id + instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None} + return True, instance_element_update + + except (vimconn.vimconnException, VimThreadException) as e: + self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e)) + error_text = self._format_vim_error_msg(str(e)) + task["error_msg"] = error_text + task["status"] = "FAILED" + task["vim_id"] = None + instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text} + return False, instance_element_update + return + + def del_sfp(self, task): + sfp_vim_id = task["vim_id"] + try: + self.vim.delete_sfp(sfp_vim_id) + task["status"] = "DONE" + task["error_msg"] = None + return True, None + + except vimconn.vimconnException as e: + task["error_msg"] = self._format_vim_error_msg(str(e)) + if isinstance(e, vimconn.vimconnNotFoundException): + # If not found mark as Done and fill error_msg + task["status"] = "DONE" + return True, None + task["status"] = "FAILED" + return False, None