@staticmethod
def validate_internal_virtual_links(indata):
    """Check the consistency of int-virtual-link-desc entries.

    Ensures declared VLD ids are unique, and that every reference to an
    internal virtual link (from vdu int-cpd and from df
    virtual-link-profile) points at a declared int-virtual-link-desc.

    Args:
        indata (dict): VNFD content.

    Raises:
        EngineException: (422 UNPROCESSABLE_ENTITY) on the first
            duplicated id or dangling reference found.
    """
    known_vld_ids = set()
    # Collect the declared VLD ids, rejecting duplicates as we go.
    for vld in get_iterable(indata.get("int-virtual-link-desc")):
        vld_id = vld.get("id")
        if vld_id and vld_id in known_vld_ids:
            raise EngineException(
                "Duplicated VLD id in int-virtual-link-desc[id={}]".format(vld_id),
                http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        known_vld_ids.add(vld_id)

    # Every vdu int-cpd that names a VLD must name a declared one.
    for vdu in get_iterable(indata.get("vdu")):
        for cpd in get_iterable(vdu.get("int-cpd")):
            referenced_vld = cpd.get("int-virtual-link-desc")
            if referenced_vld and referenced_vld not in known_vld_ids:
                raise EngineException(
                    "vdu[id='{}']:int-cpd[id='{}']:int-virtual-link-desc='{}' must match an existing "
                    "int-virtual-link-desc".format(
                        vdu["id"], cpd["id"], referenced_vld
                    ),
                    http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
                )

    # Every df virtual-link-profile must map onto a declared VLD.
    for df in get_iterable(indata.get("df")):
        for profile in get_iterable(df.get("virtual-link-profile")):
            profile_id = profile.get("id")
            if profile_id and profile_id not in known_vld_ids:
                raise EngineException(
                    "df[id='{}']:virtual-link-profile='{}' must match an existing "
                    "int-virtual-link-desc".format(df["id"], profile_id),
                    http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
                )
+
@staticmethod
def validate_monitoring_params(indata):
    """Validate that monitoring-parameter ids are unique across the VNFD.

    Uniqueness is enforced over one shared namespace covering the
    int-virtual-link-desc, vdu and df sections of the descriptor.

    Args:
        indata (dict): VNFD content.

    Raises:
        EngineException: (422 UNPROCESSABLE_ENTITY) on the first
            duplicated monitoring-parameter id found.
    """
    all_monitoring_params = set()

    def _register(mp_id, container):
        # Raise on a duplicate id, otherwise remember it. A falsy id is
        # recorded but never treated as a duplicate (same as before).
        if mp_id and mp_id in all_monitoring_params:
            raise EngineException(
                "Duplicated monitoring-parameter id in "
                "{}[id='{}']".format(container, mp_id),
                http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        all_monitoring_params.add(mp_id)

    # NOTE: ivld uses the plural "monitoring-parameters" key while vdu/df
    # use the singular, matching the information model.
    for ivld in get_iterable(indata.get("int-virtual-link-desc")):
        for mp in get_iterable(ivld.get("monitoring-parameters")):
            _register(
                mp.get("id"),
                "int-virtual-link-desc[id='{}']:monitoring-parameters".format(
                    ivld["id"]
                ),
            )

    for vdu in get_iterable(indata.get("vdu")):
        for mp in get_iterable(vdu.get("monitoring-parameter")):
            _register(
                mp.get("id"),
                "vdu[id='{}']:monitoring-parameter".format(vdu["id"]),
            )

    for df in get_iterable(indata.get("df")):
        for mp in get_iterable(df.get("monitoring-parameter")):
            _register(
                mp.get("id"),
                "df[id='{}']:monitoring-parameter".format(df["id"]),
            )
+
@staticmethod
def validate_scaling_group_descriptor(indata):
    """Validate df scaling-aspect references in a VNFD.

    Checks that every scaling-criteria vnf-monitoring-param-ref points at
    a declared monitoring parameter, and that every scaling-config-action
    has a day1-2 configuration with a matching config-primitive name.

    Args:
        indata (dict): VNFD content.

    Raises:
        EngineException: (422 UNPROCESSABLE_ENTITY) on the first dangling
            reference found.
    """
    # Gather every monitoring-parameter id declared anywhere in the
    # descriptor (int-virtual-link-desc, vdu and df sections).
    all_monitoring_params = set()
    for ivld in get_iterable(indata.get("int-virtual-link-desc")):
        for mp in get_iterable(ivld.get("monitoring-parameters")):
            all_monitoring_params.add(mp.get("id"))

    for vdu in get_iterable(indata.get("vdu")):
        for mp in get_iterable(vdu.get("monitoring-parameter")):
            all_monitoring_params.add(mp.get("id"))

    for df in get_iterable(indata.get("df")):
        for mp in get_iterable(df.get("monitoring-parameter")):
            all_monitoring_params.add(mp.get("id"))

    for df in get_iterable(indata.get("df")):
        for sa in get_iterable(df.get("scaling-aspect")):
            for sp in get_iterable(sa.get("scaling-policy")):
                for sc in get_iterable(sp.get("scaling-criteria")):
                    # Each scaling criteria must reference a declared
                    # monitoring parameter (empty refs are tolerated).
                    sc_monitoring_param = sc.get("vnf-monitoring-param-ref")
                    if (
                        sc_monitoring_param
                        and sc_monitoring_param not in all_monitoring_params
                    ):
                        raise EngineException(
                            "df[id='{}']:scaling-aspect[id='{}']:scaling-policy"
                            "[name='{}']:scaling-criteria[name='{}']: "
                            "vnf-monitoring-param-ref='{}' not defined in any monitoring-param".format(
                                df["id"],
                                sa["id"],
                                sp["name"],
                                sc["name"],
                                sc_monitoring_param,
                            ),
                            http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
                        )

            for sca in get_iterable(sa.get("scaling-config-action")):
                # A scaling-config-action requires a day1-2 configuration
                # whose id equals this VNFD's own id.
                if (
                    "lcm-operations-configuration" not in df
                    or "operate-vnf-op-config"
                    not in df["lcm-operations-configuration"]
                    or not utils.find_in_list(
                        df["lcm-operations-configuration"][
                            "operate-vnf-op-config"
                        ].get("day1-2", []),
                        lambda config: config["id"] == indata["id"],
                    )
                ):
                    raise EngineException(
                        "'day1-2 configuration' not defined in the descriptor but it is "
                        "referenced by df[id='{}']:scaling-aspect[id='{}']:scaling-config-action".format(
                            df["id"], sa["id"]
                        ),
                        http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
                    )
                for configuration in get_iterable(
                    df["lcm-operations-configuration"]["operate-vnf-op-config"].get(
                        "day1-2", []
                    )
                ):
                    # for/else: the "else" clause fires only when no
                    # config-primitive in this configuration matched the
                    # referenced primitive name (i.e. the loop ran to
                    # completion without hitting "break").
                    for primitive in get_iterable(
                        configuration.get("config-primitive")
                    ):
                        if (
                            primitive["name"]
                            == sca["vnf-config-primitive-name-ref"]
                        ):
                            break
                    else:
                        raise EngineException(
                            "df[id='{}']:scaling-aspect[id='{}']:scaling-config-action:vnf-"
                            "config-primitive-name-ref='{}' does not match any "
                            "day1-2 configuration:config-primitive:name".format(
                                df["id"],
                                sa["id"],
                                sca["vnf-config-primitive-name-ref"],
                            ),
                            http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
                        )
+
def delete_extra(self, session, _id, db_content, not_send_msg=None):
    """
    Deletes associated file system storage (via super)
    Deletes associated vnfpkgops from database.
    Deletes stored descriptor revisions from database.
    :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
    :param _id: server internal id
    :param db_content: The database content of the descriptor
    :param not_send_msg: forwarded unchanged to super().delete_extra
    :return: None
    :raises: FsException in case of error while deleting associated storage
    """
    super().delete_extra(session, _id, db_content, not_send_msg)
    self.db.del_list("vnfpkgops", {"vnfPkgId": _id})
    # Revision documents embed this _id in their own _id, hence the regex match.
    self.db.del_list(self.topic + "_revisions", {"_id": {"$regex": _id}})
+
def sol005_projection(self, data):
    """Project SOL005 state fields and _links onto the descriptor.

    Copies the onboarding/operational/usage states out of _admin and
    attaches the standard vnfpkgm _links entries, then delegates to the
    parent projection.

    Args:
        data (dict): Descriptor database record.

    Returns:
        The projection produced by super().sol005_projection.
    """
    admin = data["_admin"]
    for state_field in ("onboardingState", "operationalState", "usageState"):
        data[state_field] = admin[state_field]

    base_href = "/vnfpkgm/v1/vnf_packages/{}".format(data["_id"])
    data["_links"] = {
        "self": {"href": base_href},
        "vnfd": {"href": base_href + "/vnfd"},
        "packageContent": {"href": base_href + "/package_content"},
    }

    return super().sol005_projection(data)
+
+ @staticmethod
+ def find_software_version(vnfd: dict) -> str:
+ """Find the sotware version in the VNFD descriptors
+
+ Args:
+ vnfd (dict): Descriptor as a dictionary
+
+ Returns:
+ software-version (str)
+ """
+ default_sw_version = "1.0"
+ if vnfd.get("vnfd"):
+ vnfd = vnfd["vnfd"]
+ if vnfd.get("software-version"):
+ return vnfd["software-version"]
+ else:
+ return default_sw_version
+
+ @staticmethod
+ def extract_policies(vnfd: dict) -> dict:
+ """Removes the policies from the VNFD descriptors
+
+ Args:
+ vnfd (dict): Descriptor as a dictionary
+
+ Returns:
+ vnfd (dict): VNFD which does not include policies
+ """
+ for df in vnfd.get("df", {}):
+ for policy in ["scaling-aspect", "healing-aspect"]:
+ if df.get(policy, {}):
+ df.pop(policy)
+ for vdu in vnfd.get("vdu", {}):
+ for alarm_policy in ["alarm", "monitoring-parameter"]:
+ if vdu.get(alarm_policy, {}):
+ vdu.pop(alarm_policy)
+ return vnfd
+
+ @staticmethod
+ def extract_day12_primitives(vnfd: dict) -> dict:
+ """Removes the day12 primitives from the VNFD descriptors
+
+ Args:
+ vnfd (dict): Descriptor as a dictionary
+
+ Returns:
+ vnfd (dict)
+ """
+ for df_id, df in enumerate(vnfd.get("df", {})):
+ if (
+ df.get("lcm-operations-configuration", {})
+ .get("operate-vnf-op-config", {})
+ .get("day1-2")
+ ):
+ day12 = df["lcm-operations-configuration"]["operate-vnf-op-config"].get(
+ "day1-2"
+ )
+ for config_id, config in enumerate(day12):
+ for key in [
+ "initial-config-primitive",
+ "config-primitive",
+ "terminate-config-primitive",
+ ]:
+ config.pop(key, None)
+ day12[config_id] = config
+ df["lcm-operations-configuration"]["operate-vnf-op-config"][
+ "day1-2"
+ ] = day12
+ vnfd["df"][df_id] = df
+ return vnfd
+
def remove_modifiable_items(self, vnfd: dict) -> dict:
    """Strip all modifiable content from a VNFD.

    Unwraps an outer "vnfd" key if present, drops the _admin section and
    then applies every registered extract function (day1-2 primitives,
    policies) so the remainder can be compared for disallowed changes.

    Args:
        vnfd (dict): Descriptor as a dictionary.

    Returns:
        vnfd (dict): Descriptor which does not include modifiable contents.
    """
    if vnfd.get("vnfd"):
        vnfd = vnfd["vnfd"]
    vnfd.pop("_admin", None)
    # To extract further parts of the VNFD, append the new extract
    # methods to this tuple.
    extract_functions = (self.extract_day12_primitives, self.extract_policies)
    for extract in extract_functions:
        vnfd = extract(vnfd)
    return vnfd
+
def _validate_descriptor_changes(
    self,
    descriptor_id: str,
    descriptor_file_name: str,
    old_descriptor_directory: str,
    new_descriptor_directory: str,
):
    """Compares the old and new VNFD descriptors and validates the new descriptor.

    Args:
        descriptor_id (str): Internal id of the descriptor in the DB
        descriptor_file_name (str): Name of the descriptor file
        old_descriptor_directory (str): Directory of descriptor which is in-use
        new_descriptor_directory (str): Directory of descriptor which is proposed to update (new revision)

    Returns:
        None

    Raises:
        EngineException: In case of error when there are unallowed changes
    """
    try:
        # If VNFD does not exist in DB or it is not in use by any NS,
        # validation is not required.
        vnfd = self.db.get_one("vnfds", {"_id": descriptor_id})
        if not vnfd or not detect_descriptor_usage(vnfd, "vnfds", self.db):
            return

        # Get the old and new descriptor contents in order to compare them.
        with self.fs.file_open(
            (old_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
        ) as old_descriptor_file:
            with self.fs.file_open(
                (new_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
            ) as new_descriptor_file:
                old_content = yaml.safe_load(old_descriptor_file.read())
                new_content = yaml.safe_load(new_descriptor_file.read())

                # If software version has changed, we do not need to validate
                # the differences anymore.
                if old_content and new_content:
                    if self.find_software_version(
                        old_content
                    ) != self.find_software_version(new_content):
                        return

                    disallowed_change = DeepDiff(
                        self.remove_modifiable_items(old_content),
                        self.remove_modifiable_items(new_content),
                    )

                    if disallowed_change:
                        # The diff may have no "values_changed" entry (e.g.
                        # only items were added/removed); default to an
                        # empty dict instead of crashing on None.keys(),
                        # and join() tolerates the empty case (the old
                        # functools.reduce raised on an empty sequence).
                        changed_nodes = " , ".join(
                            # drop DeepDiff's leading "root" path prefix
                            node[4:] if node.startswith("root") else node
                            for node in disallowed_change.get("values_changed", {})
                        )

                        raise EngineException(
                            f"Error in validating new descriptor: {changed_nodes} cannot be modified, "
                            "there are disallowed changes in the vnf descriptor.",
                            http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
                        )
    except (
        DbException,
        AttributeError,
        IndexError,
        KeyError,
        ValueError,
    ) as e:
        # Chain the cause so the original traceback is preserved.
        raise type(e)(
            "VNF Descriptor could not be processed with error: {}.".format(e)
        ) from e
+