import tarfile
import yaml
import json
-import importlib
import copy
+import os
+import shutil
+import functools
# import logging
+from deepdiff import DeepDiff
from hashlib import md5
from osm_common.dbbase import DbException, deep_update_rfc7396
from http import HTTPStatus
from time import time
from uuid import uuid4
from re import fullmatch
+from zipfile import ZipFile
from osm_nbi.validation import (
ValidationError,
pdu_new_schema,
validate_input,
vnfpkgop_new_schema,
)
-from osm_nbi.base_topic import BaseTopic, EngineException, get_iterable
+from osm_nbi.base_topic import (
+ BaseTopic,
+ EngineException,
+ get_iterable,
+ detect_descriptor_usage,
+)
from osm_im import etsi_nfv_vnfd, etsi_nfv_nsd
from osm_im.nst import nst as nst_im
from pyangbind.lib.serialise import pybindJSONDecoder
class DescriptorTopic(BaseTopic):
def __init__(self, db, fs, msg, auth):
    """Initialize the descriptor topic with the shared backend connectors.

    :param db: database driver (osm_common dbbase-compatible)
    :param fs: file-system/package storage driver
    :param msg: message-bus driver used to publish topic events
    :param auth: authentication/authorization connector
    """
    # super() instead of an explicit BaseTopic.__init__ call keeps
    # cooperative initialization working for any subclass MRO.
    super().__init__(db, fs, msg, auth)
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
"""
self.fs.file_delete(_id, ignore_non_exist=True)
self.fs.file_delete(_id + "_", ignore_non_exist=True) # remove temp folder
+ # Remove file revisions
+ if "revision" in db_content["_admin"]:
+ revision = db_content["_admin"]["revision"]
+ while revision > 0:
+ self.fs.file_delete(_id + ":" + str(revision), ignore_non_exist=True)
+ revision = revision - 1
+
@staticmethod
def get_one_by_id(db, session, topic, id):
# Avoid override in this case as the target is userDefinedData, but not vnfd,nsd descriptors
# indata = DescriptorTopic._validate_input_new(self, indata, project_id=session["force"])
- content = {"_admin": {"userDefinedData": indata}}
+ content = {"_admin": {
+ "userDefinedData": indata,
+ "revision": 0
+ }}
+
self.format_on_new(
content, session["project_id"], make_public=session["public"]
)
content_type
and "application/gzip" in content_type
or "application/x-gzip" in content_type
- or "application/zip" in content_type
):
compressed = "gzip"
+ if (
+ content_type
+ and "application/zip" in content_type
+ ):
+ compressed = "zip"
filename = headers.get("Content-Filename")
- if not filename:
- filename = "package.tar.gz" if compressed else "package"
+ if not filename and compressed:
+ filename = "package.tar.gz" if compressed == "gzip" else "package.zip"
+ elif not filename:
+ filename = "package"
+
+ revision = 1
+ if "revision" in current_desc["_admin"]:
+ revision = current_desc["_admin"]["revision"] + 1
+
# TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266
file_pkg = None
error_text = ""
+ fs_rollback = []
+
try:
if content_range_text:
content_range = (
total = int(content_range[3])
else:
start = 0
- temp_folder = (
- _id + "_"
+ # Rather than using a temp folder, we will store the package in a folder based on
+ # the current revision.
+ proposed_revision_path = (
+ _id + ":" + str(revision)
) # all the content is upload here and if ok, it is rename from id_ to is folder
if start:
- if not self.fs.file_exists(temp_folder, "dir"):
+ if not self.fs.file_exists(proposed_revision_path, "dir"):
raise EngineException(
"invalid Transaction-Id header", HTTPStatus.NOT_FOUND
)
else:
- self.fs.file_delete(temp_folder, ignore_non_exist=True)
- self.fs.mkdir(temp_folder)
+ self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
+ self.fs.mkdir(proposed_revision_path)
+ fs_rollback.append(proposed_revision_path)
storage = self.fs.get_params()
- storage["folder"] = _id
+ storage["folder"] = proposed_revision_path
- file_path = (temp_folder, filename)
+ file_path = (proposed_revision_path, filename)
if self.fs.file_exists(file_path, "file"):
file_size = self.fs.file_size(file_path)
else:
)
storage["descriptor"] = descriptor_file_name
storage["zipfile"] = filename
- self.fs.file_extract(tar, temp_folder)
+ self.fs.file_extract(tar, proposed_revision_path)
+ with self.fs.file_open(
+ (proposed_revision_path, descriptor_file_name), "r"
+ ) as descriptor_file:
+ content = descriptor_file.read()
+ elif compressed == "zip":
+ zipfile = ZipFile(file_pkg)
+ descriptor_file_name = None
+ for package_file in zipfile.infolist():
+ zipfilename = package_file.filename
+ file_path = zipfilename.split("/")
+ if (
+ not file_path[0] or ".." in zipfilename
+ ): # if start with "/" means absolute path
+ raise EngineException(
+ "Absolute path or '..' are not allowed for package descriptor zip"
+ )
+
+ if (
+ (
+ zipfilename.endswith(".yaml")
+ or zipfilename.endswith(".json")
+ or zipfilename.endswith(".yml")
+ ) and (
+ zipfilename.find("/") < 0
+ or zipfilename.find("Definitions") >= 0
+ )
+ ):
+ storage["pkg-dir"] = ""
+ if descriptor_file_name:
+ raise EngineException(
+ "Found more than one descriptor file at package descriptor zip"
+ )
+ descriptor_file_name = zipfilename
+ if not descriptor_file_name:
+ raise EngineException(
+ "Not found any descriptor file at package descriptor zip"
+ )
+ storage["descriptor"] = descriptor_file_name
+ storage["zipfile"] = filename
+ self.fs.file_extract(zipfile, proposed_revision_path)
+
with self.fs.file_open(
- (temp_folder, descriptor_file_name), "r"
+ (proposed_revision_path, descriptor_file_name), "r"
) as descriptor_file:
content = descriptor_file.read()
else:
error_text = "Invalid yaml format "
indata = yaml.load(content, Loader=yaml.SafeLoader)
- current_desc["_admin"]["storage"] = storage
- current_desc["_admin"]["onboardingState"] = "ONBOARDED"
- current_desc["_admin"]["operationalState"] = "ENABLED"
+ # Need to close the file package here so it can be copied from the
+ # revision to the current, unrevisioned record
+ if file_pkg:
+ file_pkg.close()
+ file_pkg = None
+
+ # Fetch both the incoming, proposed revision and the original revision so we
+ # can call a validate method to compare them
+ current_revision_path = _id + "/"
+ self.fs.sync(from_path=current_revision_path)
+ self.fs.sync(from_path=proposed_revision_path)
+
+ if revision > 1:
+ try:
+ self._validate_descriptor_changes(
+ _id,
+ descriptor_file_name,
+ current_revision_path,
+ proposed_revision_path,
+ )
+ except Exception as e:
+ shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
+ shutil.rmtree(self.fs.path + proposed_revision_path, ignore_errors=True)
+ # Only delete the new revision. We need to keep the original version in place
+ # as it has not been changed.
+ self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
+ raise e
+
indata = self._remove_envelop(indata)
if kwargs:
self._update_input_with_kwargs(indata, kwargs)
+ current_desc["_admin"]["storage"] = storage
+ current_desc["_admin"]["onboardingState"] = "ONBOARDED"
+ current_desc["_admin"]["operationalState"] = "ENABLED"
+ current_desc["_admin"]["modified"] = time()
+ current_desc["_admin"]["revision"] = revision
+
deep_update_rfc7396(current_desc, indata)
current_desc = self.check_conflict_on_edit(
session, current_desc, indata, _id=_id
)
- current_desc["_admin"]["modified"] = time()
+
+ # Copy the revision to the active package name by its original id
+ shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
+ os.rename(self.fs.path + proposed_revision_path, self.fs.path + current_revision_path)
+ self.fs.file_delete(current_revision_path, ignore_non_exist=True)
+ self.fs.mkdir(current_revision_path)
+ self.fs.reverse_sync(from_path=current_revision_path)
+
+ shutil.rmtree(self.fs.path + _id)
+
self.db.replace(self.topic, _id, current_desc)
- self.fs.dir_rename(temp_folder, _id)
+
+ # Store a copy of the package as a point in time revision
+ revision_desc = dict(current_desc)
+ revision_desc["_id"] = _id + ":" + str(revision_desc["_admin"]["revision"])
+ self.db.create(self.topic + "_revisions", revision_desc)
+ fs_rollback = []
indata["_id"] = _id
self._send_msg("edited", indata)
finally:
if file_pkg:
file_pkg.close()
+ for file in fs_rollback:
+ self.fs.file_delete(file, ignore_non_exist=True)
def get_file(self, session, _id, path=None, accept_header=None):
"""
)
storage = content["_admin"]["storage"]
if path is not None and path != "$DESCRIPTOR": # artifacts
- if not storage.get("pkg-dir"):
+ if not storage.get("pkg-dir") and not storage.get("folder"):
raise EngineException(
"Packages does not contains artifacts",
http_code=HTTPStatus.BAD_REQUEST,
return indata
+ def _validate_descriptor_changes(
+ self,
+ descriptor_id,
+ descriptor_file_name,
+ old_descriptor_directory,
+ new_descriptor_directory
+ ):
+ # Example:
+ # raise EngineException(
+ # "Error in validating new descriptor: <NODE> cannot be modified",
+ # http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ # )
+ pass
class VnfdTopic(DescriptorTopic):
topic = "vnfds"
):
if not self._validate_package_folders(
storage_params, "charms"
+ ) and not self._validate_package_folders(
+ storage_params, "Scripts/charms"
):
raise EngineException(
"Charm defined in vnf[id={}] but not present in "
return
if not self._validate_package_folders(
storage_params, "cloud_init", vdu["cloud-init-file"]
+ ) and not self._validate_package_folders(
+ storage_params, "Scripts/cloud_init", vdu["cloud-init-file"]
):
raise EngineException(
"Cloud-init defined in vnf[id={}]:vdu[id={}] but not present in "
day_1_2_config.get("execution-environment-list", []),
lambda ee: "juju" in ee,
):
- if not self._validate_package_folders(storage_params, "charms"):
+ if not self._validate_package_folders(
+ storage_params, "charms"
+ ) and not self._validate_package_folders(
+ storage_params, "Scripts/charms"
+ ):
raise EngineException(
"Charm defined in vnf[id={}] but not present in "
"package".format(indata["id"])
)
def _validate_package_folders(self, storage_params, folder, file=None):
- if not storage_params or not storage_params.get("pkg-dir"):
+ if not storage_params:
+ return False
+ elif not storage_params.get("pkg-dir"):
+ if self.fs.file_exists("{}_".format(storage_params["folder"]), "dir"):
+ f = "{}_/{}".format(
+ storage_params["folder"], folder
+ )
+ else:
+ f = "{}/{}".format(
+ storage_params["folder"], folder
+ )
+ if file:
+ return self.fs.file_exists("{}/{}".format(f, file), "file")
+ else:
+ if self.fs.file_exists(f, "dir"):
+ if self.fs.dir_ls(f):
+ return True
return False
else:
if self.fs.file_exists("{}_".format(storage_params["folder"]), "dir"):
"""
super().delete_extra(session, _id, db_content, not_send_msg)
self.db.del_list("vnfpkgops", {"vnfPkgId": _id})
+ self.db.del_list(self.topic+"_revisions", {"_id": {"$regex": _id}})
def sol005_projection(self, data):
    """Expose the SOL005 onboarding state from _admin, then delegate upward."""
    admin = data["_admin"]
    data["onboardingState"] = admin["onboardingState"]
    return super().sol005_projection(data)
+ @staticmethod
+ def find_software_version(vnfd: dict) -> str:
+ """Find the sotware version in the VNFD descriptors
+
+ Args:
+ vnfd (dict): Descriptor as a dictionary
+
+ Returns:
+ software-version (str)
+ """
+ default_sw_version = "1.0"
+ if vnfd.get("vnfd"):
+ vnfd = vnfd["vnfd"]
+ if vnfd.get("software-version"):
+ return vnfd["software-version"]
+ else:
+ return default_sw_version
+
+ @staticmethod
+ def extract_policies(vnfd: dict) -> dict:
+ """Removes the policies from the VNFD descriptors
+
+ Args:
+ vnfd (dict): Descriptor as a dictionary
+
+ Returns:
+ vnfd (dict): VNFD which does not include policies
+ """
+ for df in vnfd.get("df", {}):
+ for policy in ["scaling-aspect", "healing-aspect"]:
+ if (df.get(policy, {})):
+ df.pop(policy)
+ for vdu in vnfd.get("vdu", {}):
+ for alarm_policy in ["alarm", "monitoring-parameter"]:
+ if (vdu.get(alarm_policy, {})):
+ vdu.pop(alarm_policy)
+ return vnfd
+
+ @staticmethod
+ def extract_day12_primitives(vnfd: dict) -> dict:
+ """Removes the day12 primitives from the VNFD descriptors
+
+ Args:
+ vnfd (dict): Descriptor as a dictionary
+
+ Returns:
+ vnfd (dict)
+ """
+ for df_id, df in enumerate(vnfd.get("df", {})):
+ if (
+ df.get("lcm-operations-configuration", {})
+ .get("operate-vnf-op-config", {})
+ .get("day1-2")
+ ):
+ day12 = df["lcm-operations-configuration"]["operate-vnf-op-config"].get(
+ "day1-2"
+ )
+ for config_id, config in enumerate(day12):
+ for key in [
+ "initial-config-primitive",
+ "config-primitive",
+ "terminate-config-primitive",
+ ]:
+ config.pop(key, None)
+ day12[config_id] = config
+ df["lcm-operations-configuration"]["operate-vnf-op-config"][
+ "day1-2"
+ ] = day12
+ vnfd["df"][df_id] = df
+ return vnfd
+
def remove_modifiable_items(self, vnfd: dict) -> dict:
    """Strip every modifiable part from a VNFD descriptor.

    Delegates to the extract_* helpers so each update type clears its own
    modifiable items from the descriptor.

    Args:
        vnfd (dict): Descriptor as a dictionary (mutated in place).

    Returns:
        dict: Descriptor without any modifiable content.
    """
    # Unwrap the envelope if present.
    if vnfd.get("vnfd"):
        vnfd = vnfd["vnfd"]
    vnfd.pop("_admin", None)
    # Append further extract_* helpers here when more fields of the VNFD
    # become modifiable.
    for extract_function in (self.extract_day12_primitives, self.extract_policies):
        vnfd = extract_function(vnfd)
    return vnfd
+
def _validate_descriptor_changes(
    self,
    descriptor_id: str,
    descriptor_file_name: str,
    old_descriptor_directory: str,
    new_descriptor_directory: str,
):
    """Compares the old and new VNFD descriptors and validates the new descriptor.

    Args:
        descriptor_id: Internal id of the VNFD being updated
        descriptor_file_name: Descriptor file name inside the package
        old_descriptor_directory (str): Directory of descriptor which is in-use
        new_descriptor_directory (str): Directory of descriptor which is proposed to update (new revision)

    Returns:
        None

    Raises:
        EngineException: In case of error when there are unallowed changes
    """
    try:
        # If VNFD does not exist in DB or it is not in use by any NS,
        # validation is not required.  fail_on_empty=False keeps the
        # "missing descriptor" case on the `return` path (matching the
        # NSD variant) instead of raising DbException.
        vnfd = self.db.get_one("vnfds", {"_id": descriptor_id}, fail_on_empty=False)
        if not vnfd or not detect_descriptor_usage(vnfd, "vnfds", self.db):
            return

        # Get the old and new descriptor contents in order to compare them.
        with self.fs.file_open(
            (old_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
        ) as old_descriptor_file:

            with self.fs.file_open(
                (new_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
            ) as new_descriptor_file:

                old_content = yaml.safe_load(old_descriptor_file.read())
                new_content = yaml.safe_load(new_descriptor_file.read())

                # If software version has changed, we do not need to validate
                # the differences anymore.
                if old_content and new_content:
                    if self.find_software_version(
                        old_content
                    ) != self.find_software_version(new_content):
                        return

                    disallowed_change = DeepDiff(
                        self.remove_modifiable_items(old_content),
                        self.remove_modifiable_items(new_content),
                    )

                    if disallowed_change:
                        # Collect offending node paths from every DeepDiff
                        # change category, not just "values_changed":
                        # relying on that single key raised AttributeError
                        # whenever the diff contained only added or
                        # removed items.
                        changed_nodes = " , ".join(
                            str(node).lstrip("root")
                            for nodes in disallowed_change.values()
                            for node in nodes
                        )

                        raise EngineException(
                            f"Error in validating new descriptor: {changed_nodes} cannot be modified, "
                            "there are disallowed changes in the vnf descriptor.",
                            http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
                        )
    except (
        DbException,
        AttributeError,
        IndexError,
        KeyError,
        ValueError,
    ) as e:
        raise type(e)(
            "VNF Descriptor could not be processed with error: {}.".format(e)
        )
+
class NsdTopic(DescriptorTopic):
topic = "nsds"
http_code=HTTPStatus.CONFLICT,
)
def delete_extra(self, session, _id, db_content, not_send_msg=None):
    """
    Deletes associated file system storage (via super)
    Deletes associated NSD revisions from database.
    :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
    :param _id: server internal id
    :param db_content: The database content of the descriptor
    :return: None
    :raises: FsException in case of error while deleting associated storage
    """
    super().delete_extra(session, _id, db_content, not_send_msg)
    # Anchor the regex so only this descriptor's revision records
    # ("<_id>:<revision>") are removed; an unanchored pattern could also
    # match any record whose id merely contains _id as a substring.
    self.db.del_list(self.topic + "_revisions", {"_id": {"$regex": "^" + _id}})
+
+ @staticmethod
+ def extract_day12_primitives(nsd: dict) -> dict:
+ """Removes the day12 primitives from the NSD descriptors
+
+ Args:
+ nsd (dict): Descriptor as a dictionary
+
+ Returns:
+ nsd (dict): Cleared NSD
+ """
+ if nsd.get("ns-configuration"):
+ for key in [
+ "config-primitive",
+ "initial-config-primitive",
+ "terminate-config-primitive",
+ ]:
+ nsd["ns-configuration"].pop(key, None)
+ return nsd
+
def remove_modifiable_items(self, nsd: dict) -> dict:
    """Strip every modifiable part from an NSD descriptor.

    Delegates to the extract_* helpers so each update type clears its own
    modifiable items from the descriptor.

    Args:
        nsd (dict): Descriptor as a dictionary (mutated in place).

    Returns:
        dict: Descriptor without any modifiable content.
    """
    # Unwrap nested "nsd" envelopes; a list level carries a single
    # descriptor, so take its first element.
    while isinstance(nsd, dict) and nsd.get("nsd"):
        nsd = nsd["nsd"]
        if isinstance(nsd, list):
            nsd = nsd[0]
    nsd.pop("_admin", None)
    # Append further extract_* helpers here when more fields of the NSD
    # become modifiable.
    for extract_function in (self.extract_day12_primitives,):
        nsd = extract_function(nsd)
    return nsd
+
def _validate_descriptor_changes(
    self,
    descriptor_id: str,
    descriptor_file_name: str,
    old_descriptor_directory: str,
    new_descriptor_directory: str,
):
    """Compares the old and new NSD descriptors and validates the new descriptor

    Args:
        descriptor_id: Internal id of the NSD being updated
        descriptor_file_name: Descriptor file name inside the package
        old_descriptor_directory: Directory of descriptor which is in-use
        new_descriptor_directory: Directory of descriptor which is proposed to update (new revision)

    Returns:
        None

    Raises:
        EngineException: In case of error if the changes are not allowed
    """

    try:
        # If NSD does not exist in DB, or it is not in use by any NS,
        # validation is not required.
        nsd = self.db.get_one("nsds", {"_id": descriptor_id}, fail_on_empty=False)
        if not nsd or not detect_descriptor_usage(nsd, "nsds", self.db):
            return

        # Get the old and new descriptor contents in order to compare them.
        with self.fs.file_open(
            (old_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
        ) as old_descriptor_file:

            with self.fs.file_open(
                (new_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
            ) as new_descriptor_file:

                old_content = yaml.safe_load(old_descriptor_file.read())
                new_content = yaml.safe_load(new_descriptor_file.read())

                if old_content and new_content:
                    disallowed_change = DeepDiff(
                        self.remove_modifiable_items(old_content),
                        self.remove_modifiable_items(new_content),
                    )

                    if disallowed_change:
                        # Collect offending node paths from every DeepDiff
                        # change category, not just "values_changed":
                        # relying on that single key raised AttributeError
                        # whenever the diff contained only added or
                        # removed items.
                        changed_nodes = ", ".join(
                            str(node).lstrip("root")
                            for nodes in disallowed_change.values()
                            for node in nodes
                        )

                        raise EngineException(
                            f"Error in validating new descriptor: {changed_nodes} cannot be modified, "
                            "there are disallowed changes in the ns descriptor. ",
                            http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
                        )
    except (
        DbException,
        AttributeError,
        IndexError,
        KeyError,
        ValueError,
    ) as e:
        raise type(e)(
            "NS Descriptor could not be processed with error: {}.".format(e)
        )
+
def sol005_projection(self, data):
data["nsdOnboardingState"] = data["_admin"]["onboardingState"]
data["nsdOperationalState"] = data["_admin"]["operationalState"]