# limitations under the License.
##
+from copy import deepcopy
from http import HTTPStatus
from itertools import product
import logging
"image": Ns._process_image_params,
"flavor": Ns._process_flavor_params,
"vdu": Ns._process_vdu_params,
+ "classification": Ns._process_classification_params,
+ "sfi": Ns._process_sfi_params,
+ "sf": Ns._process_sf_params,
+ "sfp": Ns._process_sfp_params,
"affinity-or-anti-affinity-group": Ns._process_affinity_group_params,
"shared-volumes": Ns._process_shared_volumes_params,
}
"image": "image",
"flavor": "flavor",
"vdu": "vdur",
+ "classification": "classification",
+ "sfi": "sfi",
+ "sf": "sf",
+ "sfp": "sfp",
"affinity-or-anti-affinity-group": "affinity-or-anti-affinity-group",
"shared-volumes": "shared-volumes",
}
extra_dict["params"] = {"flavor_data": flavor_data_name}
return extra_dict
+ @staticmethod
+ def _prefix_ip_address(ip_address):
+ if "/" not in ip_address:
+ ip_address += "/32"
+ return ip_address
+
+ @staticmethod
+ def _process_ip_proto(ip_proto):
+ if ip_proto:
+ if ip_proto == 1:
+ ip_proto = "icmp"
+ elif ip_proto == 6:
+ ip_proto = "tcp"
+ elif ip_proto == 17:
+ ip_proto = "udp"
+ return ip_proto
+
+ @staticmethod
+ def _process_classification_params(
+ target_classification: Dict[str, Any],
+ indata: Dict[str, Any],
+ vim_info: Dict[str, Any],
+ target_record_id: str,
+ **kwargs: Dict[str, Any],
+ ) -> Dict[str, Any]:
+ """[summary]
+
+ Args:
+ target_classification (Dict[str, Any]): Classification dictionary parameters that needs to be processed to create resource on VIM
+ indata (Dict[str, Any]): Deployment info
+ vim_info (Dict[str, Any]):To add items created by OSM on the VIM.
+ target_record_id (str): Task record ID.
+ **kwargs (Dict[str, Any]): Used to send additional information to the task.
+
+ Returns:
+ Dict[str, Any]: Return parameters required to create classification and Items on which classification is dependent.
+ """
+ vnfr_id = target_classification["vnfr_id"]
+ vdur_id = target_classification["vdur_id"]
+ port_index = target_classification["ingress_port_index"]
+ extra_dict = {}
+
+ classification_data = {
+ "name": target_classification["id"],
+ "source_port_range_min": target_classification["source-port"],
+ "source_port_range_max": target_classification["source-port"],
+ "destination_port_range_min": target_classification["destination-port"],
+ "destination_port_range_max": target_classification["destination-port"],
+ }
+
+ classification_data["source_ip_prefix"] = Ns._prefix_ip_address(
+ target_classification["source-ip-address"]
+ )
+
+ classification_data["destination_ip_prefix"] = Ns._prefix_ip_address(
+ target_classification["destination-ip-address"]
+ )
+
+ classification_data["protocol"] = Ns._process_ip_proto(
+ int(target_classification["ip-proto"])
+ )
+
+ db = kwargs.get("db")
+ vdu_text = Ns._get_vnfr_vdur_text(db, vnfr_id, vdur_id)
+
+ extra_dict = {"depends_on": [vdu_text]}
+
+ extra_dict = {"depends_on": [vdu_text]}
+ classification_data["logical_source_port"] = "TASK-" + vdu_text
+ classification_data["logical_source_port_index"] = port_index
+
+ extra_dict["params"] = classification_data
+
+ return extra_dict
+
+ @staticmethod
+ def _process_sfi_params(
+ target_sfi: Dict[str, Any],
+ indata: Dict[str, Any],
+ vim_info: Dict[str, Any],
+ target_record_id: str,
+ **kwargs: Dict[str, Any],
+ ) -> Dict[str, Any]:
+ """[summary]
+
+ Args:
+ target_sfi (Dict[str, Any]): SFI dictionary parameters that needs to be processed to create resource on VIM
+ indata (Dict[str, Any]): deployment info
+ vim_info (Dict[str, Any]): To add items created by OSM on the VIM.
+ target_record_id (str): Task record ID.
+ **kwargs (Dict[str, Any]): Used to send additional information to the task.
+
+ Returns:
+ Dict[str, Any]: Return parameters required to create SFI and Items on which SFI is dependent.
+ """
+
+ vnfr_id = target_sfi["vnfr_id"]
+ vdur_id = target_sfi["vdur_id"]
+
+ sfi_data = {
+ "name": target_sfi["id"],
+ "ingress_port_index": target_sfi["ingress_port_index"],
+ "egress_port_index": target_sfi["egress_port_index"],
+ }
+
+ db = kwargs.get("db")
+ vdu_text = Ns._get_vnfr_vdur_text(db, vnfr_id, vdur_id)
+
+ extra_dict = {"depends_on": [vdu_text]}
+ sfi_data["ingress_port"] = "TASK-" + vdu_text
+ sfi_data["egress_port"] = "TASK-" + vdu_text
+
+ extra_dict["params"] = sfi_data
+
+ return extra_dict
+
+ @staticmethod
+ def _get_vnfr_vdur_text(db, vnfr_id, vdur_id):
+ vnf_preffix = "vnfrs:{}".format(vnfr_id)
+ db_vnfr = db.get_one("vnfrs", {"_id": vnfr_id})
+ vdur_list = []
+ vdu_text = ""
+
+ if db_vnfr:
+ vdur_list = [
+ vdur["id"] for vdur in db_vnfr["vdur"] if vdur["vdu-id-ref"] == vdur_id
+ ]
+
+ if vdur_list:
+ vdu_text = vnf_preffix + ":vdur." + vdur_list[0]
+
+ return vdu_text
+
+ @staticmethod
+ def _process_sf_params(
+ target_sf: Dict[str, Any],
+ indata: Dict[str, Any],
+ vim_info: Dict[str, Any],
+ target_record_id: str,
+ **kwargs: Dict[str, Any],
+ ) -> Dict[str, Any]:
+ """[summary]
+
+ Args:
+ target_sf (Dict[str, Any]): SF dictionary parameters that needs to be processed to create resource on VIM
+ indata (Dict[str, Any]): Deployment info.
+ vim_info (Dict[str, Any]):To add items created by OSM on the VIM.
+ target_record_id (str): Task record ID.
+ **kwargs (Dict[str, Any]): Used to send additional information to the task.
+
+ Returns:
+ Dict[str, Any]: Return parameters required to create SF and Items on which SF is dependent.
+ """
+
+ nsr_id = kwargs.get("nsr_id", "")
+ sfis = target_sf["sfis"]
+ ns_preffix = "nsrs:{}".format(nsr_id)
+ extra_dict = {"depends_on": [], "params": []}
+ sf_data = {"name": target_sf["id"], "sfis": sfis}
+
+ for count, sfi in enumerate(sfis):
+ sfi_text = ns_preffix + ":sfi." + sfi
+ sfis[count] = "TASK-" + sfi_text
+ extra_dict["depends_on"].append(sfi_text)
+
+ extra_dict["params"] = sf_data
+
+ return extra_dict
+
+ @staticmethod
+ def _process_sfp_params(
+ target_sfp: Dict[str, Any],
+ indata: Dict[str, Any],
+ vim_info: Dict[str, Any],
+ target_record_id: str,
+ **kwargs: Dict[str, Any],
+ ) -> Dict[str, Any]:
+ """[summary]
+
+ Args:
+ target_sfp (Dict[str, Any]): SFP dictionary parameters that needs to be processed to create resource on VIM.
+ indata (Dict[str, Any]): Deployment info
+ vim_info (Dict[str, Any]):To add items created by OSM on the VIM.
+ target_record_id (str): Task record ID.
+ **kwargs (Dict[str, Any]): Used to send additional information to the task.
+
+ Returns:
+ Dict[str, Any]: Return parameters required to create SFP and Items on which SFP is dependent.
+ """
+
+ nsr_id = kwargs.get("nsr_id")
+ sfs = target_sfp["sfs"]
+ classifications = target_sfp["classifications"]
+ ns_preffix = "nsrs:{}".format(nsr_id)
+ extra_dict = {"depends_on": [], "params": []}
+ sfp_data = {
+ "name": target_sfp["id"],
+ "sfs": sfs,
+ "classifications": classifications,
+ }
+
+ for count, sf in enumerate(sfs):
+ sf_text = ns_preffix + ":sf." + sf
+ sfs[count] = "TASK-" + sf_text
+ extra_dict["depends_on"].append(sf_text)
+
+ for count, classi in enumerate(classifications):
+ classi_text = ns_preffix + ":classification." + classi
+ classifications[count] = "TASK-" + classi_text
+ extra_dict["depends_on"].append(classi_text)
+
+ extra_dict["params"] = sfp_data
+
+ return extra_dict
+
@staticmethod
def _process_net_params(
target_vld: Dict[str, Any],
# the existing_list and the method to process params are set
db_path = self.db_path_map[item]
process_params = self.process_params_function_map[item]
- if item in ("net", "vdu"):
+
+ if item in ("sfp", "classification", "sf", "sfi"):
+ db_record = "nsrs:{}:{}".format(nsr_id, db_path)
+ target_vnffg = indata.get("vnffg", [])[0]
+ target_list = target_vnffg[item]
+ existing_list = db_nsr.get(item, [])
+ elif item in ("net", "vdu"):
# This case is specific for the NS VLD (not applied to VDU)
if vnfr is None:
db_record = "nsrs:{}:{}".format(nsr_id, db_path)
}
)
self.logger.debug("calculate_diff_items kwargs={}".format(kwargs))
+ if (
+ process_params == Ns._process_sfi_params
+ or Ns._process_sf_params
+ or Ns._process_classification_params
+ or Ns._process_sfp_params
+ ):
+ kwargs.update({"nsr_id": nsr_id, "db": self.db})
+
+ self.logger.debug("calculate_diff_items kwargs={}".format(kwargs))
+
extra_dict = process_params(
target_item,
indata,
return diff_items, task_index
+ def _process_vnfgd_sfp(self, sfp):
+ processed_sfp = {}
+ # getting sfp name, sfs and classifications in sfp to store it in processed_sfp
+ processed_sfp["id"] = sfp["id"]
+ sfs_in_sfp = [
+ sf["id"] for sf in sfp.get("position-desc-id", [])[0].get("cp-profile-id")
+ ]
+ classifications_in_sfp = [
+ classi["id"]
+ for classi in sfp.get("position-desc-id", [])[0].get("match-attributes")
+ ]
+
+ # creating a list of sfp with sfs and classifications
+ processed_sfp["sfs"] = sfs_in_sfp
+ processed_sfp["classifications"] = classifications_in_sfp
+
+ return processed_sfp
+
+ def _process_vnfgd_sf(self, sf):
+ processed_sf = {}
+ # getting name of sf
+ processed_sf["id"] = sf["id"]
+ # getting sfis in sf
+ sfis_in_sf = sf.get("constituent-profile-elements")
+ sorted_sfis = sorted(sfis_in_sf, key=lambda i: i["order"])
+ # getting sfis names
+ processed_sf["sfis"] = [sfi["id"] for sfi in sorted_sfis]
+
+ return processed_sf
+
+ def _process_vnfgd_sfi(self, sfi, db_vnfrs):
+ processed_sfi = {}
+ # getting name of sfi
+ processed_sfi["id"] = sfi["id"]
+
+ # getting ports in sfi
+ ingress_port = sfi["ingress-constituent-cpd-id"]
+ egress_port = sfi["egress-constituent-cpd-id"]
+ sfi_vnf_member_index = sfi["constituent-base-element-id"]
+
+ processed_sfi["ingress_port"] = ingress_port
+ processed_sfi["egress_port"] = egress_port
+
+ all_vnfrs = db_vnfrs.values()
+
+ sfi_vnfr = [
+ element
+ for element in all_vnfrs
+ if element["member-vnf-index-ref"] == sfi_vnf_member_index
+ ]
+ processed_sfi["vnfr_id"] = sfi_vnfr[0]["id"]
+
+ sfi_vnfr_cp = sfi_vnfr[0]["connection-point"]
+
+ ingress_port_index = [
+ c for c, element in enumerate(sfi_vnfr_cp) if element["id"] == ingress_port
+ ]
+ ingress_port_index = ingress_port_index[0]
+
+ processed_sfi["vdur_id"] = sfi_vnfr_cp[ingress_port_index][
+ "connection-point-vdu-id"
+ ]
+ processed_sfi["ingress_port_index"] = ingress_port_index
+ processed_sfi["egress_port_index"] = ingress_port_index
+
+ if egress_port != ingress_port:
+ egress_port_index = [
+ c
+ for c, element in enumerate(sfi_vnfr_cp)
+ if element["id"] == egress_port
+ ]
+ processed_sfi["egress_port_index"] = egress_port_index
+
+ return processed_sfi
+
+ def _process_vnfgd_classification(self, classification, db_vnfrs):
+ processed_classification = {}
+
+ processed_classification = deepcopy(classification)
+ classi_vnf_member_index = processed_classification[
+ "constituent-base-element-id"
+ ]
+ logical_source_port = processed_classification["constituent-cpd-id"]
+
+ all_vnfrs = db_vnfrs.values()
+
+ classi_vnfr = [
+ element
+ for element in all_vnfrs
+ if element["member-vnf-index-ref"] == classi_vnf_member_index
+ ]
+ processed_classification["vnfr_id"] = classi_vnfr[0]["id"]
+
+ classi_vnfr_cp = classi_vnfr[0]["connection-point"]
+
+ ingress_port_index = [
+ c
+ for c, element in enumerate(classi_vnfr_cp)
+ if element["id"] == logical_source_port
+ ]
+ ingress_port_index = ingress_port_index[0]
+
+ processed_classification["ingress_port_index"] = ingress_port_index
+ processed_classification["vdur_id"] = classi_vnfr_cp[ingress_port_index][
+ "connection-point-vdu-id"
+ ]
+
+ return processed_classification
+
+ def _update_db_nsr_with_vnffg(self, processed_vnffg, vim_info, nsr_id):
+ """This method used to add viminfo dict to sfi, sf sfp and classification in indata and count info in db_nsr.
+
+ Args:
+ processed_vnffg (Dict[str, Any]): deployment info
+ vim_info (Dict): dictionary to store VIM resource information
+ nsr_id (str): NSR id
+
+ Returns: None
+ """
+
+ nsr_sfi = {}
+ nsr_sf = {}
+ nsr_sfp = {}
+ nsr_classification = {}
+ db_nsr_vnffg = deepcopy(processed_vnffg)
+
+ for count, sfi in enumerate(processed_vnffg["sfi"]):
+ sfi["vim_info"] = vim_info
+ sfi_count = "sfi.{}".format(count)
+ nsr_sfi[sfi_count] = db_nsr_vnffg["sfi"][count]
+
+ self.db.set_list("nsrs", {"_id": nsr_id}, nsr_sfi)
+
+ for count, sf in enumerate(processed_vnffg["sf"]):
+ sf["vim_info"] = vim_info
+ sf_count = "sf.{}".format(count)
+ nsr_sf[sf_count] = db_nsr_vnffg["sf"][count]
+
+ self.db.set_list("nsrs", {"_id": nsr_id}, nsr_sf)
+
+ for count, sfp in enumerate(processed_vnffg["sfp"]):
+ sfp["vim_info"] = vim_info
+ sfp_count = "sfp.{}".format(count)
+ nsr_sfp[sfp_count] = db_nsr_vnffg["sfp"][count]
+
+ self.db.set_list("nsrs", {"_id": nsr_id}, nsr_sfp)
+
+ for count, classi in enumerate(processed_vnffg["classification"]):
+ classi["vim_info"] = vim_info
+ classification_count = "classification.{}".format(count)
+ nsr_classification[classification_count] = db_nsr_vnffg["classification"][
+ count
+ ]
+
+ self.db.set_list("nsrs", {"_id": nsr_id}, nsr_classification)
+
+ def process_vnffgd_descriptor(
+ self,
+ indata: dict,
+ nsr_id: str,
+ db_nsr: dict,
+ db_vnfrs: dict,
+ ) -> dict:
+ """This method used to process vnffgd parameters from descriptor.
+
+ Args:
+ indata (Dict[str, Any]): deployment info
+ nsr_id (str): NSR id
+ db_nsr: NSR record from DB
+ db_vnfrs: VNFRS record from DB
+
+ Returns:
+ Dict: Processed vnffg parameters.
+ """
+
+ processed_vnffg = {}
+ vnffgd = db_nsr.get("nsd", {}).get("vnffgd")
+ vnf_list = indata.get("vnf", [])
+ vim_text = ""
+
+ if vnf_list:
+ vim_text = "vim:" + vnf_list[0].get("vim-account-id", "")
+
+ vim_info = {}
+ vim_info[vim_text] = {}
+ processed_sfps = []
+ processed_classifications = []
+ processed_sfs = []
+ processed_sfis = []
+
+ # setting up intial empty entries for vnffg items in mongodb.
+ self.db.set_list(
+ "nsrs",
+ {"_id": nsr_id},
+ {
+ "sfi": [],
+ "sf": [],
+ "sfp": [],
+ "classification": [],
+ },
+ )
+
+ vnffg = vnffgd[0]
+ # getting sfps
+ sfps = vnffg.get("nfpd")
+ for sfp in sfps:
+ processed_sfp = self._process_vnfgd_sfp(sfp)
+ # appending the list of processed sfps
+ processed_sfps.append(processed_sfp)
+
+ # getting sfs in sfp
+ sfs = sfp.get("position-desc-id")[0].get("cp-profile-id")
+ for sf in sfs:
+ processed_sf = self._process_vnfgd_sf(sf)
+
+ # appending the list of processed sfs
+ processed_sfs.append(processed_sf)
+
+ # getting sfis in sf
+ sfis_in_sf = sf.get("constituent-profile-elements")
+ sorted_sfis = sorted(sfis_in_sf, key=lambda i: i["order"])
+
+ for sfi in sorted_sfis:
+ processed_sfi = self._process_vnfgd_sfi(sfi, db_vnfrs)
+
+ processed_sfis.append(processed_sfi)
+
+ classifications = sfp.get("position-desc-id")[0].get("match-attributes")
+ # getting classifications from sfp
+ for classification in classifications:
+ processed_classification = self._process_vnfgd_classification(
+ classification, db_vnfrs
+ )
+
+ processed_classifications.append(processed_classification)
+
+ processed_vnffg["sfi"] = processed_sfis
+ processed_vnffg["sf"] = processed_sfs
+ processed_vnffg["classification"] = processed_classifications
+ processed_vnffg["sfp"] = processed_sfps
+
+ # adding viminfo dict to sfi, sf sfp and classification
+ self._update_db_nsr_with_vnffg(processed_vnffg, vim_info, nsr_id)
+
+ # updating indata with vnffg porcessed parameters
+ indata["vnffg"].append(processed_vnffg)
+
def calculate_all_differences_to_deploy(
self,
indata,
# set list with diffs:
changes_list = []
+ # processing vnffg from descriptor parameter
+ vnffgd = db_nsr.get("nsd").get("vnffgd")
+ if vnffgd is not None:
+ indata["vnffg"] = []
+ vnf_list = indata["vnf"]
+ processed_vnffg = {}
+
+ # in case of ns-delete
+ if not vnf_list:
+ processed_vnffg["sfi"] = []
+ processed_vnffg["sf"] = []
+ processed_vnffg["classification"] = []
+ processed_vnffg["sfp"] = []
+
+ indata["vnffg"].append(processed_vnffg)
+
+ else:
+ self.process_vnffgd_descriptor(
+ indata=indata,
+ nsr_id=nsr_id,
+ db_nsr=db_nsr,
+ db_vnfrs=db_vnfrs,
+ )
+
+ # getting updated db_nsr having vnffg parameters
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+
+ self.logger.debug(
+ "After processing vnffd parameters indata={} nsr={}".format(
+ indata, db_nsr
+ )
+ )
+
+ for item in ["sfp", "classification", "sf", "sfi"]:
+ self.logger.debug("process NS={} {}".format(nsr_id, item))
+ diff_items, task_index = self.calculate_diff_items(
+ indata=indata,
+ db_nsr=db_nsr,
+ db_ro_nsr=db_ro_nsr,
+ db_nsr_update=db_nsr_update,
+ item=item,
+ tasks_by_target_record_id=tasks_by_target_record_id,
+ action_id=action_id,
+ nsr_id=nsr_id,
+ task_index=task_index,
+ vnfr_id=None,
+ )
+ changes_list += diff_items
+
# NS vld, image and flavor
for item in [
"net",
return "DONE", ro_vim_item_update_ok
+class VimInteractionClassification(VimInteractionBase):
+ def new(self, ro_task, task_index, task_depends):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ created = False
+ target_vim = self.my_vims[ro_task["target_id"]]
+
+ try:
+ created = True
+ params = task["params"]
+ params_copy = deepcopy(params)
+
+ name = params_copy.pop("name")
+ logical_source_port_index = int(
+ params_copy.pop("logical_source_port_index")
+ )
+ logical_source_port = params_copy["logical_source_port"]
+
+ if logical_source_port.startswith("TASK-"):
+ vm_id = task_depends[logical_source_port]
+ params_copy["logical_source_port"] = target_vim.refresh_vms_status(
+ [vm_id]
+ )[vm_id]["interfaces"][logical_source_port_index]["vim_interface_id"]
+
+ vim_classification_id = target_vim.new_classification(
+ name, "legacy_flow_classifier", params_copy
+ )
+
+ ro_vim_item_update = {
+ "vim_id": vim_classification_id,
+ "vim_status": "DONE",
+ "created": created,
+ "vim_details": None,
+ "vim_message": None,
+ }
+ self.logger.debug(
+ "task={} {} created={}".format(task_id, ro_task["target_id"], created)
+ )
+
+ return "DONE", ro_vim_item_update
+ except (vimconn.VimConnException, NsWorkerException) as e:
+ self.logger.debug(traceback.format_exc())
+ self.logger.error(
+ "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
+ )
+ ro_vim_item_update = {
+ "vim_status": "VIM_ERROR",
+ "created": created,
+ "vim_message": str(e),
+ }
+
+ return "FAILED", ro_vim_item_update
+
+ def delete(self, ro_task, task_index):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ classification_vim_id = ro_task["vim_info"]["vim_id"]
+ ro_vim_item_update_ok = {
+ "vim_status": "DELETED",
+ "created": False,
+ "vim_message": "DELETED",
+ "vim_id": None,
+ }
+
+ try:
+ if classification_vim_id:
+ target_vim = self.my_vims[ro_task["target_id"]]
+ target_vim.delete_classification(classification_vim_id)
+ except vimconn.VimConnNotFoundException:
+ ro_vim_item_update_ok["vim_message"] = "already deleted"
+ except vimconn.VimConnException as e:
+ self.logger.error(
+ "ro_task={} vim={} del-classification={}: {}".format(
+ ro_task["_id"], ro_task["target_id"], classification_vim_id, e
+ )
+ )
+ ro_vim_item_update = {
+ "vim_status": "VIM_ERROR",
+ "vim_message": "Error while deleting: {}".format(e),
+ }
+
+ return "FAILED", ro_vim_item_update
+
+ self.logger.debug(
+ "task={} {} del-classification={} {}".format(
+ task_id,
+ ro_task["target_id"],
+ classification_vim_id,
+ ro_vim_item_update_ok.get("vim_message", ""),
+ )
+ )
+
+ return "DONE", ro_vim_item_update_ok
+
+
+class VimInteractionSfi(VimInteractionBase):
+ def new(self, ro_task, task_index, task_depends):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ created = False
+ target_vim = self.my_vims[ro_task["target_id"]]
+
+ try:
+ created = True
+ params = task["params"]
+ params_copy = deepcopy(params)
+ name = params_copy["name"]
+ ingress_port = params_copy["ingress_port"]
+ egress_port = params_copy["egress_port"]
+ ingress_port_index = params_copy["ingress_port_index"]
+ egress_port_index = params_copy["egress_port_index"]
+
+ ingress_port_id = ingress_port
+ egress_port_id = egress_port
+
+ vm_id = task_depends[ingress_port]
+
+ if ingress_port.startswith("TASK-"):
+ ingress_port_id = target_vim.refresh_vms_status([vm_id])[vm_id][
+ "interfaces"
+ ][ingress_port_index]["vim_interface_id"]
+
+ if ingress_port == egress_port:
+ egress_port_id = ingress_port_id
+ else:
+ if egress_port.startswith("TASK-"):
+ egress_port_id = target_vim.refresh_vms_status([vm_id])[vm_id][
+ "interfaces"
+ ][egress_port_index]["vim_interface_id"]
+
+ ingress_port_id_list = [ingress_port_id]
+ egress_port_id_list = [egress_port_id]
+
+ vim_sfi_id = target_vim.new_sfi(
+ name, ingress_port_id_list, egress_port_id_list, sfc_encap=False
+ )
+
+ ro_vim_item_update = {
+ "vim_id": vim_sfi_id,
+ "vim_status": "DONE",
+ "created": created,
+ "vim_details": None,
+ "vim_message": None,
+ }
+ self.logger.debug(
+ "task={} {} created={}".format(task_id, ro_task["target_id"], created)
+ )
+
+ return "DONE", ro_vim_item_update
+ except (vimconn.VimConnException, NsWorkerException) as e:
+ self.logger.debug(traceback.format_exc())
+ self.logger.error(
+ "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
+ )
+ ro_vim_item_update = {
+ "vim_status": "VIM_ERROR",
+ "created": created,
+ "vim_message": str(e),
+ }
+
+ return "FAILED", ro_vim_item_update
+
+ def delete(self, ro_task, task_index):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ sfi_vim_id = ro_task["vim_info"]["vim_id"]
+ ro_vim_item_update_ok = {
+ "vim_status": "DELETED",
+ "created": False,
+ "vim_message": "DELETED",
+ "vim_id": None,
+ }
+
+ try:
+ if sfi_vim_id:
+ target_vim = self.my_vims[ro_task["target_id"]]
+ target_vim.delete_sfi(sfi_vim_id)
+ except vimconn.VimConnNotFoundException:
+ ro_vim_item_update_ok["vim_message"] = "already deleted"
+ except vimconn.VimConnException as e:
+ self.logger.error(
+ "ro_task={} vim={} del-sfi={}: {}".format(
+ ro_task["_id"], ro_task["target_id"], sfi_vim_id, e
+ )
+ )
+ ro_vim_item_update = {
+ "vim_status": "VIM_ERROR",
+ "vim_message": "Error while deleting: {}".format(e),
+ }
+
+ return "FAILED", ro_vim_item_update
+
+ self.logger.debug(
+ "task={} {} del-sfi={} {}".format(
+ task_id,
+ ro_task["target_id"],
+ sfi_vim_id,
+ ro_vim_item_update_ok.get("vim_message", ""),
+ )
+ )
+
+ return "DONE", ro_vim_item_update_ok
+
+
+class VimInteractionSf(VimInteractionBase):
+ def new(self, ro_task, task_index, task_depends):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ created = False
+ target_vim = self.my_vims[ro_task["target_id"]]
+
+ try:
+ created = True
+ params = task["params"]
+ params_copy = deepcopy(params)
+ name = params_copy["name"]
+ sfi_list = params_copy["sfis"]
+ sfi_id_list = []
+
+ for sfi in sfi_list:
+ sfi_id = task_depends[sfi] if sfi.startswith("TASK-") else sfi
+ sfi_id_list.append(sfi_id)
+
+ vim_sf_id = target_vim.new_sf(name, sfi_id_list, sfc_encap=False)
+
+ ro_vim_item_update = {
+ "vim_id": vim_sf_id,
+ "vim_status": "DONE",
+ "created": created,
+ "vim_details": None,
+ "vim_message": None,
+ }
+ self.logger.debug(
+ "task={} {} created={}".format(task_id, ro_task["target_id"], created)
+ )
+
+ return "DONE", ro_vim_item_update
+ except (vimconn.VimConnException, NsWorkerException) as e:
+ self.logger.debug(traceback.format_exc())
+ self.logger.error(
+ "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
+ )
+ ro_vim_item_update = {
+ "vim_status": "VIM_ERROR",
+ "created": created,
+ "vim_message": str(e),
+ }
+
+ return "FAILED", ro_vim_item_update
+
+ def delete(self, ro_task, task_index):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ sf_vim_id = ro_task["vim_info"]["vim_id"]
+ ro_vim_item_update_ok = {
+ "vim_status": "DELETED",
+ "created": False,
+ "vim_message": "DELETED",
+ "vim_id": None,
+ }
+
+ try:
+ if sf_vim_id:
+ target_vim = self.my_vims[ro_task["target_id"]]
+ target_vim.delete_sf(sf_vim_id)
+ except vimconn.VimConnNotFoundException:
+ ro_vim_item_update_ok["vim_message"] = "already deleted"
+ except vimconn.VimConnException as e:
+ self.logger.error(
+ "ro_task={} vim={} del-sf={}: {}".format(
+ ro_task["_id"], ro_task["target_id"], sf_vim_id, e
+ )
+ )
+ ro_vim_item_update = {
+ "vim_status": "VIM_ERROR",
+ "vim_message": "Error while deleting: {}".format(e),
+ }
+
+ return "FAILED", ro_vim_item_update
+
+ self.logger.debug(
+ "task={} {} del-sf={} {}".format(
+ task_id,
+ ro_task["target_id"],
+ sf_vim_id,
+ ro_vim_item_update_ok.get("vim_message", ""),
+ )
+ )
+
+ return "DONE", ro_vim_item_update_ok
+
+
+class VimInteractionSfp(VimInteractionBase):
+ def new(self, ro_task, task_index, task_depends):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ created = False
+ target_vim = self.my_vims[ro_task["target_id"]]
+
+ try:
+ created = True
+ params = task["params"]
+ params_copy = deepcopy(params)
+ name = params_copy["name"]
+ sf_list = params_copy["sfs"]
+ classification_list = params_copy["classifications"]
+
+ classification_id_list = []
+ sf_id_list = []
+
+ for classification in classification_list:
+ classi_id = (
+ task_depends[classification]
+ if classification.startswith("TASK-")
+ else classification
+ )
+ classification_id_list.append(classi_id)
+
+ for sf in sf_list:
+ sf_id = task_depends[sf] if sf.startswith("TASK-") else sf
+ sf_id_list.append(sf_id)
+
+ vim_sfp_id = target_vim.new_sfp(
+ name, classification_id_list, sf_id_list, sfc_encap=False
+ )
+
+ ro_vim_item_update = {
+ "vim_id": vim_sfp_id,
+ "vim_status": "DONE",
+ "created": created,
+ "vim_details": None,
+ "vim_message": None,
+ }
+ self.logger.debug(
+ "task={} {} created={}".format(task_id, ro_task["target_id"], created)
+ )
+
+ return "DONE", ro_vim_item_update
+ except (vimconn.VimConnException, NsWorkerException) as e:
+ self.logger.debug(traceback.format_exc())
+ self.logger.error(
+ "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
+ )
+ ro_vim_item_update = {
+ "vim_status": "VIM_ERROR",
+ "created": created,
+ "vim_message": str(e),
+ }
+
+ return "FAILED", ro_vim_item_update
+
+ def delete(self, ro_task, task_index):
+ task = ro_task["tasks"][task_index]
+ task_id = task["task_id"]
+ sfp_vim_id = ro_task["vim_info"]["vim_id"]
+ ro_vim_item_update_ok = {
+ "vim_status": "DELETED",
+ "created": False,
+ "vim_message": "DELETED",
+ "vim_id": None,
+ }
+
+ try:
+ if sfp_vim_id:
+ target_vim = self.my_vims[ro_task["target_id"]]
+ target_vim.delete_sfp(sfp_vim_id)
+ except vimconn.VimConnNotFoundException:
+ ro_vim_item_update_ok["vim_message"] = "already deleted"
+ except vimconn.VimConnException as e:
+ self.logger.error(
+ "ro_task={} vim={} del-sfp={}: {}".format(
+ ro_task["_id"], ro_task["target_id"], sfp_vim_id, e
+ )
+ )
+ ro_vim_item_update = {
+ "vim_status": "VIM_ERROR",
+ "vim_message": "Error while deleting: {}".format(e),
+ }
+
+ return "FAILED", ro_vim_item_update
+
+ self.logger.debug(
+ "task={} {} del-sfp={} {}".format(
+ task_id,
+ ro_task["target_id"],
+ sfp_vim_id,
+ ro_vim_item_update_ok.get("vim_message", ""),
+ )
+ )
+
+ return "DONE", ro_vim_item_update_ok
+
+
class VimInteractionVdu(VimInteractionBase):
max_retries_inject_ssh_key = 20 # 20 times
time_retries_inject_ssh_key = 30 # wevery 30 seconds
"shared-volumes": VimInteractionSharedVolume(
self.db, self.my_vims, self.db_vims, self.logger
),
+ "classification": VimInteractionClassification(
+ self.db, self.my_vims, self.db_vims, self.logger
+ ),
+ "sfi": VimInteractionSfi(self.db, self.my_vims, self.db_vims, self.logger),
+ "sf": VimInteractionSf(self.db, self.my_vims, self.db_vims, self.logger),
+ "sfp": VimInteractionSfp(self.db, self.my_vims, self.db_vims, self.logger),
"vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
"image": VimInteractionImage(
self.db, self.my_vims, self.db_vims, self.logger
vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"]
with self.assertRaises(AttributeError):
Ns._select_persistent_root_disk(vsd, vdu)
+
+
+class TestSFC(unittest.TestCase):
+ def setUp(self):
+ self.ns = Ns()
+ self.logger = CopyingMock(autospec=True)
+
+ @patch("osm_ng_ro.ns.Ns._prefix_ip_address")
+ @patch("osm_ng_ro.ns.Ns._process_ip_proto")
+ @patch("osm_ng_ro.ns.Ns._get_vnfr_vdur_text")
+ def test_process_classification_params(
+ self, mock_get_vnfr_vdur_text, mock_process_ip_proto, mock_prefix_ip_address
+ ):
+ db = Mock()
+ mock_prefix_ip_address.side_effect = ["10.10.10.10/32", "20.20.20.20/32"]
+ mock_process_ip_proto.return_value = "tcp"
+ mock_get_vnfr_vdur_text.return_value = "vdur_text"
+ vim_info, indata, target_record_id = {}, {}, ""
+ target_classification = {
+ "vnfr_id": "1234",
+ "source-ip-address": "10.10.10.10",
+ "destination-ip-address": "20.20.20.20",
+ "ip-proto": "6",
+ "id": "rule1",
+ "source-port": "0",
+ "destination-port": 5555,
+ "vdur_id": "5678",
+ "ingress_port_index": 0,
+ "vim_info": vim_info,
+ }
+ kwargs = {"db": db}
+
+ expected_result = {
+ "depends_on": ["vdur_text"],
+ "params": {
+ "destination_ip_prefix": "20.20.20.20/32",
+ "destination_port_range_max": 5555,
+ "destination_port_range_min": 5555,
+ "logical_source_port": "TASK-vdur_text",
+ "logical_source_port_index": 0,
+ "name": "rule1",
+ "protocol": "tcp",
+ "source_ip_prefix": "10.10.10.10/32",
+ "source_port_range_max": "0",
+ "source_port_range_min": "0",
+ },
+ }
+
+ result = self.ns._process_classification_params(
+ target_classification, indata, vim_info, target_record_id, **kwargs
+ )
+ self.assertEqual(expected_result, result)
+
+ def test_process_sfp_params(self):
+ sf_text = "nsrs:1234:sf.sf1"
+ classi_text = "nsrs:1234:classification.rule1"
+ vim_info, indata, target_record_id = {}, {}, ""
+ target_sfp = {
+ "id": "sfp1",
+ "sfs": ["sf1"],
+ "classifications": ["rule1"],
+ "vim_info": vim_info,
+ }
+
+ kwargs = {"nsr_id": "1234"}
+
+ expected_result = {
+ "depends_on": [sf_text, classi_text],
+ "params": {
+ "name": "sfp1",
+ "sfs": ["TASK-" + sf_text],
+ "classifications": ["TASK-" + classi_text],
+ },
+ }
+
+ result = self.ns._process_sfp_params(
+ target_sfp, indata, vim_info, target_record_id, **kwargs
+ )
+ self.assertEqual(expected_result, result)
+
+ def test_process_sf_params(self):
+ sfi_text = "nsrs::sfi.sfi1"
+ vim_info, indata, target_record_id = {}, {}, ""
+ target_sf = {"id": "sf1", "sfis": ["sfi1"], "vim_info": vim_info}
+
+ kwargs = {"ns_id": "1234"}
+
+ expected_result = {
+ "depends_on": [sfi_text],
+ "params": {
+ "name": "sf1",
+ "sfis": ["TASK-" + sfi_text],
+ },
+ }
+
+ result = self.ns._process_sf_params(
+ target_sf, indata, vim_info, target_record_id, **kwargs
+ )
+ self.assertEqual(expected_result, result)
+
+ @patch("osm_ng_ro.ns.Ns._get_vnfr_vdur_text")
+ def test_process_sfi_params(self, mock_get_vnfr_vdur_text):
+ db = Mock()
+ mock_get_vnfr_vdur_text.return_value = "vdur_text"
+ vim_info, indata, target_record_id = {}, {}, ""
+ target_sfi = {
+ "id": "sfi1",
+ "ingress_port": "vnf-cp0-ext",
+ "egress_port": "vnf-cp0-ext",
+ "vnfr_id": "1234",
+ "vdur_id": "5678",
+ "ingress_port_index": 0,
+ "egress_port_index": 0,
+ "vim_info": {},
+ }
+ kwargs = {"db": db}
+
+ expected_result = {
+ "depends_on": ["vdur_text"],
+ "params": {
+ "name": "sfi1",
+ "ingress_port": "TASK-vdur_text",
+ "egress_port": "TASK-vdur_text",
+ "ingress_port_index": 0,
+ "egress_port_index": 0,
+ },
+ }
+
+ result = self.ns._process_sfi_params(
+ target_sfi, indata, vim_info, target_record_id, **kwargs
+ )
+ self.assertEqual(expected_result, result)
+
+ def test_process_vnfgd_sfp(self):
+ sfp = {
+ "id": "sfp1",
+ "position-desc-id": [
+ {
+ "id": "position1",
+ "cp-profile-id": [{"id": "sf1"}],
+ "match-attributes": [{"id": "rule1"}],
+ }
+ ],
+ }
+ expected_result = {"id": "sfp1", "sfs": ["sf1"], "classifications": ["rule1"]}
+
+ result = self.ns._process_vnfgd_sfp(sfp)
+ self.assertEqual(expected_result, result)
+
+ def test_process_vnfgd_sf(self):
+ sf = {"id": "sf1", "constituent-profile-elements": [{"id": "sfi1", "order": 0}]}
+ expected_result = {"id": "sf1", "sfis": ["sfi1"]}
+
+ result = self.ns._process_vnfgd_sf(sf)
+ self.assertEqual(expected_result, result)
+
+ def test_process_vnfgd_sfi(self):
+ sfi = {
+ "id": "sfi1",
+ "constituent-base-element-id": "vnf",
+ "order": 0,
+ "ingress-constituent-cpd-id": "vnf-cp0-ext",
+ "egress-constituent-cpd-id": "vnf-cp0-ext",
+ }
+ db_vnfrs = {
+ "1234": {
+ "id": "1234",
+ "member-vnf-index-ref": "vnf",
+ "connection-point": [
+ {
+ "name": "vnf-cp0-ext",
+ "connection-point-id": "vdu-eth0-int",
+ "connection-point-vdu-id": "5678",
+ "id": "vnf-cp0-ext",
+ }
+ ],
+ }
+ }
+ expected_result = {
+ "id": "sfi1",
+ "ingress_port": "vnf-cp0-ext",
+ "egress_port": "vnf-cp0-ext",
+ "vnfr_id": "1234",
+ "vdur_id": "5678",
+ "ingress_port_index": 0,
+ "egress_port_index": 0,
+ }
+
+ result = self.ns._process_vnfgd_sfi(sfi, db_vnfrs)
+ self.assertEqual(expected_result, result)
+
+ def test_process_vnfgd_classification(self):
+ classification = {
+ "id": "rule1",
+ "ip-proto": 6,
+ "source-ip-address": "10.10.10.10",
+ "destination-ip-address": "20.20.20.20",
+ "constituent-base-element-id": "vnf",
+ "constituent-cpd-id": "vnf-cp0-ext",
+ "destination-port": 5555,
+ }
+ db_vnfrs = {
+ "1234": {
+ "id": "1234",
+ "member-vnf-index-ref": "vnf",
+ "connection-point": [
+ {
+ "name": "vnf-cp0-ext",
+ "connection-point-id": "vdu-eth0-int",
+ "connection-point-vdu-id": "5678",
+ "id": "vnf-cp0-ext",
+ }
+ ],
+ }
+ }
+
+ expected_result = {
+ "id": "rule1",
+ "ip-proto": 6,
+ "source-ip-address": "10.10.10.10",
+ "destination-ip-address": "20.20.20.20",
+ "destination-port": 5555,
+ "vnfr_id": "1234",
+ "vdur_id": "5678",
+ "ingress_port_index": 0,
+ "constituent-base-element-id": "vnf",
+ "constituent-cpd-id": "vnf-cp0-ext",
+ }
+
+ result = self.ns._process_vnfgd_classification(classification, db_vnfrs)
+ self.assertEqual(expected_result, result)
return error_value, error_text
+ def new_classification(self, name, ctype, definition):
+ self.logger.debug(
+ "Adding a new (Traffic) Classification to VIM, named %s", name
+ )
+
+ try:
+ new_class = None
+ self._reload_connection()
+
+ if ctype not in supportedClassificationTypes:
+ raise vimconn.VimConnNotSupportedException(
+ "OpenStack VIM connector does not support provided "
+ "Classification Type {}, supported ones are: {}".format(
+ ctype, supportedClassificationTypes
+ )
+ )
+
+ if not self._validate_classification(ctype, definition):
+ raise vimconn.VimConnException(
+ "Incorrect Classification definition for the type specified."
+ )
+
+ classification_dict = definition
+ classification_dict["name"] = name
+
+ self.logger.info(
+ "Adding a new (Traffic) Classification to VIM, named {} and {}.".format(
+ name, classification_dict
+ )
+ )
+ new_class = self.neutron.create_sfc_flow_classifier(
+ {"flow_classifier": classification_dict}
+ )
+
+ return new_class["flow_classifier"]["id"]
+ except (
+ neExceptions.ConnectionFailed,
+ ksExceptions.ClientException,
+ neExceptions.NeutronException,
+ ConnectionError,
+ ) as e:
+ self.logger.error("Creation of Classification failed.")
+ self._format_exception(e)
+
+ def get_classification(self, class_id):
+ self.logger.debug(" Getting Classification %s from VIM", class_id)
+ filter_dict = {"id": class_id}
+ class_list = self.get_classification_list(filter_dict)
+
+ if len(class_list) == 0:
+ raise vimconn.VimConnNotFoundException(
+ "Classification '{}' not found".format(class_id)
+ )
+ elif len(class_list) > 1:
+ raise vimconn.VimConnConflictException(
+ "Found more than one Classification with this criteria"
+ )
+
+ classification = class_list[0]
+
+ return classification
+
+ def get_classification_list(self, filter_dict={}):
+ self.logger.debug(
+ "Getting Classifications from VIM filter: '%s'", str(filter_dict)
+ )
+
+ try:
+ filter_dict_os = filter_dict.copy()
+ self._reload_connection()
+
+ if self.api_version3 and "tenant_id" in filter_dict_os:
+ filter_dict_os["project_id"] = filter_dict_os.pop("tenant_id")
+
+ classification_dict = self.neutron.list_sfc_flow_classifiers(
+ **filter_dict_os
+ )
+ classification_list = classification_dict["flow_classifiers"]
+ self.__classification_os2mano(classification_list)
+
+ return classification_list
+ except (
+ neExceptions.ConnectionFailed,
+ ksExceptions.ClientException,
+ neExceptions.NeutronException,
+ ConnectionError,
+ ) as e:
+ self._format_exception(e)
+
+ def delete_classification(self, class_id):
+ self.logger.debug("Deleting Classification '%s' from VIM", class_id)
+
+ try:
+ self._reload_connection()
+ self.neutron.delete_sfc_flow_classifier(class_id)
+
+ return class_id
+ except (
+ neExceptions.ConnectionFailed,
+ neExceptions.NeutronException,
+ ksExceptions.ClientException,
+ neExceptions.NeutronException,
+ ConnectionError,
+ ) as e:
+ self._format_exception(e)
+
+ def new_sfi(self, name, ingress_ports, egress_ports, sfc_encap=True):
+ self.logger.debug(
+ "Adding a new Service Function Instance to VIM, named '%s'", name
+ )
+
+ try:
+ new_sfi = None
+ self._reload_connection()
+ correlation = None
+
+ if sfc_encap:
+ correlation = "nsh"
+
+ if len(ingress_ports) != 1:
+ raise vimconn.VimConnNotSupportedException(
+ "OpenStack VIM connector can only have 1 ingress port per SFI"
+ )
+
+ if len(egress_ports) != 1:
+ raise vimconn.VimConnNotSupportedException(
+ "OpenStack VIM connector can only have 1 egress port per SFI"
+ )
+
+ sfi_dict = {
+ "name": name,
+ "ingress": ingress_ports[0],
+ "egress": egress_ports[0],
+ "service_function_parameters": {"correlation": correlation},
+ }
+ self.logger.info("Adding a new SFI to VIM, {}.".format(sfi_dict))
+ new_sfi = self.neutron.create_sfc_port_pair({"port_pair": sfi_dict})
+
+ return new_sfi["port_pair"]["id"]
+ except (
+ neExceptions.ConnectionFailed,
+ ksExceptions.ClientException,
+ neExceptions.NeutronException,
+ ConnectionError,
+ ) as e:
+ if new_sfi:
+ try:
+ self.neutron.delete_sfc_port_pair(new_sfi["port_pair"]["id"])
+ except Exception:
+ self.logger.error(
+ "Creation of Service Function Instance failed, with "
+ "subsequent deletion failure as well."
+ )
+
+ self._format_exception(e)
+
+ def get_sfi(self, sfi_id):
+ self.logger.debug("Getting Service Function Instance %s from VIM", sfi_id)
+ filter_dict = {"id": sfi_id}
+ sfi_list = self.get_sfi_list(filter_dict)
+
+ if len(sfi_list) == 0:
+ raise vimconn.VimConnNotFoundException(
+ "Service Function Instance '{}' not found".format(sfi_id)
+ )
+ elif len(sfi_list) > 1:
+ raise vimconn.VimConnConflictException(
+ "Found more than one Service Function Instance with this criteria"
+ )
+
+ sfi = sfi_list[0]
+
+ return sfi
+
+ def get_sfi_list(self, filter_dict={}):
+ self.logger.debug(
+ "Getting Service Function Instances from VIM filter: '%s'", str(filter_dict)
+ )
+
+ try:
+ self._reload_connection()
+ filter_dict_os = filter_dict.copy()
+
+ if self.api_version3 and "tenant_id" in filter_dict_os:
+ filter_dict_os["project_id"] = filter_dict_os.pop("tenant_id")
+
+ sfi_dict = self.neutron.list_sfc_port_pairs(**filter_dict_os)
+ sfi_list = sfi_dict["port_pairs"]
+ self.__sfi_os2mano(sfi_list)
+
+ return sfi_list
+ except (
+ neExceptions.ConnectionFailed,
+ ksExceptions.ClientException,
+ neExceptions.NeutronException,
+ ConnectionError,
+ ) as e:
+ self._format_exception(e)
+
+ def delete_sfi(self, sfi_id):
+ self.logger.debug("Deleting Service Function Instance '%s' from VIM", sfi_id)
+
+ try:
+ self._reload_connection()
+ self.neutron.delete_sfc_port_pair(sfi_id)
+
+ return sfi_id
+ except (
+ neExceptions.ConnectionFailed,
+ neExceptions.NeutronException,
+ ksExceptions.ClientException,
+ neExceptions.NeutronException,
+ ConnectionError,
+ ) as e:
+ self._format_exception(e)
+
+ def new_sf(self, name, sfis, sfc_encap=True):
+ self.logger.debug("Adding a new Service Function to VIM, named '%s'", name)
+
+ new_sf = None
+
+ try:
+ self._reload_connection()
+
+ for instance in sfis:
+ sfi = self.get_sfi(instance)
+
+ if sfi.get("sfc_encap") != sfc_encap:
+ raise vimconn.VimConnNotSupportedException(
+ "OpenStack VIM connector requires all SFIs of the "
+ "same SF to share the same SFC Encapsulation"
+ )
+
+ sf_dict = {"name": name, "port_pairs": sfis}
+
+ self.logger.info("Adding a new SF to VIM, {}.".format(sf_dict))
+ new_sf = self.neutron.create_sfc_port_pair_group(
+ {"port_pair_group": sf_dict}
+ )
+
+ return new_sf["port_pair_group"]["id"]
+ except (
+ neExceptions.ConnectionFailed,
+ ksExceptions.ClientException,
+ neExceptions.NeutronException,
+ ConnectionError,
+ ) as e:
+ if new_sf:
+ try:
+ new_sf_id = new_sf.get("port_pair_group").get("id")
+ self.neutron.delete_sfc_port_pair_group(new_sf_id)
+ except Exception:
+ self.logger.error(
+ "Creation of Service Function failed, with "
+ "subsequent deletion failure as well."
+ )
+
+ self._format_exception(e)
+
+ def get_sf(self, sf_id):
+ self.logger.debug("Getting Service Function %s from VIM", sf_id)
+ filter_dict = {"id": sf_id}
+ sf_list = self.get_sf_list(filter_dict)
+
+ if len(sf_list) == 0:
+ raise vimconn.VimConnNotFoundException(
+ "Service Function '{}' not found".format(sf_id)
+ )
+ elif len(sf_list) > 1:
+ raise vimconn.VimConnConflictException(
+ "Found more than one Service Function with this criteria"
+ )
+
+ sf = sf_list[0]
+
+ return sf
+
+ def get_sf_list(self, filter_dict={}):
+ self.logger.debug(
+ "Getting Service Function from VIM filter: '%s'", str(filter_dict)
+ )
+
+ try:
+ self._reload_connection()
+ filter_dict_os = filter_dict.copy()
+
+ if self.api_version3 and "tenant_id" in filter_dict_os:
+ filter_dict_os["project_id"] = filter_dict_os.pop("tenant_id")
+
+ sf_dict = self.neutron.list_sfc_port_pair_groups(**filter_dict_os)
+ sf_list = sf_dict["port_pair_groups"]
+ self.__sf_os2mano(sf_list)
+
+ return sf_list
+ except (
+ neExceptions.ConnectionFailed,
+ ksExceptions.ClientException,
+ neExceptions.NeutronException,
+ ConnectionError,
+ ) as e:
+ self._format_exception(e)
+
+ def delete_sf(self, sf_id):
+ self.logger.debug("Deleting Service Function '%s' from VIM", sf_id)
+
+ try:
+ self._reload_connection()
+ self.neutron.delete_sfc_port_pair_group(sf_id)
+
+ return sf_id
+ except (
+ neExceptions.ConnectionFailed,
+ neExceptions.NeutronException,
+ ksExceptions.ClientException,
+ neExceptions.NeutronException,
+ ConnectionError,
+ ) as e:
+ self._format_exception(e)
+
+ def new_sfp(self, name, classifications, sfs, sfc_encap=True, spi=None):
+ self.logger.debug("Adding a new Service Function Path to VIM, named '%s'", name)
+
+ new_sfp = None
+
+ try:
+ self._reload_connection()
+ # In networking-sfc the MPLS encapsulation is legacy
+ # should be used when no full SFC Encapsulation is intended
+ correlation = "mpls"
+
+ if sfc_encap:
+ correlation = "nsh"
+
+ sfp_dict = {
+ "name": name,
+ "flow_classifiers": classifications,
+ "port_pair_groups": sfs,
+ "chain_parameters": {"correlation": correlation},
+ }
+
+ if spi:
+ sfp_dict["chain_id"] = spi
+
+ self.logger.info("Adding a new SFP to VIM, {}.".format(sfp_dict))
+ new_sfp = self.neutron.create_sfc_port_chain({"port_chain": sfp_dict})
+
+ return new_sfp["port_chain"]["id"]
+ except (
+ neExceptions.ConnectionFailed,
+ ksExceptions.ClientException,
+ neExceptions.NeutronException,
+ ConnectionError,
+ ) as e:
+ if new_sfp:
+ try:
+ new_sfp_id = new_sfp.get("port_chain").get("id")
+ self.neutron.delete_sfc_port_chain(new_sfp_id)
+ except Exception:
+ self.logger.error(
+ "Creation of Service Function Path failed, with "
+ "subsequent deletion failure as well."
+ )
+
+ self._format_exception(e)
+
+ def get_sfp(self, sfp_id):
+ self.logger.debug(" Getting Service Function Path %s from VIM", sfp_id)
+
+ filter_dict = {"id": sfp_id}
+ sfp_list = self.get_sfp_list(filter_dict)
+
+ if len(sfp_list) == 0:
+ raise vimconn.VimConnNotFoundException(
+ "Service Function Path '{}' not found".format(sfp_id)
+ )
+ elif len(sfp_list) > 1:
+ raise vimconn.VimConnConflictException(
+ "Found more than one Service Function Path with this criteria"
+ )
+
+ sfp = sfp_list[0]
+
+ return sfp
+
+ def get_sfp_list(self, filter_dict={}):
+ self.logger.debug(
+ "Getting Service Function Paths from VIM filter: '%s'", str(filter_dict)
+ )
+
+ try:
+ self._reload_connection()
+ filter_dict_os = filter_dict.copy()
+
+ if self.api_version3 and "tenant_id" in filter_dict_os:
+ filter_dict_os["project_id"] = filter_dict_os.pop("tenant_id")
+
+ sfp_dict = self.neutron.list_sfc_port_chains(**filter_dict_os)
+ sfp_list = sfp_dict["port_chains"]
+ self.__sfp_os2mano(sfp_list)
+
+ return sfp_list
+ except (
+ neExceptions.ConnectionFailed,
+ ksExceptions.ClientException,
+ neExceptions.NeutronException,
+ ConnectionError,
+ ) as e:
+ self._format_exception(e)
+
+ def delete_sfp(self, sfp_id):
+ self.logger.debug("Deleting Service Function Path '%s' from VIM", sfp_id)
+
+ try:
+ self._reload_connection()
+ self.neutron.delete_sfc_port_chain(sfp_id)
+
+ return sfp_id
+ except (
+ neExceptions.ConnectionFailed,
+ neExceptions.NeutronException,
+ ksExceptions.ClientException,
+ neExceptions.NeutronException,
+ ConnectionError,
+ ) as e:
+ self._format_exception(e)
+
+ def refresh_sfps_status(self, sfp_list):
+ """Get the status of the service function path
+ Params: the list of sfp identifiers
+ Returns a dictionary with:
+ vm_id: #VIM id of this service function path
+ status: #Mandatory. Text with one of:
+ # DELETED (not found at vim)
+ # VIM_ERROR (Cannot connect to VIM, VIM response error, ...)
+ # OTHER (Vim reported other status not understood)
+ # ERROR (VIM indicates an ERROR status)
+ # ACTIVE,
+ # CREATING (on building process)
+ error_msg: #Text with VIM error message, if any. Or the VIM connection ERROR
+ vim_info: #Text with plain information obtained from vim (yaml.safe_dump)F
+ """
+ sfp_dict = {}
+ self.logger.debug(
+ "refresh_sfps status: Getting tenant SFP information from VIM"
+ )
+
+ for sfp_id in sfp_list:
+ sfp = {}
+
+ try:
+ sfp_vim = self.get_sfp(sfp_id)
+
+ if sfp_vim["spi"]:
+ sfp["status"] = vmStatus2manoFormat["ACTIVE"]
+ else:
+ sfp["status"] = "OTHER"
+ sfp["error_msg"] = "VIM status reported " + sfp["status"]
+
+ sfp["vim_info"] = self.serialize(sfp_vim)
+
+ if sfp_vim.get("fault"):
+ sfp["error_msg"] = str(sfp_vim["fault"])
+ except vimconn.VimConnNotFoundException as e:
+ self.logger.error("Exception getting sfp status: %s", str(e))
+ sfp["status"] = "DELETED"
+ sfp["error_msg"] = str(e)
+ except vimconn.VimConnException as e:
+ self.logger.error("Exception getting sfp status: %s", str(e))
+ sfp["status"] = "VIM_ERROR"
+ sfp["error_msg"] = str(e)
+
+ sfp_dict[sfp_id] = sfp
+
+ return sfp_dict
+
+ def refresh_sfis_status(self, sfi_list):
+ """Get the status of the service function instances
+ Params: the list of sfi identifiers
+ Returns a dictionary with:
+ vm_id: #VIM id of this service function instance
+ status: #Mandatory. Text with one of:
+ # DELETED (not found at vim)
+ # VIM_ERROR (Cannot connect to VIM, VIM response error, ...)
+ # OTHER (Vim reported other status not understood)
+ # ERROR (VIM indicates an ERROR status)
+ # ACTIVE,
+ # CREATING (on building process)
+ error_msg: #Text with VIM error message, if any. Or the VIM connection ERROR
+ vim_info: #Text with plain information obtained from vim (yaml.safe_dump)
+ """
+ sfi_dict = {}
+ self.logger.debug(
+ "refresh_sfis status: Getting tenant sfi information from VIM"
+ )
+
+ for sfi_id in sfi_list:
+ sfi = {}
+
+ try:
+ sfi_vim = self.get_sfi(sfi_id)
+
+ if sfi_vim:
+ sfi["status"] = vmStatus2manoFormat["ACTIVE"]
+ else:
+ sfi["status"] = "OTHER"
+ sfi["error_msg"] = "VIM status reported " + sfi["status"]
+
+ sfi["vim_info"] = self.serialize(sfi_vim)
+
+ if sfi_vim.get("fault"):
+ sfi["error_msg"] = str(sfi_vim["fault"])
+ except vimconn.VimConnNotFoundException as e:
+ self.logger.error("Exception getting sfi status: %s", str(e))
+ sfi["status"] = "DELETED"
+ sfi["error_msg"] = str(e)
+ except vimconn.VimConnException as e:
+ self.logger.error("Exception getting sfi status: %s", str(e))
+ sfi["status"] = "VIM_ERROR"
+ sfi["error_msg"] = str(e)
+
+ sfi_dict[sfi_id] = sfi
+
+ return sfi_dict
+
+ def refresh_sfs_status(self, sf_list):
+ """Get the status of the service functions
+ Params: the list of sf identifiers
+ Returns a dictionary with:
+ vm_id: #VIM id of this service function
+ status: #Mandatory. Text with one of:
+ # DELETED (not found at vim)
+ # VIM_ERROR (Cannot connect to VIM, VIM response error, ...)
+ # OTHER (Vim reported other status not understood)
+ # ERROR (VIM indicates an ERROR status)
+ # ACTIVE,
+ # CREATING (on building process)
+ error_msg: #Text with VIM error message, if any. Or the VIM connection ERROR
+ vim_info: #Text with plain information obtained from vim (yaml.safe_dump)
+ """
+ sf_dict = {}
+ self.logger.debug("refresh_sfs status: Getting tenant sf information from VIM")
+
+ for sf_id in sf_list:
+ sf = {}
+
+ try:
+ sf_vim = self.get_sf(sf_id)
+
+ if sf_vim:
+ sf["status"] = vmStatus2manoFormat["ACTIVE"]
+ else:
+ sf["status"] = "OTHER"
+ sf["error_msg"] = "VIM status reported " + sf_vim["status"]
+
+ sf["vim_info"] = self.serialize(sf_vim)
+
+ if sf_vim.get("fault"):
+ sf["error_msg"] = str(sf_vim["fault"])
+ except vimconn.VimConnNotFoundException as e:
+ self.logger.error("Exception getting sf status: %s", str(e))
+ sf["status"] = "DELETED"
+ sf["error_msg"] = str(e)
+ except vimconn.VimConnException as e:
+ self.logger.error("Exception getting sf status: %s", str(e))
+ sf["status"] = "VIM_ERROR"
+ sf["error_msg"] = str(e)
+
+ sf_dict[sf_id] = sf
+
+ return sf_dict
+
+ def refresh_classifications_status(self, classification_list):
+ """Get the status of the classifications
+ Params: the list of classification identifiers
+ Returns a dictionary with:
+ vm_id: #VIM id of this classifier
+ status: #Mandatory. Text with one of:
+ # DELETED (not found at vim)
+ # VIM_ERROR (Cannot connect to VIM, VIM response error, ...)
+ # OTHER (Vim reported other status not understood)
+ # ERROR (VIM indicates an ERROR status)
+ # ACTIVE,
+ # CREATING (on building process)
+ error_msg: #Text with VIM error message, if any. Or the VIM connection ERROR
+ vim_info: #Text with plain information obtained from vim (yaml.safe_dump)
+ """
+ classification_dict = {}
+ self.logger.debug(
+ "refresh_classifications status: Getting tenant classification information from VIM"
+ )
+
+ for classification_id in classification_list:
+ classification = {}
+
+ try:
+ classification_vim = self.get_classification(classification_id)
+
+ if classification_vim:
+ classification["status"] = vmStatus2manoFormat["ACTIVE"]
+ else:
+ classification["status"] = "OTHER"
+ classification["error_msg"] = (
+ "VIM status reported " + classification["status"]
+ )
+
+ classification["vim_info"] = self.serialize(classification_vim)
+
+ if classification_vim.get("fault"):
+ classification["error_msg"] = str(classification_vim["fault"])
+ except vimconn.VimConnNotFoundException as e:
+ self.logger.error("Exception getting classification status: %s", str(e))
+ classification["status"] = "DELETED"
+ classification["error_msg"] = str(e)
+ except vimconn.VimConnException as e:
+ self.logger.error("Exception getting classification status: %s", str(e))
+ classification["status"] = "VIM_ERROR"
+ classification["error_msg"] = str(e)
+
+ classification_dict[classification_id] = classification
+
+ return classification_dict
+
@catch_any_exception
def new_affinity_group(self, affinity_group_data):
"""Adds a server group to VIM
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+---
+features:
+ - |
+ Feature 10980: Service Function Chaining
+ This feature will add the functionality to create the service function chaining in the OpenStack from OSM.