VNFFG support
[osm/RO.git] / osm_ro / vim_thread.py
index ce0fc7f..d5574b4 100644 (file)
@@ -530,6 +530,38 @@ class vim_thread(threading.Thread):
                         result, database_update = self.get_net(task)
                     else:
                         raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+                elif task["item"] == 'instance_sfis':
+                    if task["action"] == "CREATE":
+                        result, database_update = self.new_sfi(task)
+                        nb_created += 1
+                    elif task["action"] == "DELETE":
+                        result, database_update = self.del_sfi(task)
+                    else:
+                        raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+                elif task["item"] == 'instance_sfs':
+                    if task["action"] == "CREATE":
+                        result, database_update = self.new_sf(task)
+                        nb_created += 1
+                    elif task["action"] == "DELETE":
+                        result, database_update = self.del_sf(task)
+                    else:
+                        raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+                elif task["item"] == 'instance_classifications':
+                    if task["action"] == "CREATE":
+                        result, database_update = self.new_classification(task)
+                        nb_created += 1
+                    elif task["action"] == "DELETE":
+                        result, database_update = self.del_classification(task)
+                    else:
+                        raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
+                elif task["item"] == 'instance_sfps':
+                    if task["action"] == "CREATE":
+                        result, database_update = self.new_sfp(task)
+                        nb_created += 1
+                    elif task["action"] == "DELETE":
+                        result, database_update = self.del_sfp(task)
+                    else:
+                        raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
                 else:
                     raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
                     # TODO
@@ -543,11 +575,14 @@ class vim_thread(threading.Thread):
                 elif task["item"] == 'instance_nets':
                     database_update["vim_net_id"] = None
 
+            no_refresh_tasks = ['instance_sfis', 'instance_sfs',
+                'instance_classifications', 'instance_sfps']
             if task["action"] == "DELETE":
                 action_key = task["item"] + task["item_id"]
                 del self.grouped_tasks[action_key]
             elif task["action"] in ("CREATE", "FIND") and task["status"] in ("DONE", "BUILD"):
-                self._insert_refresh(task)
+                if task["item"] not in no_refresh_tasks:
+                    self._insert_refresh(task)
 
             task_id = task["instance_action_id"] + "." + str(task["task_index"])
             self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
@@ -941,3 +976,234 @@ class vim_thread(threading.Thread):
                 return True, None
         task["status"] = "FAILED"
         return False, None
+
+    ## Service Function Instances
+
+    def new_sfi(self, task):
+        vim_sfi_id = None
+        try:
+            params = task["params"]
+            task_id = task["instance_action_id"] + "." + str(task["task_index"])
+            depends = task.get("depends")
+            error_text = ""
+            interfaces = task.get("depends").values()[0].get("extra").get("params")[5]
+            # At the moment, every port associated with the VM will be used both as ingress and egress ports.
+            # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack, only the
+            # first ingress and first egress ports will be used to create the SFI (Port Pair).
+            port_id_list = [interfaces[0].get("vim_id")]
+            name = "sfi-%s" % task["item_id"][:8]
+            # By default no form of IETF SFC Encapsulation will be used
+            vim_sfi_id = self.vim.new_sfi(name, port_id_list, port_id_list, sfc_encap=False)
+
+            task["extra"]["created"] = True
+            task["error_msg"] = None
+            task["status"] = "DONE"
+            task["vim_id"] = vim_sfi_id
+            instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
+            return True, instance_element_update
+
+        except (vimconn.vimconnException, VimThreadException) as e:
+            self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
+            error_text = self._format_vim_error_msg(str(e))
+            task["error_msg"] = error_text
+            task["status"] = "FAILED"
+            task["vim_id"] = None
+            instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
+            return False, instance_element_update
+
+    def del_sfi(self, task):
+        sfi_vim_id = task["vim_id"]
+        try:
+            self.vim.delete_sfi(sfi_vim_id)
+            task["status"] = "DONE"
+            task["error_msg"] = None
+            return True, None
+
+        except vimconn.vimconnException as e:
+            task["error_msg"] = self._format_vim_error_msg(str(e))
+            if isinstance(e, vimconn.vimconnNotFoundException):
+                # If not found mark as Done and fill error_msg
+                task["status"] = "DONE"
+                return True, None
+            task["status"] = "FAILED"
+            return False, None
+
+    def new_sf(self, task):
+        vim_sf_id = None
+        try:
+            params = task["params"]
+            task_id = task["instance_action_id"] + "." + str(task["task_index"])
+            depends = task.get("depends")
+            error_text = ""
+            #sfis = task.get("depends").values()[0].get("extra").get("params")[5]
+            sfis = task.get("depends").values()
+            sfi_id_list = []
+            for sfi in sfis:
+                sfi_id_list.append(sfi.get("vim_id"))
+            name = "sf-%s" % task["item_id"][:8]
+            # By default no form of IETF SFC Encapsulation will be used
+            vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)
+
+            task["extra"]["created"] = True
+            task["error_msg"] = None
+            task["status"] = "DONE"
+            task["vim_id"] = vim_sf_id
+            instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}
+            return True, instance_element_update
+
+        except (vimconn.vimconnException, VimThreadException) as e:
+            self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
+            error_text = self._format_vim_error_msg(str(e))
+            task["error_msg"] = error_text
+            task["status"] = "FAILED"
+            task["vim_id"] = None
+            instance_element_update = {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
+            return False, instance_element_update
+
+    def del_sf(self, task):
+        sf_vim_id = task["vim_id"]
+        try:
+            self.vim.delete_sf(sf_vim_id)
+            task["status"] = "DONE"
+            task["error_msg"] = None
+            return True, None
+
+        except vimconn.vimconnException as e:
+            task["error_msg"] = self._format_vim_error_msg(str(e))
+            if isinstance(e, vimconn.vimconnNotFoundException):
+                # If not found mark as Done and fill error_msg
+                task["status"] = "DONE"
+                return True, None
+            task["status"] = "FAILED"
+            return False, None
+
+    def new_classification(self, task):
+        vim_classification_id = None
+        try:
+            params = task["params"]
+            task_id = task["instance_action_id"] + "." + str(task["task_index"])
+            depends = task.get("depends")
+            error_text = ""
+            interfaces = task.get("depends").values()[0].get("extra").get("params")[5]
+            # Bear in mind that different VIM connectors might support Classifications differently.
+            # In the case of OpenStack, only the first VNF attached to the classifier will be used
+            # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
+            # Since the VNFFG classifier match lacks the ethertype, classification defaults to
+            # using the IPv4 flow classifier.
+            name = "c-%s" % task["item_id"][:8]
+            # if not CIDR is given for the IP addresses, add /32:
+            ip_proto = int(params.get("ip_proto"))
+            source_ip = params.get("source_ip")
+            destination_ip = params.get("destination_ip")
+            if ip_proto == 1:
+                ip_proto = 'icmp'
+            elif ip_proto == 6:
+                ip_proto = 'tcp'
+            elif ip_proto == 17:
+                ip_proto = 'udp'
+            if '/' not in source_ip:
+                source_ip += '/32'
+            if '/' not in destination_ip:
+                destination_ip += '/32'
+            definition = {
+                    "logical_source_port": interfaces[0].get("vim_id"),
+                    "protocol": ip_proto,
+                    "source_ip_prefix": source_ip,
+                    "destination_ip_prefix": destination_ip,
+                    "source_port_range_min": params.get("source_port"),
+                    "source_port_range_max": params.get("source_port"),
+                    "destination_port_range_min": params.get("destination_port"),
+                    "destination_port_range_max": params.get("destination_port"),
+            }
+
+            vim_classification_id = self.vim.new_classification(
+                name, 'legacy_flow_classifier', definition)
+
+            task["extra"]["created"] = True
+            task["error_msg"] = None
+            task["status"] = "DONE"
+            task["vim_id"] = vim_classification_id
+            instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id, "error_msg": None}
+            return True, instance_element_update
+
+        except (vimconn.vimconnException, VimThreadException) as e:
+            self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
+            error_text = self._format_vim_error_msg(str(e))
+            task["error_msg"] = error_text
+            task["status"] = "FAILED"
+            task["vim_id"] = None
+            instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
+            return False, instance_element_update
+
+    def del_classification(self, task):
+        classification_vim_id = task["vim_id"]
+        try:
+            self.vim.delete_classification(classification_vim_id)
+            task["status"] = "DONE"
+            task["error_msg"] = None
+            return True, None
+
+        except vimconn.vimconnException as e:
+            task["error_msg"] = self._format_vim_error_msg(str(e))
+            if isinstance(e, vimconn.vimconnNotFoundException):
+                # If not found mark as Done and fill error_msg
+                task["status"] = "DONE"
+                return True, None
+            task["status"] = "FAILED"
+            return False, None
+
+    def new_sfp(self, task):
+        vim_sfp_id = None
+        try:
+            params = task["params"]
+            task_id = task["instance_action_id"] + "." + str(task["task_index"])
+            depends = task.get("depends")
+            error_text = ""
+            deps = task.get("depends").values()
+            sf_id_list = []
+            classification_id_list = []
+            for dep in deps:
+                vim_id = dep.get("vim_id")
+                resource = dep.get("item")
+                if resource == "instance_sfs":
+                    sf_id_list.append(vim_id)
+                elif resource == "instance_classifications":
+                    classification_id_list.append(vim_id)
+
+            name = "sfp-%s" % task["item_id"][:8]
+            # By default no form of IETF SFC Encapsulation will be used
+            vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)
+
+            task["extra"]["created"] = True
+            task["error_msg"] = None
+            task["status"] = "DONE"
+            task["vim_id"] = vim_sfp_id
+            instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
+            return True, instance_element_update
+
+        except (vimconn.vimconnException, VimThreadException) as e:
+            self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
+            error_text = self._format_vim_error_msg(str(e))
+            task["error_msg"] = error_text
+            task["status"] = "FAILED"
+            task["vim_id"] = None
+            instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
+            return False, instance_element_update
+        return
+
+    def del_sfp(self, task):
+        sfp_vim_id = task["vim_id"]
+        try:
+            self.vim.delete_sfp(sfp_vim_id)
+            task["status"] = "DONE"
+            task["error_msg"] = None
+            return True, None
+
+        except vimconn.vimconnException as e:
+            task["error_msg"] = self._format_vim_error_msg(str(e))
+            if isinstance(e, vimconn.vimconnNotFoundException):
+                # If not found mark as Done and fill error_msg
+                task["status"] = "DONE"
+                return True, None
+            task["status"] = "FAILED"
+            return False, None