import copy
import os
import shutil
+import functools
# import logging
+from deepdiff import DeepDiff
from hashlib import md5
from osm_common.dbbase import DbException, deep_update_rfc7396
from http import HTTPStatus
return super().sol005_projection(data)
+ @staticmethod
+ def find_software_version(vnfd: dict) -> str:
+ """Find the sotware version in the VNFD descriptors
+
+ Args:
+ vnfd (dict): Descriptor as a dictionary
+
+ Returns:
+ software-version (str)
+ """
+ default_sw_version = "1.0"
+ if vnfd.get("vnfd"):
+ vnfd = vnfd["vnfd"]
+ if vnfd.get("software-version"):
+ return vnfd["software-version"]
+ else:
+ return default_sw_version
+
+ @staticmethod
+ def extract_policies(vnfd: dict) -> dict:
+ """Removes the policies from the VNFD descriptors
+
+ Args:
+ vnfd (dict): Descriptor as a dictionary
+
+ Returns:
+ vnfd (dict): VNFD which does not include policies
+ """
+ # TODO: Extract the policy related parts from the VNFD
+ return vnfd
+
+ @staticmethod
+ def extract_day12_primitives(vnfd: dict) -> dict:
+ """Removes the day12 primitives from the VNFD descriptors
+
+ Args:
+ vnfd (dict): Descriptor as a dictionary
+
+ Returns:
+ vnfd (dict)
+ """
+ for df_id, df in enumerate(vnfd.get("df", {})):
+ if (
+ df.get("lcm-operations-configuration", {})
+ .get("operate-vnf-op-config", {})
+ .get("day1-2")
+ ):
+ day12 = df["lcm-operations-configuration"]["operate-vnf-op-config"].get(
+ "day1-2"
+ )
+ for config_id, config in enumerate(day12):
+ for key in [
+ "initial-config-primitive",
+ "config-primitive",
+ "terminate-config-primitive",
+ ]:
+ config.pop(key, None)
+ day12[config_id] = config
+ df["lcm-operations-configuration"]["operate-vnf-op-config"][
+ "day1-2"
+ ] = day12
+ vnfd["df"][df_id] = df
+ return vnfd
+
+ def remove_modifiable_items(self, vnfd: dict) -> dict:
+ """Removes the modifiable parts from the VNFD descriptors
+
+ It calls different extract functions according to different update types
+ to clear all the modifiable items from VNFD
+
+ Args:
+ vnfd (dict): Descriptor as a dictionary
+
+ Returns:
+ vnfd (dict): Descriptor which does not include modifiable contents
+ """
+ if vnfd.get("vnfd"):
+ vnfd = vnfd["vnfd"]
+ vnfd.pop("_admin", None)
+ # If the other extractions need to be done from VNFD,
+ # the new extract methods could be appended to below list.
+ for extract_function in [self.extract_day12_primitives, self.extract_policies]:
+ vnfd_temp = extract_function(vnfd)
+ vnfd = vnfd_temp
+ return vnfd
+
+ def _validate_descriptor_changes(
+ self,
+ descriptor_file_name: str,
+ old_descriptor_directory: str,
+ new_descriptor_directory: str,
+ ):
+ """Compares the old and new VNFD descriptors and validates the new descriptor.
+
+ Args:
+ old_descriptor_directory (str): Directory of descriptor which is in-use
+ new_descriptor_directory (str): Directory of directory which is proposed to update (new revision)
+
+ Returns:
+ None
+
+ Raises:
+ EngineException: In case of error when there are unallowed changes
+ """
+ try:
+ with self.fs.file_open(
+ (old_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
+ ) as old_descriptor_file:
+ with self.fs.file_open(
+ (new_descriptor_directory, descriptor_file_name), "r"
+ ) as new_descriptor_file:
+ old_content = yaml.load(
+ old_descriptor_file.read(), Loader=yaml.SafeLoader
+ )
+ new_content = yaml.load(
+ new_descriptor_file.read(), Loader=yaml.SafeLoader
+ )
+ if old_content and new_content:
+ if self.find_software_version(
+ old_content
+ ) != self.find_software_version(new_content):
+ return
+ disallowed_change = DeepDiff(
+ self.remove_modifiable_items(old_content),
+ self.remove_modifiable_items(new_content),
+ )
+ if disallowed_change:
+ changed_nodes = functools.reduce(
+ lambda a, b: a + " , " + b,
+ [
+ node.lstrip("root")
+ for node in disallowed_change.get(
+ "values_changed"
+ ).keys()
+ ],
+ )
+ raise EngineException(
+ f"Error in validating new descriptor: {changed_nodes} cannot be modified, "
+ "there are disallowed changes in the vnf descriptor.",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ except (
+ DbException,
+ AttributeError,
+ IndexError,
+ KeyError,
+ ValueError,
+ ) as e:
+ raise type(e)(
+ "VNF Descriptor could not be processed with error: {}.".format(e)
+ )
+
class NsdTopic(DescriptorTopic):
topic = "nsds"
super().delete_extra(session, _id, db_content, not_send_msg)
self.db.del_list(self.topic+"_revisions", { "_id": { "$regex": _id}})
+ @staticmethod
+ def extract_day12_primitives(nsd: dict) -> dict:
+ """Removes the day12 primitives from the NSD descriptors
+
+ Args:
+ nsd (dict): Descriptor as a dictionary
+
+ Returns:
+ nsd (dict): Cleared NSD
+ """
+ if nsd.get("ns-configuration"):
+ for key in [
+ "config-primitive",
+ "initial-config-primitive",
+ "terminate-config-primitive",
+ ]:
+ nsd["ns-configuration"].pop(key, None)
+ return nsd
+
+ def remove_modifiable_items(self, nsd: dict) -> dict:
+ """Removes the modifiable parts from the VNFD descriptors
+
+ It calls different extract functions according to different update types
+ to clear all the modifiable items from NSD
+
+ Args:
+ nsd (dict): Descriptor as a dictionary
+
+ Returns:
+ nsd (dict): Descriptor which does not include modifiable contents
+ """
+ while isinstance(nsd, dict) and nsd.get("nsd"):
+ nsd = nsd["nsd"]
+ if isinstance(nsd, list):
+ nsd = nsd[0]
+ nsd.pop("_admin", None)
+ # If the more extractions need to be done from NSD,
+ # the new extract methods could be appended to below list.
+ for extract_function in [self.extract_day12_primitives]:
+ nsd_temp = extract_function(nsd)
+ nsd = nsd_temp
+ return nsd
+
+ def _validate_descriptor_changes(
+ self,
+ descriptor_file_name: str,
+ old_descriptor_directory: str,
+ new_descriptor_directory: str,
+ ):
+ """Compares the old and new NSD descriptors and validates the new descriptor
+
+ Args:
+ old_descriptor_directory: Directory of descriptor which is in-use
+ new_descriptor_directory: Directory of directory which is proposed to update (new revision)
+
+ Returns:
+ None
+
+ Raises:
+ EngineException: In case of error if the changes are not allowed
+ """
+
+ try:
+ with self.fs.file_open(
+ (old_descriptor_directory, descriptor_file_name), "r"
+ ) as old_descriptor_file:
+ with self.fs.file_open(
+ (new_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
+ ) as new_descriptor_file:
+ old_content = yaml.load(
+ old_descriptor_file.read(), Loader=yaml.SafeLoader
+ )
+ new_content = yaml.load(
+ new_descriptor_file.read(), Loader=yaml.SafeLoader
+ )
+ if old_content and new_content:
+ disallowed_change = DeepDiff(
+ self.remove_modifiable_items(old_content),
+ self.remove_modifiable_items(new_content),
+ )
+ if disallowed_change:
+ changed_nodes = functools.reduce(
+ lambda a, b: a + ", " + b,
+ [
+ node.lstrip("root")
+ for node in disallowed_change.get(
+ "values_changed"
+ ).keys()
+ ],
+ )
+ raise EngineException(
+ f"Error in validating new descriptor: {changed_nodes} cannot be modified, "
+ "there are disallowed changes in the ns descriptor. ",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ except (
+ DbException,
+ AttributeError,
+ IndexError,
+ KeyError,
+ ValueError,
+ ) as e:
+ raise type(e)(
+ "NS Descriptor could not be processed with error: {}.".format(e)
+ )
+
def sol005_projection(self, data):
data["nsdOnboardingState"] = data["_admin"]["onboardingState"]
data["nsdOperationalState"] = data["_admin"]["operationalState"]
__author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
__date__ = "2019-11-20"
+from contextlib import contextmanager
import unittest
from unittest import TestCase
from unittest.mock import Mock, patch
self.topic = VnfdTopic(self.db, self.fs, self.msg, self.auth)
self.topic.check_quota = Mock(return_value=None) # skip quota
+ @contextmanager
+ def assertNotRaises(self, exception_type):
+ try:
+ yield None
+ except exception_type:
+ raise self.failureException("{} raised".format(exception_type.__name__))
+
+ def create_desc_temp(self, template):
+ old_desc = deepcopy(template)
+ new_desc = deepcopy(template)
+ return old_desc, new_desc
+
@patch("osm_nbi.descriptor_topics.shutil")
@patch("osm_nbi.descriptor_topics.os.rename")
def test_new_vnfd(self, mock_rename, mock_shutil):
self.topic.upload_content(
fake_session, did, test_vnfd, {}, {"Content-Type": []}
)
- print(str(e.exception))
self.assertEqual(
e.exception.http_code, HTTPStatus.BAD_REQUEST, "Wrong HTTP status code"
)
self.fs.file_delete.assert_not_called()
return
+ @patch("osm_nbi.descriptor_topics.yaml")
+ def test_validate_descriptor_changes(self, mock_yaml):
+ descriptor_name = "test_descriptor"
+ self.fs.file_open.side_effect = lambda path, mode: open(
+ "/tmp/" + str(uuid4()), "a+b"
+ )
+ with self.subTest(i=1, t="VNFD has changes in day1-2 config primitive"):
+ old_vnfd, new_vnfd = self.create_desc_temp(db_vnfd_content)
+ new_vnfd["df"][0]["lcm-operations-configuration"]["operate-vnf-op-config"][
+ "day1-2"
+ ][0]["config-primitive"][0]["name"] = "new_action"
+ mock_yaml.load.side_effect = [old_vnfd, new_vnfd]
+ with self.assertNotRaises(EngineException):
+ self.topic._validate_descriptor_changes(
+ descriptor_name, "/tmp/", "/tmp:1/"
+ )
+ with self.subTest(i=2, t="VNFD sw version changed"):
+ # old vnfd uses the default software version: 1.0
+ new_vnfd["software-version"] = "1.3"
+ new_vnfd["sw-image-desc"][0]["name"] = "new-image"
+ mock_yaml.load.side_effect = [old_vnfd, new_vnfd]
+ with self.assertNotRaises(EngineException):
+ self.topic._validate_descriptor_changes(
+ descriptor_name, "/tmp/", "/tmp:1/"
+ )
+ with self.subTest(
+ i=3, t="VNFD sw version is not changed and mgmt-cp has changed"
+ ):
+ old_vnfd, new_vnfd = self.create_desc_temp(db_vnfd_content)
+ new_vnfd["mgmt-cp"] = "new-mgmt-cp"
+ mock_yaml.load.side_effect = [old_vnfd, new_vnfd]
+ with self.assertRaises(EngineException) as e:
+ self.topic._validate_descriptor_changes(
+ descriptor_name, "/tmp/", "/tmp:1/"
+ )
+ self.assertEqual(
+ e.exception.http_code,
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ "Wrong HTTP status code",
+ )
+ self.assertIn(
+ norm("there are disallowed changes in the vnf descriptor"),
+ norm(str(e.exception)),
+ "Wrong exception text",
+ )
+
def test_validate_mgmt_interface_connection_point_on_valid_descriptor(self):
indata = deepcopy(db_vnfd_content)
self.topic.validate_mgmt_interface_connection_point(indata)
self.topic = NsdTopic(self.db, self.fs, self.msg, self.auth)
self.topic.check_quota = Mock(return_value=None) # skip quota
+ @contextmanager
+ def assertNotRaises(self, exception_type):
+ try:
+ yield None
+ except exception_type:
+ raise self.failureException("{} raised".format(exception_type.__name__))
+
+ def create_desc_temp(self, template):
+ old_desc = deepcopy(template)
+ new_desc = deepcopy(template)
+ return old_desc, new_desc
+
@patch("osm_nbi.descriptor_topics.shutil")
@patch("osm_nbi.descriptor_topics.os.rename")
def test_new_nsd(self, mock_rename, mock_shutil):
self.fs.file_delete.assert_not_called()
return
+ @patch("osm_nbi.descriptor_topics.yaml")
+ def test_validate_descriptor_changes(self, mock_yaml):
+ descriptor_name = "test_ns_descriptor"
+ self.fs.file_open.side_effect = lambda path, mode: open(
+ "/tmp/" + str(uuid4()), "a+b"
+ )
+ with self.subTest(
+ i=1, t="NSD has changes in ns-configuration:config-primitive"
+ ):
+ old_nsd, new_nsd = self.create_desc_temp(db_nsd_content)
+ old_nsd.update(
+ {"ns-configuration": {"config-primitive": [{"name": "add-user"}]}}
+ )
+ new_nsd.update(
+ {"ns-configuration": {"config-primitive": [{"name": "del-user"}]}}
+ )
+ mock_yaml.load.side_effect = [old_nsd, new_nsd]
+ with self.assertNotRaises(EngineException):
+ self.topic._validate_descriptor_changes(
+ descriptor_name, "/tmp", "/tmp:1"
+ )
+ with self.subTest(i=2, t="NSD name has changed"):
+ old_nsd, new_nsd = self.create_desc_temp(db_nsd_content)
+ new_nsd["name"] = "nscharm-ns2"
+ mock_yaml.load.side_effect = [old_nsd, new_nsd]
+ with self.assertRaises(EngineException) as e:
+ self.topic._validate_descriptor_changes(
+ descriptor_name, "/tmp", "/tmp:1"
+ )
+ self.assertEqual(
+ e.exception.http_code,
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ "Wrong HTTP status code",
+ )
+ self.assertIn(
+ norm("there are disallowed changes in the ns descriptor"),
+ norm(str(e.exception)),
+ "Wrong exception text",
+ )
+
def test_validate_vld_mgmt_network_with_virtual_link_protocol_data_on_valid_descriptor(
self,
):