import tarfile
import yaml
import json
+import copy
+import os
+import shutil
+import functools
+import re
+
# import logging
+from deepdiff import DeepDiff
from hashlib import md5
from osm_common.dbbase import DbException, deep_update_rfc7396
from http import HTTPStatus
from time import time
from uuid import uuid4
from re import fullmatch
-from osm_nbi.validation import ValidationError, pdu_new_schema, pdu_edit_schema, \
- validate_input, vnfpkgop_new_schema
-from osm_nbi.base_topic import BaseTopic, EngineException, get_iterable
-from osm_im.vnfd import vnfd as vnfd_im
-from osm_im.nsd import nsd as nsd_im
+from zipfile import ZipFile
+from osm_nbi.validation import (
+ ValidationError,
+ pdu_new_schema,
+ pdu_edit_schema,
+ validate_input,
+ vnfpkgop_new_schema,
+)
+from osm_nbi.base_topic import (
+ BaseTopic,
+ EngineException,
+ get_iterable,
+ detect_descriptor_usage,
+)
+from osm_im import etsi_nfv_vnfd, etsi_nfv_nsd
from osm_im.nst import nst as nst_im
from pyangbind.lib.serialise import pybindJSONDecoder
import pyangbind.lib.pybindJSON as pybindJSON
+from osm_nbi import utils
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+valid_helm_chart_re = re.compile(
+ r"^[a-z0-9]([-a-z0-9]*[a-z0-9]/)?([a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
+)
class DescriptorTopic(BaseTopic):
def __init__(self, db, fs, msg, auth):
- BaseTopic.__init__(self, db, fs, msg, auth)
+ super().__init__(db, fs, msg, auth)
+
+ def _validate_input_new(self, indata, storage_params, force=False):
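+ # default is a no-op; VnfdTopic and NsdTopic override this with pyangbind-based validation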
+ return indata
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
- super().check_conflict_on_edit(session, final_content, edit_content, _id)
+ final_content = super().check_conflict_on_edit(
+ session, final_content, edit_content, _id
+ )
def _check_unique_id_name(descriptor, position=""):
for desc_key, desc_item in descriptor.items():
desc_item_id = None
for index, list_item in enumerate(desc_item):
if isinstance(list_item, dict):
- _check_unique_id_name(list_item, "{}.{}[{}]"
- .format(position, desc_key, index))
+ _check_unique_id_name(
+ list_item, "{}.{}[{}]".format(position, desc_key, index)
+ )
# Base case
- if index == 0 and (list_item.get("id") or list_item.get("name")):
+ if index == 0 and (
+ list_item.get("id") or list_item.get("name")
+ ):
desc_item_id = "id" if list_item.get("id") else "name"
if desc_item_id and list_item.get(desc_item_id):
if list_item[desc_item_id] in used_ids:
- position = "{}.{}[{}]".format(position, desc_key, index)
- raise EngineException("Error: identifier {} '{}' is not unique and repeats at '{}'"
- .format(desc_item_id, list_item[desc_item_id],
- position), HTTPStatus.UNPROCESSABLE_ENTITY)
+ position = "{}.{}[{}]".format(
+ position, desc_key, index
+ )
+ raise EngineException(
+ "Error: identifier {} '{}' is not unique and repeats at '{}'".format(
+ desc_item_id,
+ list_item[desc_item_id],
+ position,
+ ),
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
used_ids.append(list_item[desc_item_id])
+
_check_unique_id_name(final_content)
# 1. validate again with pyangbind
# 1.1. remove internal keys
if k in final_content:
internal_keys[k] = final_content.pop(k)
storage_params = internal_keys["_admin"].get("storage")
- serialized = self._validate_input_new(final_content, storage_params, session["force"])
+ serialized = self._validate_input_new(
+ final_content, storage_params, session["force"]
+ )
+
# 1.2. modify final_content with a serialized version
- final_content.clear()
- final_content.update(serialized)
+ final_content = copy.deepcopy(serialized)
# 1.3. restore internal keys
for k, v in internal_keys.items():
final_content[k] = v
-
if session["force"]:
- return
+ return final_content
+
# 2. check that this id is not present
if "id" in edit_content:
_filter = self._get_project_filter(session)
+
_filter["id"] = final_content["id"]
_filter["_id.neq"] = _id
+
if self.db.get_one(self.topic, _filter, fail_on_empty=False):
- raise EngineException("{} with id '{}' already exists for this project".format(self.topic[:-1],
- final_content["id"]),
- HTTPStatus.CONFLICT)
+ raise EngineException(
+ "{} with id '{}' already exists for this project".format(
+ (str(self.topic))[:-1], final_content["id"]
+ ),
+ HTTPStatus.CONFLICT,
+ )
+
+ return final_content
@staticmethod
def format_on_new(content, project_id=None, make_public=False):
"""
self.fs.file_delete(_id, ignore_non_exist=True)
self.fs.file_delete(_id + "_", ignore_non_exist=True) # remove temp folder
+ # Remove file revisions
+ if "revision" in db_content["_admin"]:
+ revision = db_content["_admin"]["revision"]
+ while revision > 0:
+ self.fs.file_delete(_id + ":" + str(revision), ignore_non_exist=True)
+ revision = revision - 1
@staticmethod
def get_one_by_id(db, session, topic, id):
if len(desc_list) == 1:
return desc_list[0]
elif len(desc_list) > 1:
- raise DbException("Found more than one {} with id='{}' belonging to this project".format(topic[:-1], id),
- HTTPStatus.CONFLICT)
+ raise DbException(
+ "Found more than one {} with id='{}' belonging to this project".format(
+ topic[:-1], id
+ ),
+ HTTPStatus.CONFLICT,
+ )
# not found any: try to find public
_filter = BaseTopic._get_project_filter(session)
_filter["id"] = id
desc_list = db.get_list(topic, _filter)
if not desc_list:
- raise DbException("Not found any {} with id='{}'".format(topic[:-1], id), HTTPStatus.NOT_FOUND)
+ raise DbException(
+ "Not found any {} with id='{}'".format(topic[:-1], id),
+ HTTPStatus.NOT_FOUND,
+ )
elif len(desc_list) == 1:
return desc_list[0]
else:
- raise DbException("Found more than one public {} with id='{}'; and no one belonging to this project".format(
- topic[:-1], id), HTTPStatus.CONFLICT)
+ raise DbException(
+ "Found more than one public {} with id='{}'; and no one belonging to this project".format(
+ topic[:-1], id
+ ),
+ HTTPStatus.CONFLICT,
+ )
def new(self, rollback, session, indata=None, kwargs=None, headers=None):
"""
# _remove_envelop
if indata:
if "userDefinedData" in indata:
- indata = indata['userDefinedData']
+ indata = indata["userDefinedData"]
# Override descriptor with query string kwargs
self._update_input_with_kwargs(indata, kwargs)
# Avoid override in this case as the target is userDefinedData, but not vnfd,nsd descriptors
# indata = DescriptorTopic._validate_input_new(self, indata, project_id=session["force"])
- content = {"_admin": {"userDefinedData": indata}}
- self.format_on_new(content, session["project_id"], make_public=session["public"])
+ content = {"_admin": {"userDefinedData": indata, "revision": 0}}
+
+ self.format_on_new(
+ content, session["project_id"], make_public=session["public"]
+ )
_id = self.db.create(self.topic, content)
rollback.append({"topic": self.topic, "_id": _id})
self._send_msg("created", {"_id": _id})
expected_md5 = headers.get("Content-File-MD5")
compressed = None
content_type = headers.get("Content-Type")
- if content_type and "application/gzip" in content_type or "application/x-gzip" in content_type or \
- "application/zip" in content_type:
+ if content_type and (
+ "application/gzip" in content_type
+ or "application/x-gzip" in content_type
+ ):
compressed = "gzip"
+ if content_type and "application/zip" in content_type:
+ compressed = "zip"
filename = headers.get("Content-Filename")
- if not filename:
- filename = "package.tar.gz" if compressed else "package"
+ if not filename and compressed:
+ filename = "package.tar.gz" if compressed == "gzip" else "package.zip"
+ elif not filename:
+ filename = "package"
+
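+ # Every upload of package content creates a new revision; the content is stored under "<_id>:<revision>"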
+ revision = 1
+ if "revision" in current_desc["_admin"]:
+ revision = current_desc["_admin"]["revision"] + 1
+
# TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266
file_pkg = None
error_text = ""
+ fs_rollback = []
+
try:
if content_range_text:
- content_range = content_range_text.replace("-", " ").replace("/", " ").split()
- if content_range[0] != "bytes": # TODO check x<y not negative < total....
+ content_range = (
+ content_range_text.replace("-", " ").replace("/", " ").split()
+ )
+ if (
+ content_range[0] != "bytes"
+ ): # TODO check x<y not negative < total....
raise IndexError()
start = int(content_range[1])
end = int(content_range[2]) + 1
total = int(content_range[3])
else:
start = 0
- temp_folder = _id + "_" # all the content is upload here and if ok, it is rename from id_ to is folder
+ # Rather than using a temp folder, we will store the package in a folder based on
+ # the current revision.
+ proposed_revision_path = (
+ _id + ":" + str(revision)
+ )  # all the content is uploaded here and, if valid, it later replaces the active "<_id>" folder
if start:
- if not self.fs.file_exists(temp_folder, 'dir'):
- raise EngineException("invalid Transaction-Id header", HTTPStatus.NOT_FOUND)
+ if not self.fs.file_exists(proposed_revision_path, "dir"):
+ raise EngineException(
+ "invalid Transaction-Id header", HTTPStatus.NOT_FOUND
+ )
else:
- self.fs.file_delete(temp_folder, ignore_non_exist=True)
- self.fs.mkdir(temp_folder)
+ self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
+ self.fs.mkdir(proposed_revision_path)
+ fs_rollback.append(proposed_revision_path)
storage = self.fs.get_params()
- storage["folder"] = _id
+ storage["folder"] = proposed_revision_path
- file_path = (temp_folder, filename)
- if self.fs.file_exists(file_path, 'file'):
+ file_path = (proposed_revision_path, filename)
+ if self.fs.file_exists(file_path, "file"):
file_size = self.fs.file_size(file_path)
else:
file_size = 0
if file_size != start:
- raise EngineException("invalid Content-Range start sequence, expected '{}' but received '{}'".format(
- file_size, start), HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
- file_pkg = self.fs.file_open(file_path, 'a+b')
+ raise EngineException(
+ "invalid Content-Range start sequence, expected '{}' but received '{}'".format(
+ file_size, start
+ ),
+ HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE,
+ )
+ file_pkg = self.fs.file_open(file_path, "a+b")
if isinstance(indata, dict):
indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
file_pkg.write(indata_text.encode(encoding="utf-8"))
break
file_pkg.write(indata_text)
if content_range_text:
- if indata_len != end-start:
- raise EngineException("Mismatch between Content-Range header {}-{} and body length of {}".format(
- start, end-1, indata_len), HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
+ if indata_len != end - start:
+ raise EngineException(
+ "Mismatch between Content-Range header {}-{} and body length of {}".format(
+ start, end - 1, indata_len
+ ),
+ HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE,
+ )
if end != total:
# TODO update to UPLOADING
return False
raise EngineException("Error, MD5 mismatch", HTTPStatus.CONFLICT)
file_pkg.seek(0, 0)
if compressed == "gzip":
- tar = tarfile.open(mode='r', fileobj=file_pkg)
+ tar = tarfile.open(mode="r", fileobj=file_pkg)
descriptor_file_name = None
for tarinfo in tar:
tarname = tarinfo.name
tarname_path = tarname.split("/")
- if not tarname_path[0] or ".." in tarname_path: # if start with "/" means absolute path
- raise EngineException("Absolute path or '..' are not allowed for package descriptor tar.gz")
+ if (
+ not tarname_path[0] or ".." in tarname_path
+ ):  # an empty first component means the path starts with "/" (absolute path)
+ raise EngineException(
+ "Absolute path or '..' are not allowed for package descriptor tar.gz"
+ )
if len(tarname_path) == 1 and not tarinfo.isdir():
- raise EngineException("All files must be inside a dir for package descriptor tar.gz")
- if tarname.endswith(".yaml") or tarname.endswith(".json") or tarname.endswith(".yml"):
+ raise EngineException(
+ "All files must be inside a dir for package descriptor tar.gz"
+ )
+ if (
+ tarname.endswith(".yaml")
+ or tarname.endswith(".json")
+ or tarname.endswith(".yml")
+ ):
storage["pkg-dir"] = tarname_path[0]
if len(tarname_path) == 2:
if descriptor_file_name:
raise EngineException(
- "Found more than one descriptor file at package descriptor tar.gz")
+ "Found more than one descriptor file at package descriptor tar.gz"
+ )
descriptor_file_name = tarname
if not descriptor_file_name:
- raise EngineException("Not found any descriptor file at package descriptor tar.gz")
+ raise EngineException(
+ "Not found any descriptor file at package descriptor tar.gz"
+ )
storage["descriptor"] = descriptor_file_name
storage["zipfile"] = filename
- self.fs.file_extract(tar, temp_folder)
- with self.fs.file_open((temp_folder, descriptor_file_name), "r") as descriptor_file:
+ self.fs.file_extract(tar, proposed_revision_path)
+ with self.fs.file_open(
+ (proposed_revision_path, descriptor_file_name), "r"
+ ) as descriptor_file:
+ content = descriptor_file.read()
+ elif compressed == "zip":
+ zipfile = ZipFile(file_pkg)
+ descriptor_file_name = None
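+ # the descriptor is accepted at the zip root or under a "Definitions/" folder (SOL004/CSAR layout)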
+ for package_file in zipfile.infolist():
+ zipfilename = package_file.filename
+ file_path = zipfilename.split("/")
+ if (
+ not file_path[0] or ".." in zipfilename
+ ):  # an empty first component means the path starts with "/" (absolute path)
+ raise EngineException(
+ "Absolute path or '..' are not allowed for package descriptor zip"
+ )
+
+ if (
+ zipfilename.endswith(".yaml")
+ or zipfilename.endswith(".json")
+ or zipfilename.endswith(".yml")
+ ) and (
+ zipfilename.find("/") < 0
+ or zipfilename.find("Definitions") >= 0
+ ):
+ storage["pkg-dir"] = ""
+ if descriptor_file_name:
+ raise EngineException(
+ "Found more than one descriptor file at package descriptor zip"
+ )
+ descriptor_file_name = zipfilename
+ if not descriptor_file_name:
+ raise EngineException(
+ "Not found any descriptor file at package descriptor zip"
+ )
+ storage["descriptor"] = descriptor_file_name
+ storage["zipfile"] = filename
+ self.fs.file_extract(zipfile, proposed_revision_path)
+
+ with self.fs.file_open(
+ (proposed_revision_path, descriptor_file_name), "r"
+ ) as descriptor_file:
content = descriptor_file.read()
else:
content = file_pkg.read()
indata = json.loads(content)
else:
error_text = "Invalid yaml format "
- indata = yaml.load(content, Loader=yaml.SafeLoader)
+ indata = yaml.safe_load(content)
- current_desc["_admin"]["storage"] = storage
- current_desc["_admin"]["onboardingState"] = "ONBOARDED"
- current_desc["_admin"]["operationalState"] = "ENABLED"
+ # Need to close the package file here so it can be copied from the
+ # revision to the current, unrevisioned record
+ if file_pkg:
+ file_pkg.close()
+ file_pkg = None
+
+ # Fetch both the incoming, proposed revision and the original revision so we
+ # can call a validate method to compare them
+ current_revision_path = _id + "/"
+ self.fs.sync(from_path=current_revision_path)
+ self.fs.sync(from_path=proposed_revision_path)
+
+ if revision > 1:
+ try:
+ self._validate_descriptor_changes(
+ _id,
+ descriptor_file_name,
+ current_revision_path,
+ proposed_revision_path,
+ )
+ except Exception as e:
+ shutil.rmtree(
+ self.fs.path + current_revision_path, ignore_errors=True
+ )
+ shutil.rmtree(
+ self.fs.path + proposed_revision_path, ignore_errors=True
+ )
+ # Only delete the new revision. We need to keep the original version in place
+ # as it has not been changed.
+ self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
+ raise e
indata = self._remove_envelop(indata)
# Override descriptor with query string kwargs
if kwargs:
self._update_input_with_kwargs(indata, kwargs)
- # it will call overrides method at VnfdTopic or NsdTopic
- # indata = self._validate_input_edit(indata, force=session["force"])
- deep_update_rfc7396(current_desc, indata)
- self.check_conflict_on_edit(session, current_desc, indata, _id=_id)
+ current_desc["_admin"]["storage"] = storage
+ current_desc["_admin"]["onboardingState"] = "ONBOARDED"
+ current_desc["_admin"]["operationalState"] = "ENABLED"
current_desc["_admin"]["modified"] = time()
+ current_desc["_admin"]["revision"] = revision
+
+ deep_update_rfc7396(current_desc, indata)
+ current_desc = self.check_conflict_on_edit(
+ session, current_desc, indata, _id=_id
+ )
+
+ # Copy the revision to the active package name by its original id
+ shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
+ os.rename(
+ self.fs.path + proposed_revision_path,
+ self.fs.path + current_revision_path,
+ )
+ self.fs.file_delete(current_revision_path, ignore_non_exist=True)
+ self.fs.mkdir(current_revision_path)
+ self.fs.reverse_sync(from_path=current_revision_path)
+
+ shutil.rmtree(self.fs.path + _id)
+
self.db.replace(self.topic, _id, current_desc)
- self.fs.dir_rename(temp_folder, _id)
+
+ # Store a copy of the package as a point in time revision
+ revision_desc = dict(current_desc)
+ revision_desc["_id"] = _id + ":" + str(revision_desc["_admin"]["revision"])
+ self.db.create(self.topic + "_revisions", revision_desc)
+ fs_rollback = []
indata["_id"] = _id
self._send_msg("edited", indata)
except EngineException:
raise
except IndexError:
- raise EngineException("invalid Content-Range header format. Expected 'bytes start-end/total'",
- HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
+ raise EngineException(
+ "invalid Content-Range header format. Expected 'bytes start-end/total'",
+ HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE,
+ )
except IOError as e:
- raise EngineException("invalid upload transaction sequence: '{}'".format(e), HTTPStatus.BAD_REQUEST)
+ raise EngineException(
+ "invalid upload transaction sequence: '{}'".format(e),
+ HTTPStatus.BAD_REQUEST,
+ )
except tarfile.ReadError as e:
- raise EngineException("invalid file content {}".format(e), HTTPStatus.BAD_REQUEST)
+ raise EngineException(
+ "invalid file content {}".format(e), HTTPStatus.BAD_REQUEST
+ )
except (ValueError, yaml.YAMLError) as e:
raise EngineException(error_text + str(e))
except ValidationError as e:
finally:
if file_pkg:
file_pkg.close()
+ for file in fs_rollback:
+ self.fs.file_delete(file, ignore_non_exist=True)
def get_file(self, session, _id, path=None, accept_header=None):
"""
"""
accept_text = accept_zip = False
if accept_header:
- if 'text/plain' in accept_header or '*/*' in accept_header:
+ if "text/plain" in accept_header or "*/*" in accept_header:
accept_text = True
- if 'application/zip' in accept_header or '*/*' in accept_header:
- accept_zip = 'application/zip'
- elif 'application/gzip' in accept_header:
- accept_zip = 'application/gzip'
+ if "application/zip" in accept_header or "*/*" in accept_header:
+ accept_zip = "application/zip"
+ elif "application/gzip" in accept_header:
+ accept_zip = "application/gzip"
if not accept_text and not accept_zip:
- raise EngineException("provide request header 'Accept' with 'application/zip' or 'text/plain'",
- http_code=HTTPStatus.NOT_ACCEPTABLE)
+ raise EngineException(
+ "provide request header 'Accept' with 'application/zip' or 'text/plain'",
+ http_code=HTTPStatus.NOT_ACCEPTABLE,
+ )
content = self.show(session, _id)
if content["_admin"]["onboardingState"] != "ONBOARDED":
- raise EngineException("Cannot get content because this resource is not at 'ONBOARDED' state. "
- "onboardingState is {}".format(content["_admin"]["onboardingState"]),
- http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "Cannot get content because this resource is not at 'ONBOARDED' state. "
+ "onboardingState is {}".format(content["_admin"]["onboardingState"]),
+ http_code=HTTPStatus.CONFLICT,
+ )
storage = content["_admin"]["storage"]
- if path is not None and path != "$DESCRIPTOR": # artifacts
- if not storage.get('pkg-dir'):
- raise EngineException("Packages does not contains artifacts", http_code=HTTPStatus.BAD_REQUEST)
- if self.fs.file_exists((storage['folder'], storage['pkg-dir'], *path), 'dir'):
- folder_content = self.fs.dir_ls((storage['folder'], storage['pkg-dir'], *path))
+ if path is not None and path != "$DESCRIPTOR": # artifacts
+ if not storage.get("pkg-dir") and not storage.get("folder"):
+ raise EngineException(
+ "Packages does not contains artifacts",
+ http_code=HTTPStatus.BAD_REQUEST,
+ )
+ if self.fs.file_exists(
+ (storage["folder"], storage["pkg-dir"], *path), "dir"
+ ):
+ folder_content = self.fs.dir_ls(
+ (storage["folder"], storage["pkg-dir"], *path)
+ )
return folder_content, "text/plain"
# TODO manage folders in http
else:
- return self.fs.file_open((storage['folder'], storage['pkg-dir'], *path), "rb"),\
- "application/octet-stream"
+ return (
+ self.fs.file_open(
+ (storage["folder"], storage["pkg-dir"], *path), "rb"
+ ),
+ "application/octet-stream",
+ )
# pkgtype accept ZIP TEXT -> result
# manyfiles yes X -> zip
# no yes -> error
# onefile yes no -> zip
# X yes -> text
-
- if accept_text and (not storage.get('pkg-dir') or path == "$DESCRIPTOR"):
- return self.fs.file_open((storage['folder'], storage['descriptor']), "r"), "text/plain"
- elif storage.get('pkg-dir') and not accept_zip:
- raise EngineException("Packages that contains several files need to be retrieved with 'application/zip'"
- "Accept header", http_code=HTTPStatus.NOT_ACCEPTABLE)
+ contain_many_files = False
+ if storage.get("pkg-dir"):
+ # check if there is more than one file in the package, ignoring checksums.txt.
+ pkg_files = self.fs.dir_ls((storage["folder"], storage["pkg-dir"]))
+ if len(pkg_files) >= 3 or (
+ len(pkg_files) == 2 and "checksums.txt" not in pkg_files
+ ):
+ contain_many_files = True
+ if accept_text and (not contain_many_files or path == "$DESCRIPTOR"):
+ return (
+ self.fs.file_open((storage["folder"], storage["descriptor"]), "r"),
+ "text/plain",
+ )
+ elif contain_many_files and not accept_zip:
+ raise EngineException(
+ "Packages that contains several files need to be retrieved with 'application/zip'"
+ "Accept header",
+ http_code=HTTPStatus.NOT_ACCEPTABLE,
+ )
else:
- if not storage.get('zipfile'):
+ if not storage.get("zipfile"):
# TODO generate zipfile if not present
- raise EngineException("Only allowed 'text/plain' Accept header for this descriptor. To be solved in "
- "future versions", http_code=HTTPStatus.NOT_ACCEPTABLE)
- return self.fs.file_open((storage['folder'], storage['zipfile']), "rb"), accept_zip
+ raise EngineException(
+ "Only allowed 'text/plain' Accept header for this descriptor. To be solved in "
+ "future versions",
+ http_code=HTTPStatus.NOT_ACCEPTABLE,
+ )
+ return (
+ self.fs.file_open((storage["folder"], storage["zipfile"]), "rb"),
+ accept_zip,
+ )
+
+ def _remove_yang_prefixes_from_descriptor(self, descriptor):
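+ # pyangbind serializes keys with their YANG module prefix (e.g. "etsi-nfv-vnfd:vdu"); strip the prefix recursively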
+ new_descriptor = {}
+ for k, v in descriptor.items():
+ new_v = v
+ if isinstance(v, dict):
+ new_v = self._remove_yang_prefixes_from_descriptor(v)
+ elif isinstance(v, list):
+ new_v = list()
+ for x in v:
+ if isinstance(x, dict):
+ new_v.append(self._remove_yang_prefixes_from_descriptor(x))
+ else:
+ new_v.append(x)
+ new_descriptor[k.split(":")[-1]] = new_v
+ return new_descriptor
def pyangbind_validation(self, item, data, force=False):
- try:
- if item == "vnfds":
- myvnfd = vnfd_im()
- pybindJSONDecoder.load_ietf_json({'vnfd:vnfd-catalog': {'vnfd': [data]}}, None, None, obj=myvnfd,
- path_helper=True, skip_unknown=force)
- out = pybindJSON.dumps(myvnfd, mode="ietf")
- elif item == "nsds":
- mynsd = nsd_im()
- pybindJSONDecoder.load_ietf_json({'nsd:nsd-catalog': {'nsd': [data]}}, None, None, obj=mynsd,
- path_helper=True, skip_unknown=force)
- out = pybindJSON.dumps(mynsd, mode="ietf")
- elif item == "nsts":
- mynst = nst_im()
- pybindJSONDecoder.load_ietf_json({'nst': [data]}, None, None, obj=mynst,
- path_helper=True, skip_unknown=force)
- out = pybindJSON.dumps(mynst, mode="ietf")
+ raise EngineException(
+ "Not possible to validate '{}' item".format(item),
+ http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+ )
+
+ def _validate_input_edit(self, indata, content, force=False):
+ # no need to validate with pyangbind because it will be validated at check_conflict_on_edit
+ if "_id" in indata:
+ indata.pop("_id")
+ if "_admin" not in indata:
+ indata["_admin"] = {}
+
+ if "operationalState" in indata:
+ if indata["operationalState"] in ("ENABLED", "DISABLED"):
+ indata["_admin"]["operationalState"] = indata.pop("operationalState")
+ else:
+ raise EngineException(
+ "State '{}' is not a valid operational state".format(
+ indata["operationalState"]
+ ),
+ http_code=HTTPStatus.BAD_REQUEST,
+ )
+
+ # In the case of user defined data, we need to put the data in the root of the object
+ # to preserve current expected behaviour
+ if "userDefinedData" in indata:
+ data = indata.pop("userDefinedData")
+ if isinstance(data, dict):
+ indata["_admin"]["userDefinedData"] = data
else:
- raise EngineException("Not possible to validate '{}' item".format(item),
- http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "userDefinedData should be an object, but is '{}' instead".format(
+ type(data)
+ ),
+ http_code=HTTPStatus.BAD_REQUEST,
+ )
+
+ if (
+ "operationalState" in indata["_admin"]
+ and content["_admin"]["operationalState"]
+ == indata["_admin"]["operationalState"]
+ ):
+ raise EngineException(
+ "operationalState already {}".format(
+ content["_admin"]["operationalState"]
+ ),
+ http_code=HTTPStatus.CONFLICT,
+ )
- desc_out = self._remove_envelop(yaml.safe_load(out))
- return desc_out
+ return indata
- except Exception as e:
- raise EngineException("Error in pyangbind validation: {}".format(str(e)),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ def _validate_descriptor_changes(
+ self,
+ descriptor_id,
+ descriptor_file_name,
+ old_descriptor_directory,
+ new_descriptor_directory,
+ ):
+ # Example:
+ # raise EngineException(
+ # "Error in validating new descriptor: <NODE> cannot be modified",
+ # http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ # )
+ pass
class VnfdTopic(DescriptorTopic):
def __init__(self, db, fs, msg, auth):
DescriptorTopic.__init__(self, db, fs, msg, auth)
+ def pyangbind_validation(self, item, data, force=False):
+ if self._descriptor_data_is_in_old_format(data):
+ raise EngineException(
+ "ERROR: Unsupported descriptor format. Please, use an ETSI SOL006 descriptor.",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ try:
+ myvnfd = etsi_nfv_vnfd.etsi_nfv_vnfd()
+ pybindJSONDecoder.load_ietf_json(
+ {"etsi-nfv-vnfd:vnfd": data},
+ None,
+ None,
+ obj=myvnfd,
+ path_helper=True,
+ skip_unknown=force,
+ )
+ out = pybindJSON.dumps(myvnfd, mode="ietf")
+ desc_out = self._remove_envelop(yaml.safe_load(out))
+ desc_out = self._remove_yang_prefixes_from_descriptor(desc_out)
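+ # merge the serialized view back over the original data so fields that pyangbind does not round-trip are preserved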
+ return utils.deep_update_dict(data, desc_out)
+ except Exception as e:
+ raise EngineException(
+ "Error in pyangbind validation: {}".format(str(e)),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+
+ @staticmethod
+ def _descriptor_data_is_in_old_format(data):
+ return ("vnfd-catalog" in data) or ("vnfd:vnfd-catalog" in data)
+
@staticmethod
def _remove_envelop(indata=None):
if not indata:
return {}
clean_indata = indata
- if clean_indata.get('vnfd:vnfd-catalog'):
- clean_indata = clean_indata['vnfd:vnfd-catalog']
- elif clean_indata.get('vnfd-catalog'):
- clean_indata = clean_indata['vnfd-catalog']
- if clean_indata.get('vnfd'):
- if not isinstance(clean_indata['vnfd'], list) or len(clean_indata['vnfd']) != 1:
- raise EngineException("'vnfd' must be a list of only one element")
- clean_indata = clean_indata['vnfd'][0]
- elif clean_indata.get('vnfd:vnfd'):
- if not isinstance(clean_indata['vnfd:vnfd'], list) or len(clean_indata['vnfd:vnfd']) != 1:
- raise EngineException("'vnfd:vnfd' must be a list of only one element")
- clean_indata = clean_indata['vnfd:vnfd'][0]
+
+ if clean_indata.get("etsi-nfv-vnfd:vnfd"):
+ if not isinstance(clean_indata["etsi-nfv-vnfd:vnfd"], dict):
+ raise EngineException("'etsi-nfv-vnfd:vnfd' must be a dict")
+ clean_indata = clean_indata["etsi-nfv-vnfd:vnfd"]
+ elif clean_indata.get("vnfd"):
+ if not isinstance(clean_indata["vnfd"], dict):
+ raise EngineException("'vnfd' must be dict")
+ clean_indata = clean_indata["vnfd"]
+
return clean_indata
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
- super().check_conflict_on_edit(session, final_content, edit_content, _id)
+ final_content = super().check_conflict_on_edit(
+ session, final_content, edit_content, _id
+ )
# set type of vnfd
contains_pdu = False
elif contains_vdu:
final_content["_admin"]["type"] = "vnfd"
# if neither vud nor pdu do not fill type
+ return final_content
def check_conflict_on_del(self, session, _id, db_content):
"""
# check vnfrs using this vnfd
_filter["vnfd-id"] = _id
if self.db.get_list("vnfrs", _filter):
- raise EngineException("There is at least one VNF using this descriptor", http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "There is at least one VNF instance using this descriptor",
+ http_code=HTTPStatus.CONFLICT,
+ )
# check NSD referencing this VNFD
del _filter["vnfd-id"]
- _filter["constituent-vnfd.ANYINDEX.vnfd-id-ref"] = descriptor_id
+ _filter["vnfd-id"] = descriptor_id
if self.db.get_list("nsds", _filter):
- raise EngineException("There is at least one NSD referencing this descriptor",
- http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "There is at least one NS package referencing this descriptor",
+ http_code=HTTPStatus.CONFLICT,
+ )
def _validate_input_new(self, indata, storage_params, force=False):
+ indata.pop("onboardingState", None)
+ indata.pop("operationalState", None)
+ indata.pop("usageState", None)
+ indata.pop("links", None)
+
indata = self.pyangbind_validation("vnfds", indata, force)
# Cross references validation in the descriptor
- if indata.get("vdu"):
- if not indata.get("mgmt-interface"):
- raise EngineException("'mgmt-interface' is a mandatory field and it is not defined",
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- if indata["mgmt-interface"].get("cp"):
- for cp in get_iterable(indata.get("connection-point")):
- if cp["name"] == indata["mgmt-interface"]["cp"]:
- break
- else:
- raise EngineException("mgmt-interface:cp='{}' must match an existing connection-point"
- .format(indata["mgmt-interface"]["cp"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+
+ self.validate_mgmt_interface_connection_point(indata)
for vdu in get_iterable(indata.get("vdu")):
- icp_refs = []
- ecp_refs = []
- for interface in get_iterable(vdu.get("interface")):
- if interface.get("external-connection-point-ref"):
- if interface.get("external-connection-point-ref") in ecp_refs:
- raise EngineException("vdu[id='{}']:interface[name='{}']:external-connection-point-ref='{}' "
- "is referenced by other interface"
- .format(vdu["id"], interface["name"],
- interface["external-connection-point-ref"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- ecp_refs.append(interface.get("external-connection-point-ref"))
- for cp in get_iterable(indata.get("connection-point")):
- if cp["name"] == interface["external-connection-point-ref"]:
- break
- else:
- raise EngineException("vdu[id='{}']:interface[name='{}']:external-connection-point-ref='{}' "
- "must match an existing connection-point"
- .format(vdu["id"], interface["name"],
- interface["external-connection-point-ref"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- elif interface.get("internal-connection-point-ref"):
- if interface.get("internal-connection-point-ref") in icp_refs:
- raise EngineException("vdu[id='{}']:interface[name='{}']:internal-connection-point-ref='{}' "
- "is referenced by other interface"
- .format(vdu["id"], interface["name"],
- interface["internal-connection-point-ref"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- icp_refs.append(interface.get("internal-connection-point-ref"))
- for internal_cp in get_iterable(vdu.get("internal-connection-point")):
- if interface["internal-connection-point-ref"] == internal_cp.get("id"):
- break
- else:
- raise EngineException("vdu[id='{}']:interface[name='{}']:internal-connection-point-ref='{}' "
- "must match an existing vdu:internal-connection-point"
- .format(vdu["id"], interface["name"],
- interface["internal-connection-point-ref"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- # Validate that if descriptor contains charms, artifacts _admin.storage."pkg-dir" is not none
- if vdu.get("vdu-configuration"):
- if vdu["vdu-configuration"].get("juju"):
- if not self._validate_package_folders(storage_params, 'charms'):
- raise EngineException("Charm defined in vnf[id={}]:vdu[id={}] but not present in "
- "package".format(indata["id"], vdu["id"]))
- # Validate that if descriptor contains cloud-init, artifacts _admin.storage."pkg-dir" is not none
- if vdu.get("cloud-init-file"):
- if not self._validate_package_folders(storage_params, 'cloud_init', vdu["cloud-init-file"]):
- raise EngineException("Cloud-init defined in vnf[id={}]:vdu[id={}] but not present in "
- "package".format(indata["id"], vdu["id"]))
- # Validate that if descriptor contains charms, artifacts _admin.storage."pkg-dir" is not none
- if indata.get("vnf-configuration"):
- if indata["vnf-configuration"].get("juju"):
- if not self._validate_package_folders(storage_params, 'charms'):
- raise EngineException("Charm defined in vnf[id={}] but not present in "
- "package".format(indata["id"]))
- vld_names = [] # For detection of duplicated VLD names
- for ivld in get_iterable(indata.get("internal-vld")):
- # BEGIN Detection of duplicated VLD names
- ivld_name = ivld.get("name")
- if ivld_name:
- if ivld_name in vld_names:
- raise EngineException("Duplicated VLD name '{}' in vnfd[id={}]:internal-vld[id={}]"
- .format(ivld["name"], indata["id"], ivld["id"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- else:
- vld_names.append(ivld_name)
- # END Detection of duplicated VLD names
- for icp in get_iterable(ivld.get("internal-connection-point")):
- icp_mark = False
- for vdu in get_iterable(indata.get("vdu")):
- for internal_cp in get_iterable(vdu.get("internal-connection-point")):
- if icp["id-ref"] == internal_cp["id"]:
- icp_mark = True
- break
- if icp_mark:
- break
- else:
- raise EngineException("internal-vld[id='{}']:internal-connection-point='{}' must match an existing "
- "vdu:internal-connection-point".format(ivld["id"], icp["id-ref"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- if ivld.get("ip-profile-ref"):
- for ip_prof in get_iterable(indata.get("ip-profiles")):
- if ip_prof["name"] == get_iterable(ivld.get("ip-profile-ref")):
- break
- else:
- raise EngineException("internal-vld[id='{}']:ip-profile-ref='{}' does not exist".format(
- ivld["id"], ivld["ip-profile-ref"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- for mp in get_iterable(indata.get("monitoring-param")):
- if mp.get("vdu-monitoring-param"):
- mp_vmp_mark = False
- for vdu in get_iterable(indata.get("vdu")):
- for vmp in get_iterable(vdu.get("monitoring-param")):
- if vmp["id"] == mp["vdu-monitoring-param"].get("vdu-monitoring-param-ref") and vdu["id"] ==\
- mp["vdu-monitoring-param"]["vdu-ref"]:
- mp_vmp_mark = True
- break
- if mp_vmp_mark:
- break
- else:
- raise EngineException("monitoring-param:vdu-monitoring-param:vdu-monitoring-param-ref='{}' not "
- "defined at vdu[id='{}'] or vdu does not exist"
- .format(mp["vdu-monitoring-param"]["vdu-monitoring-param-ref"],
- mp["vdu-monitoring-param"]["vdu-ref"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- elif mp.get("vdu-metric"):
- mp_vm_mark = False
- for vdu in get_iterable(indata.get("vdu")):
- if vdu.get("vdu-configuration"):
- for metric in get_iterable(vdu["vdu-configuration"].get("metrics")):
- if metric["name"] == mp["vdu-metric"]["vdu-metric-name-ref"] and vdu["id"] == \
- mp["vdu-metric"]["vdu-ref"]:
- mp_vm_mark = True
- break
- if mp_vm_mark:
- break
- else:
- raise EngineException("monitoring-param:vdu-metric:vdu-metric-name-ref='{}' not defined at "
- "vdu[id='{}'] or vdu does not exist"
- .format(mp["vdu-metric"]["vdu-metric-name-ref"],
- mp["vdu-metric"]["vdu-ref"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
-
- for sgd in get_iterable(indata.get("scaling-group-descriptor")):
- for sp in get_iterable(sgd.get("scaling-policy")):
- for sc in get_iterable(sp.get("scaling-criteria")):
- for mp in get_iterable(indata.get("monitoring-param")):
- if mp["id"] == get_iterable(sc.get("vnf-monitoring-param-ref")):
- break
- else:
- raise EngineException("scaling-group-descriptor[name='{}']:scaling-criteria[name='{}']:"
- "vnf-monitoring-param-ref='{}' not defined in any monitoring-param"
- .format(sgd["name"], sc["name"], sc["vnf-monitoring-param-ref"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- for sgd_vdu in get_iterable(sgd.get("vdu")):
- sgd_vdu_mark = False
- for vdu in get_iterable(indata.get("vdu")):
- if vdu["id"] == sgd_vdu["vdu-id-ref"]:
- sgd_vdu_mark = True
- break
- if sgd_vdu_mark:
- break
- else:
- raise EngineException("scaling-group-descriptor[name='{}']:vdu-id-ref={} does not match any vdu"
- .format(sgd["name"], sgd_vdu["vdu-id-ref"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- for sca in get_iterable(sgd.get("scaling-config-action")):
- if not indata.get("vnf-configuration"):
- raise EngineException("'vnf-configuration' not defined in the descriptor but it is referenced by "
- "scaling-group-descriptor[name='{}']:scaling-config-action"
- .format(sgd["name"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- for primitive in get_iterable(indata["vnf-configuration"].get("config-primitive")):
- if primitive["name"] == sca["vnf-config-primitive-name-ref"]:
- break
- else:
- raise EngineException("scaling-group-descriptor[name='{}']:scaling-config-action:vnf-config-"
- "primitive-name-ref='{}' does not match any "
- "vnf-configuration:config-primitive:name"
- .format(sgd["name"], sca["vnf-config-primitive-name-ref"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- return indata
+ self.validate_vdu_internal_connection_points(vdu)
+ self._validate_vdu_cloud_init_in_package(storage_params, vdu, indata)
+ self._validate_vdu_charms_in_package(storage_params, indata)
+
+ self._validate_vnf_charms_in_package(storage_params, indata)
+
+ self.validate_external_connection_points(indata)
+ self.validate_internal_virtual_links(indata)
+ self.validate_monitoring_params(indata)
+ self.validate_scaling_group_descriptor(indata)
+ self.validate_helm_chart(indata)
- def _validate_input_edit(self, indata, force=False):
- # not needed to validate with pyangbind becuase it will be validated at check_conflict_on_edit
return indata
+ @staticmethod
+ def validate_helm_chart(indata):
+ kdus = indata.get("kdu", [])
+ for kdu in kdus:
+ helm_chart_value = kdu.get("helm-chart")
+ if not helm_chart_value:
+ continue
+ if not valid_helm_chart_re.match(helm_chart_value):
+ raise EngineException(
+ "helm-chart '{}' is not valid".format(helm_chart_value),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+
+ @staticmethod
+ def validate_mgmt_interface_connection_point(indata):
+ if not indata.get("vdu"):
+ return
+ if not indata.get("mgmt-cp"):
+ raise EngineException(
+ "'mgmt-cp' is a mandatory field and it is not defined",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+
+ for cp in get_iterable(indata.get("ext-cpd")):
+ if cp["id"] == indata["mgmt-cp"]:
+ break
+ else:
+ raise EngineException(
+ "mgmt-cp='{}' must match an existing ext-cpd".format(indata["mgmt-cp"]),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+
+ @staticmethod
+ def validate_vdu_internal_connection_points(vdu):
+ int_cpds = set()
+ for cpd in get_iterable(vdu.get("int-cpd")):
+ cpd_id = cpd.get("id")
+ if cpd_id and cpd_id in int_cpds:
+ raise EngineException(
+ "vdu[id='{}']:int-cpd[id='{}'] is already used by other int-cpd".format(
+ vdu["id"], cpd_id
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ int_cpds.add(cpd_id)
+
+ @staticmethod
+ def validate_external_connection_points(indata):
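+ # collect every (vdu-id, int-cpd-id) pair so ext-cpd:int-cpd references can be checked below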
+ all_vdus_int_cpds = set()
+ for vdu in get_iterable(indata.get("vdu")):
+ for int_cpd in get_iterable(vdu.get("int-cpd")):
+ all_vdus_int_cpds.add((vdu.get("id"), int_cpd.get("id")))
+
+ ext_cpds = set()
+ for cpd in get_iterable(indata.get("ext-cpd")):
+ cpd_id = cpd.get("id")
+ if cpd_id and cpd_id in ext_cpds:
+ raise EngineException(
+ "ext-cpd[id='{}'] is already used by other ext-cpd".format(cpd_id),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ ext_cpds.add(cpd_id)
+
+ int_cpd = cpd.get("int-cpd")
+ if int_cpd:
+ if (int_cpd.get("vdu-id"), int_cpd.get("cpd")) not in all_vdus_int_cpds:
+ raise EngineException(
+ "ext-cpd[id='{}']:int-cpd must match an existing vdu int-cpd".format(
+ cpd_id
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ # TODO: Validate k8s-cluster-net points to a valid k8s-cluster:nets ?
+
+ def _validate_vdu_charms_in_package(self, storage_params, indata):
+ for df in indata["df"]:
+ if (
+ "lcm-operations-configuration" in df
+ and "operate-vnf-op-config" in df["lcm-operations-configuration"]
+ ):
+ configs = df["lcm-operations-configuration"][
+ "operate-vnf-op-config"
+ ].get("day1-2", [])
+ vdus = df.get("vdu-profile", [])
+ for vdu in vdus:
+ for config in configs:
+ if config["id"] == vdu["id"] and utils.find_in_list(
+ config.get("execution-environment-list", []),
+ lambda ee: "juju" in ee,
+ ):
+ if not self._validate_package_folders(
+ storage_params, "charms"
+ ) and not self._validate_package_folders(
+ storage_params, "Scripts/charms"
+ ):
+ raise EngineException(
+ "Charm defined in vnf[id={}] but not present in "
+ "package".format(indata["id"])
+ )
+
+ def _validate_vdu_cloud_init_in_package(self, storage_params, vdu, indata):
+ if not vdu.get("cloud-init-file"):
+ return
+ if not self._validate_package_folders(
+ storage_params, "cloud_init", vdu["cloud-init-file"]
+ ) and not self._validate_package_folders(
+ storage_params, "Scripts/cloud_init", vdu["cloud-init-file"]
+ ):
+ raise EngineException(
+ "Cloud-init defined in vnf[id={}]:vdu[id={}] but not present in "
+ "package".format(indata["id"], vdu["id"])
+ )
+
+ def _validate_vnf_charms_in_package(self, storage_params, indata):
+ # Get the VNF configuration through the SOL006 container: df -> lcm-operations-configuration -> operate-vnf-op-config -> day1-2
+ for deployment_flavor in indata.get("df", []):
+ if "lcm-operations-configuration" not in deployment_flavor:
+ return
+ if (
+ "operate-vnf-op-config"
+ not in deployment_flavor["lcm-operations-configuration"]
+ ):
+ return
+ for day_1_2_config in deployment_flavor["lcm-operations-configuration"][
+ "operate-vnf-op-config"
+ ]["day1-2"]:
+ if day_1_2_config["id"] == indata["id"]:
+ if utils.find_in_list(
+ day_1_2_config.get("execution-environment-list", []),
+ lambda ee: "juju" in ee,
+ ):
+ if not self._validate_package_folders(
+ storage_params, "charms"
+ ) and not self._validate_package_folders(
+ storage_params, "Scripts/charms"
+ ):
+ raise EngineException(
+ "Charm defined in vnf[id={}] but not present in "
+ "package".format(indata["id"])
+ )
+
def _validate_package_folders(self, storage_params, folder, file=None):
- if not storage_params or not storage_params.get("pkg-dir"):
+ if not storage_params:
+ return False
+ elif not storage_params.get("pkg-dir"):
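+ # without a pkg-dir the artifacts hang directly from the package folder (the "<folder>_" check covers the legacy temp-folder layout)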
+ if self.fs.file_exists("{}_".format(storage_params["folder"]), "dir"):
+ f = "{}_/{}".format(storage_params["folder"], folder)
+ else:
+ f = "{}/{}".format(storage_params["folder"], folder)
+ if file:
+ return self.fs.file_exists("{}/{}".format(f, file), "file")
+ else:
+ if self.fs.file_exists(f, "dir"):
+ if self.fs.dir_ls(f):
+ return True
return False
else:
- if self.fs.file_exists("{}_".format(storage_params["folder"]), 'dir'):
- f = "{}_/{}/{}".format(storage_params["folder"], storage_params["pkg-dir"], folder)
+ if self.fs.file_exists("{}_".format(storage_params["folder"]), "dir"):
+ f = "{}_/{}/{}".format(
+ storage_params["folder"], storage_params["pkg-dir"], folder
+ )
else:
- f = "{}/{}/{}".format(storage_params["folder"], storage_params["pkg-dir"], folder)
+ f = "{}/{}/{}".format(
+ storage_params["folder"], storage_params["pkg-dir"], folder
+ )
if file:
- return self.fs.file_exists("{}/{}".format(f, file), 'file')
+ return self.fs.file_exists("{}/{}".format(f, file), "file")
else:
- if self.fs.file_exists(f, 'dir'):
+ if self.fs.file_exists(f, "dir"):
if self.fs.dir_ls(f):
return True
return False
+ @staticmethod
+ def validate_internal_virtual_links(indata):
+ all_ivld_ids = set()
+ for ivld in get_iterable(indata.get("int-virtual-link-desc")):
+ ivld_id = ivld.get("id")
+ if ivld_id and ivld_id in all_ivld_ids:
+ raise EngineException(
+ "Duplicated VLD id in int-virtual-link-desc[id={}]".format(ivld_id),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ else:
+ all_ivld_ids.add(ivld_id)
+
+ for vdu in get_iterable(indata.get("vdu")):
+ for int_cpd in get_iterable(vdu.get("int-cpd")):
+ int_cpd_ivld_id = int_cpd.get("int-virtual-link-desc")
+ if int_cpd_ivld_id and int_cpd_ivld_id not in all_ivld_ids:
+ raise EngineException(
+ "vdu[id='{}']:int-cpd[id='{}']:int-virtual-link-desc='{}' must match an existing "
+ "int-virtual-link-desc".format(
+ vdu["id"], int_cpd["id"], int_cpd_ivld_id
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+
+ for df in get_iterable(indata.get("df")):
+ for vlp in get_iterable(df.get("virtual-link-profile")):
+ vlp_ivld_id = vlp.get("id")
+ if vlp_ivld_id and vlp_ivld_id not in all_ivld_ids:
+ raise EngineException(
+ "df[id='{}']:virtual-link-profile='{}' must match an existing "
+ "int-virtual-link-desc".format(df["id"], vlp_ivld_id),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+
+ @staticmethod
+ def validate_monitoring_params(indata):
+ all_monitoring_params = set()
+ for ivld in get_iterable(indata.get("int-virtual-link-desc")):
+ for mp in get_iterable(ivld.get("monitoring-parameters")):
+ mp_id = mp.get("id")
+ if mp_id and mp_id in all_monitoring_params:
+ raise EngineException(
+ "Duplicated monitoring-parameter id in "
+ "int-virtual-link-desc[id='{}']:monitoring-parameters[id='{}']".format(
+ ivld["id"], mp_id
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ else:
+ all_monitoring_params.add(mp_id)
+
+ for vdu in get_iterable(indata.get("vdu")):
+ for mp in get_iterable(vdu.get("monitoring-parameter")):
+ mp_id = mp.get("id")
+ if mp_id and mp_id in all_monitoring_params:
+ raise EngineException(
+ "Duplicated monitoring-parameter id in "
+ "vdu[id='{}']:monitoring-parameter[id='{}']".format(
+ vdu["id"], mp_id
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ else:
+ all_monitoring_params.add(mp_id)
+
+ for df in get_iterable(indata.get("df")):
+ for mp in get_iterable(df.get("monitoring-parameter")):
+ mp_id = mp.get("id")
+ if mp_id and mp_id in all_monitoring_params:
+ raise EngineException(
+ "Duplicated monitoring-parameter id in "
+ "df[id='{}']:monitoring-parameter[id='{}']".format(
+ df["id"], mp_id
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ else:
+ all_monitoring_params.add(mp_id)
+
+ @staticmethod
+ def validate_scaling_group_descriptor(indata):
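+ # gather all monitoring-parameter ids (VLD, VDU and DF level) so scaling-criteria references can be validated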
+ all_monitoring_params = set()
+ for ivld in get_iterable(indata.get("int-virtual-link-desc")):
+ for mp in get_iterable(ivld.get("monitoring-parameters")):
+ all_monitoring_params.add(mp.get("id"))
+
+ for vdu in get_iterable(indata.get("vdu")):
+ for mp in get_iterable(vdu.get("monitoring-parameter")):
+ all_monitoring_params.add(mp.get("id"))
+
+ for df in get_iterable(indata.get("df")):
+ for mp in get_iterable(df.get("monitoring-parameter")):
+ all_monitoring_params.add(mp.get("id"))
+
+ for df in get_iterable(indata.get("df")):
+ for sa in get_iterable(df.get("scaling-aspect")):
+ for sp in get_iterable(sa.get("scaling-policy")):
+ for sc in get_iterable(sp.get("scaling-criteria")):
+ sc_monitoring_param = sc.get("vnf-monitoring-param-ref")
+ if (
+ sc_monitoring_param
+ and sc_monitoring_param not in all_monitoring_params
+ ):
+ raise EngineException(
+ "df[id='{}']:scaling-aspect[id='{}']:scaling-policy"
+ "[name='{}']:scaling-criteria[name='{}']: "
+ "vnf-monitoring-param-ref='{}' not defined in any monitoring-param".format(
+ df["id"],
+ sa["id"],
+ sp["name"],
+ sc["name"],
+ sc_monitoring_param,
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+
+ for sca in get_iterable(sa.get("scaling-config-action")):
+ if (
+ "lcm-operations-configuration" not in df
+ or "operate-vnf-op-config"
+ not in df["lcm-operations-configuration"]
+ or not utils.find_in_list(
+ df["lcm-operations-configuration"][
+ "operate-vnf-op-config"
+ ].get("day1-2", []),
+ lambda config: config["id"] == indata["id"],
+ )
+ ):
+ raise EngineException(
+ "'day1-2 configuration' not defined in the descriptor but it is "
+ "referenced by df[id='{}']:scaling-aspect[id='{}']:scaling-config-action".format(
+ df["id"], sa["id"]
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ for configuration in get_iterable(
+ df["lcm-operations-configuration"]["operate-vnf-op-config"].get(
+ "day1-2", []
+ )
+ ):
+ for primitive in get_iterable(
+ configuration.get("config-primitive")
+ ):
+ if (
+ primitive["name"]
+ == sca["vnf-config-primitive-name-ref"]
+ ):
+ break
+ else:
+ raise EngineException(
+ "df[id='{}']:scaling-aspect[id='{}']:scaling-config-action:vnf-"
+ "config-primitive-name-ref='{}' does not match any "
+ "day1-2 configuration:config-primitive:name".format(
+ df["id"],
+ sa["id"],
+ sca["vnf-config-primitive-name-ref"],
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+
def delete_extra(self, session, _id, db_content, not_send_msg=None):
"""
Deletes associated file system storage (via super)
"""
super().delete_extra(session, _id, db_content, not_send_msg)
self.db.del_list("vnfpkgops", {"vnfPkgId": _id})
+ self.db.del_list(self.topic + "_revisions", {"_id": {"$regex": _id}})
+
+ def sol005_projection(self, data):
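+ # SOL005 projection: expose the _admin states at the top level and add the _links expected by the VNF package API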
+ data["onboardingState"] = data["_admin"]["onboardingState"]
+ data["operationalState"] = data["_admin"]["operationalState"]
+ data["usageState"] = data["_admin"]["usageState"]
+
+ links = {}
+ links["self"] = {"href": "/vnfpkgm/v1/vnf_packages/{}".format(data["_id"])}
+ links["vnfd"] = {"href": "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(data["_id"])}
+ links["packageContent"] = {
+ "href": "/vnfpkgm/v1/vnf_packages/{}/package_content".format(data["_id"])
+ }
+ data["_links"] = links
+
+ return super().sol005_projection(data)
+
+ @staticmethod
+ def find_software_version(vnfd: dict) -> str:
+ """Find the sotware version in the VNFD descriptors
+
+ Args:
+ vnfd (dict): Descriptor as a dictionary
+
+ Returns:
+ software-version (str)
+ """
+ default_sw_version = "1.0"
+ if vnfd.get("vnfd"):
+ vnfd = vnfd["vnfd"]
+ if vnfd.get("software-version"):
+ return vnfd["software-version"]
+ else:
+ return default_sw_version
+
+ @staticmethod
+ def extract_policies(vnfd: dict) -> dict:
+ """Removes the policies from the VNFD descriptors
+
+ Args:
+ vnfd (dict): Descriptor as a dictionary
+
+ Returns:
+ vnfd (dict): VNFD which does not include policies
+ """
+ for df in vnfd.get("df", {}):
+ for policy in ["scaling-aspect", "healing-aspect"]:
+ if df.get(policy, {}):
+ df.pop(policy)
+ for vdu in vnfd.get("vdu", {}):
+ for alarm_policy in ["alarm", "monitoring-parameter"]:
+ if vdu.get(alarm_policy, {}):
+ vdu.pop(alarm_policy)
+ return vnfd
+
+ @staticmethod
+ def extract_day12_primitives(vnfd: dict) -> dict:
+ """Removes the day12 primitives from the VNFD descriptors
+
+ Args:
+ vnfd (dict): Descriptor as a dictionary
+
+ Returns:
+ vnfd (dict)
+ """
+ for df_id, df in enumerate(vnfd.get("df", {})):
+ if (
+ df.get("lcm-operations-configuration", {})
+ .get("operate-vnf-op-config", {})
+ .get("day1-2")
+ ):
+ day12 = df["lcm-operations-configuration"]["operate-vnf-op-config"].get(
+ "day1-2"
+ )
+ for config_id, config in enumerate(day12):
+ for key in [
+ "initial-config-primitive",
+ "config-primitive",
+ "terminate-config-primitive",
+ ]:
+ config.pop(key, None)
+ day12[config_id] = config
+ df["lcm-operations-configuration"]["operate-vnf-op-config"][
+ "day1-2"
+ ] = day12
+ vnfd["df"][df_id] = df
+ return vnfd
+
+ def remove_modifiable_items(self, vnfd: dict) -> dict:
+ """Removes the modifiable parts from the VNFD descriptors
+
+ It calls different extract functions according to different update types
+ to clear all the modifiable items from VNFD
+
+ Args:
+ vnfd (dict): Descriptor as a dictionary
+
+ Returns:
+ vnfd (dict): Descriptor which does not include modifiable contents
+ """
+ if vnfd.get("vnfd"):
+ vnfd = vnfd["vnfd"]
+ vnfd.pop("_admin", None)
+ # If other parts of the VNFD need to be extracted,
+ # new extract methods can be appended to the list below.
+ for extract_function in [self.extract_day12_primitives, self.extract_policies]:
+ vnfd_temp = extract_function(vnfd)
+ vnfd = vnfd_temp
+ return vnfd
+
+ def _validate_descriptor_changes(
+ self,
+ descriptor_id: str,
+ descriptor_file_name: str,
+ old_descriptor_directory: str,
+ new_descriptor_directory: str,
+ ):
+ """Compares the old and new VNFD descriptors and validates the new descriptor.
+
+ Args:
+ old_descriptor_directory (str): Directory of descriptor which is in-use
+ new_descriptor_directory (str): Directory of descriptor which is proposed to update (new revision)
+
+ Returns:
+ None
+
+ Raises:
+ EngineException: If the new descriptor contains disallowed changes
+ """
+ try:
+ # If VNFD does not exist in DB or it is not in use by any NS,
+ # validation is not required.
+ vnfd = self.db.get_one("vnfds", {"_id": descriptor_id})
+ if not vnfd or not detect_descriptor_usage(vnfd, "vnfds", self.db):
+ return
+
+ # Get the old and new descriptor contents in order to compare them.
+ with self.fs.file_open(
+ (old_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
+ ) as old_descriptor_file:
+ with self.fs.file_open(
+ (new_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
+ ) as new_descriptor_file:
+ old_content = yaml.safe_load(old_descriptor_file.read())
+ new_content = yaml.safe_load(new_descriptor_file.read())
+
+ # If software version has changed, we do not need to validate
+ # the differences anymore.
+ if old_content and new_content:
+ if self.find_software_version(
+ old_content
+ ) != self.find_software_version(new_content):
+ return
+
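+ # diff the descriptors after stripping the modifiable parts; anything left is a disallowed change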
+ disallowed_change = DeepDiff(
+ self.remove_modifiable_items(old_content),
+ self.remove_modifiable_items(new_content),
+ )
+
+ if disallowed_change:
+ changed_nodes = functools.reduce(
+ lambda a, b: a + " , " + b,
+ [
+ node.lstrip("root")
+ for node in disallowed_change.get(
+ "values_changed"
+ ).keys()
+ ],
+ )
+
+ raise EngineException(
+ f"Error in validating new descriptor: {changed_nodes} cannot be modified, "
+ "there are disallowed changes in the vnf descriptor.",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ except (
+ DbException,
+ AttributeError,
+ IndexError,
+ KeyError,
+ ValueError,
+ ) as e:
+ raise type(e)(
+ "VNF Descriptor could not be processed with error: {}.".format(e)
+ )
class NsdTopic(DescriptorTopic):
topic_msg = "nsd"
def __init__(self, db, fs, msg, auth):
- DescriptorTopic.__init__(self, db, fs, msg, auth)
+ super().__init__(db, fs, msg, auth)
+
+ def pyangbind_validation(self, item, data, force=False):
+ if self._descriptor_data_is_in_old_format(data):
+ raise EngineException(
+ "ERROR: Unsupported descriptor format. Please, use an ETSI SOL006 descriptor.",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ try:
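+ # keep a copy of df[0]:vnf-profile so it can be restored verbatim after the pyangbind round trip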
+ nsd_vnf_profiles = data.get("df", [{}])[0].get("vnf-profile", [])
+ mynsd = etsi_nfv_nsd.etsi_nfv_nsd()
+ pybindJSONDecoder.load_ietf_json(
+ {"nsd": {"nsd": [data]}},
+ None,
+ None,
+ obj=mynsd,
+ path_helper=True,
+ skip_unknown=force,
+ )
+ out = pybindJSON.dumps(mynsd, mode="ietf")
+ desc_out = self._remove_envelop(yaml.safe_load(out))
+ desc_out = self._remove_yang_prefixes_from_descriptor(desc_out)
+ if nsd_vnf_profiles:
+ desc_out["df"][0]["vnf-profile"] = nsd_vnf_profiles
+ return desc_out
+ except Exception as e:
+ raise EngineException(
+ "Error in pyangbind validation: {}".format(str(e)),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+
+ @staticmethod
+ def _descriptor_data_is_in_old_format(data):
+ return ("nsd-catalog" in data) or ("nsd:nsd-catalog" in data)
@staticmethod
def _remove_envelop(indata=None):
return {}
clean_indata = indata
- if clean_indata.get('nsd:nsd-catalog'):
- clean_indata = clean_indata['nsd:nsd-catalog']
- elif clean_indata.get('nsd-catalog'):
- clean_indata = clean_indata['nsd-catalog']
- if clean_indata.get('nsd'):
- if not isinstance(clean_indata['nsd'], list) or len(clean_indata['nsd']) != 1:
+ if clean_indata.get("nsd"):
+ clean_indata = clean_indata["nsd"]
+ elif clean_indata.get("etsi-nfv-nsd:nsd"):
+ clean_indata = clean_indata["etsi-nfv-nsd:nsd"]
+ if clean_indata.get("nsd"):
+ if (
+ not isinstance(clean_indata["nsd"], list)
+ or len(clean_indata["nsd"]) != 1
+ ):
raise EngineException("'nsd' must be a list of only one element")
- clean_indata = clean_indata['nsd'][0]
- elif clean_indata.get('nsd:nsd'):
- if not isinstance(clean_indata['nsd:nsd'], list) or len(clean_indata['nsd:nsd']) != 1:
- raise EngineException("'nsd:nsd' must be a list of only one element")
- clean_indata = clean_indata['nsd:nsd'][0]
+ clean_indata = clean_indata["nsd"][0]
return clean_indata
def _validate_input_new(self, indata, storage_params, force=False):
+ indata.pop("nsdOnboardingState", None)
+ indata.pop("nsdOperationalState", None)
+ indata.pop("nsdUsageState", None)
+
+ indata.pop("links", None)
+
indata = self.pyangbind_validation("nsds", indata, force)
# Cross references validation in the descriptor
# TODO validate that if the descriptor contains cloud-init-file or charms, artifacts exist and _admin.storage."pkg-dir" is not none
- for vld in get_iterable(indata.get("vld")):
- if vld.get("mgmt-network") and vld.get("ip-profile-ref"):
- raise EngineException("Error at vld[id='{}']:ip-profile-ref"
- " You cannot set an ip-profile when mgmt-network is True"
- .format(vld["id"]), http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- for vnfd_cp in get_iterable(vld.get("vnfd-connection-point-ref")):
- for constituent_vnfd in get_iterable(indata.get("constituent-vnfd")):
- if vnfd_cp["member-vnf-index-ref"] == constituent_vnfd["member-vnf-index"]:
- if vnfd_cp.get("vnfd-id-ref") and vnfd_cp["vnfd-id-ref"] != constituent_vnfd["vnfd-id-ref"]:
- raise EngineException("Error at vld[id='{}']:vnfd-connection-point-ref[vnfd-id-ref='{}'] "
- "does not match constituent-vnfd[member-vnf-index='{}']:vnfd-id-ref"
- " '{}'".format(vld["id"], vnfd_cp["vnfd-id-ref"],
- constituent_vnfd["member-vnf-index"],
- constituent_vnfd["vnfd-id-ref"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- break
- else:
- raise EngineException("Error at vld[id='{}']:vnfd-connection-point-ref[member-vnf-index-ref='{}'] "
- "does not match any constituent-vnfd:member-vnf-index"
- .format(vld["id"], vnfd_cp["member-vnf-index-ref"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- # Check VNFFGD
- for fgd in get_iterable(indata.get("vnffgd")):
- for cls in get_iterable(fgd.get("classifier")):
- rspref = cls.get("rsp-id-ref")
- for rsp in get_iterable(fgd.get("rsp")):
- rspid = rsp.get("id")
- if rspid and rspref and rspid == rspref:
- break
- else:
- raise EngineException(
- "Error at vnffgd[id='{}']:classifier[id='{}']:rsp-id-ref '{}' does not match any rsp:id"
- .format(fgd["id"], cls["id"], rspref),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ for vld in get_iterable(indata.get("virtual-link-desc")):
+ self.validate_vld_mgmt_network_with_virtual_link_protocol_data(vld, indata)
+
+ self.validate_vnf_profiles_vnfd_id(indata)
+
return indata
- def _validate_input_edit(self, indata, force=False):
+ @staticmethod
+ def validate_vld_mgmt_network_with_virtual_link_protocol_data(vld, indata):
+ if not vld.get("mgmt-network"):
+ return
+ vld_id = vld.get("id")
+ for df in get_iterable(indata.get("df")):
+ for vlp in get_iterable(df.get("virtual-link-profile")):
+ if vld_id and vld_id == vlp.get("virtual-link-desc-id"):
+ if vlp.get("virtual-link-protocol-data"):
+ raise EngineException(
+ "Error at df[id='{}']:virtual-link-profile[id='{}']:virtual-link-"
+ "protocol-data You cannot set a virtual-link-protocol-data "
+ "when mgmt-network is True".format(df["id"], vlp["id"]),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+
+ @staticmethod
+ def validate_vnf_profiles_vnfd_id(indata):
+ all_vnfd_ids = set(get_iterable(indata.get("vnfd-id")))
+ for df in get_iterable(indata.get("df")):
+ for vnf_profile in get_iterable(df.get("vnf-profile")):
+ vnfd_id = vnf_profile.get("vnfd-id")
+ if vnfd_id and vnfd_id not in all_vnfd_ids:
+ raise EngineException(
+ "Error at df[id='{}']:vnf_profile[id='{}']:vnfd-id='{}' "
+ "does not match any vnfd-id".format(
+ df["id"], vnf_profile["id"], vnfd_id
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+
+ def _validate_input_edit(self, indata, content, force=False):
# not needed to validate with pyangbind because it will be validated at check_conflict_on_edit
+ """
+ indata looks as follows:
+ - In the new case (conformant)
+ {'nsdOperationalState': 'DISABLED', 'userDefinedData': {'id': 'string23',
+ '_id': 'c6ddc544-cede-4b94-9ebe-be07b298a3c1', 'name': 'simon46'}}
+ - In the old case (backwards-compatible)
+ {'id': 'string23', '_id': 'c6ddc544-cede-4b94-9ebe-be07b298a3c1', 'name': 'simon46'}
+ """
+ if "_admin" not in indata:
+ indata["_admin"] = {}
+
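+ # Map the SOL005 nsdOperationalState attribute onto the internal _admin structure.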
+ if "nsdOperationalState" in indata:
+ if indata["nsdOperationalState"] in ("ENABLED", "DISABLED"):
+ indata["_admin"]["operationalState"] = indata.pop("nsdOperationalState")
+ else:
+ raise EngineException(
+ "State '{}' is not a valid operational state".format(
+ indata["nsdOperationalState"]
+ ),
+ http_code=HTTPStatus.BAD_REQUEST,
+ )
+
+ # userDefinedData arrives at the root of the edit request; it is stored
+ # under _admin to preserve the current expected behaviour
+ if "userDefinedData" in indata:
+ data = indata.pop("userDefinedData")
+ if isinstance(data, dict):
+ indata["_admin"]["userDefinedData"] = data
+ else:
+ raise EngineException(
+ "userDefinedData should be an object, but is '{}' instead".format(
+ type(data)
+ ),
+ http_code=HTTPStatus.BAD_REQUEST,
+ )
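+ # Re-setting the operational state to its current value is reported as a conflict.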
+ if (
+ "operationalState" in indata["_admin"]
+ and content["_admin"]["operationalState"]
+ == indata["_admin"]["operationalState"]
+ ):
+ raise EngineException(
+ "nsdOperationalState already {}".format(
+ content["_admin"]["operationalState"]
+ ),
+ http_code=HTTPStatus.CONFLICT,
+ )
return indata
def _check_descriptor_dependencies(self, session, descriptor):
"""
if session["force"]:
return
- member_vnfd_index = {}
- if descriptor.get("constituent-vnfd") and not session["force"]:
- for vnf in descriptor["constituent-vnfd"]:
- vnfd_id = vnf["vnfd-id-ref"]
- filter_q = self._get_project_filter(session)
- filter_q["id"] = vnfd_id
- vnf_list = self.db.get_list("vnfds", filter_q)
- if not vnf_list:
- raise EngineException("Descriptor error at 'constituent-vnfd':'vnfd-id-ref'='{}' references a non "
- "existing vnfd".format(vnfd_id), http_code=HTTPStatus.CONFLICT)
- # elif len(vnf_list) > 1:
- # raise EngineException("More than one vnfd found for id='{}'".format(vnfd_id),
- # http_code=HTTPStatus.CONFLICT)
- member_vnfd_index[vnf["member-vnf-index"]] = vnf_list[0]
+ vnfds_index = self._get_descriptor_constituent_vnfds_index(session, descriptor)
# Cross references validation in the descriptor and vnfd connection point validation
- for vld in get_iterable(descriptor.get("vld")):
- for referenced_vnfd_cp in get_iterable(vld.get("vnfd-connection-point-ref")):
- # look if this vnfd contains this connection point
- vnfd = member_vnfd_index.get(referenced_vnfd_cp["member-vnf-index-ref"])
- for vnfd_cp in get_iterable(vnfd.get("connection-point")):
- if referenced_vnfd_cp.get("vnfd-connection-point-ref") == vnfd_cp["name"]:
- break
- else:
+ for df in get_iterable(descriptor.get("df")):
+ self.validate_df_vnf_profiles_constituent_connection_points(df, vnfds_index)
+
+ def _get_descriptor_constituent_vnfds_index(self, session, descriptor):
+ vnfds_index = {}
+ if descriptor.get("vnfd-id") and not session["force"]:
+ for vnfd_id in get_iterable(descriptor.get("vnfd-id")):
+ query_filter = self._get_project_filter(session)
+ query_filter["id"] = vnfd_id
+ vnf_list = self.db.get_list("vnfds", query_filter)
+ if not vnf_list:
raise EngineException(
- "Error at vld[id='{}']:vnfd-connection-point-ref[member-vnf-index-ref='{}']:vnfd-"
- "connection-point-ref='{}' references a non existing conection-point:name inside vnfd '{}'"
- .format(vld["id"], referenced_vnfd_cp["member-vnf-index-ref"],
- referenced_vnfd_cp["vnfd-connection-point-ref"], vnfd["id"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ "Descriptor error at 'vnfd-id'='{}' references a non "
+ "existing vnfd".format(vnfd_id),
+ http_code=HTTPStatus.CONFLICT,
+ )
+ vnfds_index[vnfd_id] = vnf_list[0]
+ return vnfds_index
+
+ @staticmethod
+ def validate_df_vnf_profiles_constituent_connection_points(df, vnfds_index):
+ for vnf_profile in get_iterable(df.get("vnf-profile")):
+ vnfd = vnfds_index.get(vnf_profile["vnfd-id"])
+ all_vnfd_ext_cpds = set()
+ for ext_cpd in get_iterable(vnfd.get("ext-cpd")):
+ if ext_cpd.get("id"):
+ all_vnfd_ext_cpds.add(ext_cpd.get("id"))
+
+ for virtual_link in get_iterable(
+ vnf_profile.get("virtual-link-connectivity")
+ ):
+ for vl_cpd in get_iterable(virtual_link.get("constituent-cpd-id")):
+ vl_cpd_id = vl_cpd.get("constituent-cpd-id")
+ if vl_cpd_id and vl_cpd_id not in all_vnfd_ext_cpds:
+ raise EngineException(
+ "Error at df[id='{}']:vnf-profile[id='{}']:virtual-link-connectivity"
+ "[virtual-link-profile-id='{}']:constituent-cpd-id='{}' references a "
+ "non existing ext-cpd:id inside vnfd '{}'".format(
+ df["id"],
+ vnf_profile["id"],
+ virtual_link["virtual-link-profile-id"],
+ vl_cpd_id,
+ vnfd["id"],
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
- super().check_conflict_on_edit(session, final_content, edit_content, _id)
+ final_content = super().check_conflict_on_edit(
+ session, final_content, edit_content, _id
+ )
self._check_descriptor_dependencies(session, final_content)
+ return final_content
+
def check_conflict_on_del(self, session, _id, db_content):
"""
Check that there is not any NSR that uses this NSD. Only NSRs belonging to this project are considered. Note
_filter = self._get_project_filter(session)
_filter["nsd-id"] = _id
if self.db.get_list("nsrs", _filter):
- raise EngineException("There is at least one NS using this descriptor", http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "There is at least one NS instance using this descriptor",
+ http_code=HTTPStatus.CONFLICT,
+ )
# check NSD referenced by NST
del _filter["nsd-id"]
_filter["netslice-subnet.ANYINDEX.nsd-ref"] = descriptor_id
if self.db.get_list("nsts", _filter):
- raise EngineException("There is at least one NetSlice Template referencing this descriptor",
- http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "There is at least one NetSlice Template referencing this descriptor",
+ http_code=HTTPStatus.CONFLICT,
+ )
+
+ def delete_extra(self, session, _id, db_content, not_send_msg=None):
+ """
+ Deletes the associated file system storage (via super)
+ Deletes the associated descriptor revisions from the database.
+ :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+ :param _id: server internal id
+ :param db_content: The database content of the descriptor
+ :return: None
+ :raises: FsException in case of error while deleting associated storage
+ """
+ super().delete_extra(session, _id, db_content, not_send_msg)
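+ # Revision entries are keyed as "<descriptor-id>:<revision>" (assumed from
+ # how DescriptorTopic stores revisions), so a regex on _id removes them all.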
+ self.db.del_list(self.topic + "_revisions", {"_id": {"$regex": _id}})
+
+ @staticmethod
+ def extract_day12_primitives(nsd: dict) -> dict:
+ """Removes the day12 primitives from the NSD descriptors
+
+ Args:
+ nsd (dict): Descriptor as a dictionary
+
+ Returns:
+ nsd (dict): NSD without day-1/2 configuration primitives
+ """
+ if nsd.get("ns-configuration"):
+ for key in [
+ "config-primitive",
+ "initial-config-primitive",
+ "terminate-config-primitive",
+ ]:
+ nsd["ns-configuration"].pop(key, None)
+ return nsd
+
+ def remove_modifiable_items(self, nsd: dict) -> dict:
+ """Removes the modifiable parts from the VNFD descriptors
+
+ It calls different extract functions according to different update types
+ to clear all the modifiable items from NSD
+
+ Args:
+ nsd (dict): Descriptor as a dictionary
+
+ Returns:
+ nsd (dict): Descriptor which does not include modifiable contents
+ """
+ while isinstance(nsd, dict) and nsd.get("nsd"):
+ nsd = nsd["nsd"]
+ if isinstance(nsd, list):
+ nsd = nsd[0]
+ nsd.pop("_admin", None)
+ # If more extractions need to be done from the NSD,
+ # new extract methods can be appended to the list below.
+ for extract_function in [self.extract_day12_primitives]:
+ nsd = extract_function(nsd)
+ return nsd
+
+ def _validate_descriptor_changes(
+ self,
+ descriptor_id: str,
+ descriptor_file_name: str,
+ old_descriptor_directory: str,
+ new_descriptor_directory: str,
+ ):
+ """Compares the old and new NSD descriptors and validates the new descriptor
+
+ Args:
+ descriptor_id: _id of the descriptor to be updated
+ descriptor_file_name: Name of the descriptor file inside the package
+ old_descriptor_directory: Directory of the descriptor which is in use
+ new_descriptor_directory: Directory of the descriptor which is proposed to update (new revision)
+
+ Returns:
+ None
+
+ Raises:
+ EngineException: In case of error if the changes are not allowed
+ """
+
+ try:
+ # If NSD does not exist in DB, or it is not in use by any NS,
+ # validation is not required.
+ nsd = self.db.get_one("nsds", {"_id": descriptor_id}, fail_on_empty=False)
+ if not nsd or not detect_descriptor_usage(nsd, "nsds", self.db):
+ return
+
+ # Get the old and new descriptor contents in order to compare them.
+ with self.fs.file_open(
+ (old_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
+ ) as old_descriptor_file:
+ with self.fs.file_open(
+ (new_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
+ ) as new_descriptor_file:
+ old_content = yaml.safe_load(old_descriptor_file.read())
+ new_content = yaml.safe_load(new_descriptor_file.read())
+
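+ # With the modifiable items removed, any remaining difference is a disallowed change.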
+ if old_content and new_content:
+ disallowed_change = DeepDiff(
+ self.remove_modifiable_items(old_content),
+ self.remove_modifiable_items(new_content),
+ )
+
+ if disallowed_change:
+ # As in the VNFD check: strip the "root" prefix from the DeepDiff keys.
+ changed_nodes = ", ".join(
+ node[4:] if node.startswith("root") else node
+ for node in disallowed_change.get("values_changed", {})
+ )
+
+ raise EngineException(
+ f"Error in validating new descriptor: {changed_nodes} cannot be modified, "
+ "there are disallowed changes in the ns descriptor. ",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ except (
+ DbException,
+ AttributeError,
+ IndexError,
+ KeyError,
+ ValueError,
+ ) as e:
+ raise type(e)(
+ "NS Descriptor could not be processed with error: {}.".format(e)
+ )
+
+ def sol005_projection(self, data):
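+ # Expose the internal _admin state through the SOL005-defined NSD attributes.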
+ data["nsdOnboardingState"] = data["_admin"]["onboardingState"]
+ data["nsdOperationalState"] = data["_admin"]["operationalState"]
+ data["nsdUsageState"] = data["_admin"]["usageState"]
+
+ links = {}
+ links["self"] = {"href": "/nsd/v1/ns_descriptors/{}".format(data["_id"])}
+ links["nsd_content"] = {
+ "href": "/nsd/v1/ns_descriptors/{}/nsd_content".format(data["_id"])
+ }
+ data["_links"] = links
+
+ return super().sol005_projection(data)
class NstTopic(DescriptorTopic):
topic = "nsts"
topic_msg = "nst"
+ quota_name = "slice_templates"
def __init__(self, db, fs, msg, auth):
DescriptorTopic.__init__(self, db, fs, msg, auth)
+ def pyangbind_validation(self, item, data, force=False):
+ try:
+ mynst = nst_im()
+ pybindJSONDecoder.load_ietf_json(
+ {"nst": [data]},
+ None,
+ None,
+ obj=mynst,
+ path_helper=True,
+ skip_unknown=force,
+ )
+ out = pybindJSON.dumps(mynst, mode="ietf")
+ desc_out = self._remove_envelop(yaml.safe_load(out))
+ return desc_out
+ except Exception as e:
+ raise EngineException(
+ "Error in pyangbind validation: {}".format(str(e)),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+
@staticmethod
def _remove_envelop(indata=None):
if not indata:
return {}
clean_indata = indata
- if clean_indata.get('nst'):
- if not isinstance(clean_indata['nst'], list) or len(clean_indata['nst']) != 1:
+ if clean_indata.get("nst"):
+ if (
+ not isinstance(clean_indata["nst"], list)
+ or len(clean_indata["nst"]) != 1
+ ):
raise EngineException("'nst' must be a list only one element")
- clean_indata = clean_indata['nst'][0]
- elif clean_indata.get('nst:nst'):
- if not isinstance(clean_indata['nst:nst'], list) or len(clean_indata['nst:nst']) != 1:
+ clean_indata = clean_indata["nst"][0]
+ elif clean_indata.get("nst:nst"):
+ if (
+ not isinstance(clean_indata["nst:nst"], list)
+ or len(clean_indata["nst:nst"]) != 1
+ ):
raise EngineException("'nst:nst' must be a list only one element")
- clean_indata = clean_indata['nst:nst'][0]
+ clean_indata = clean_indata["nst:nst"][0]
return clean_indata
- def _validate_input_edit(self, indata, force=False):
- # TODO validate with pyangbind, serialize
- return indata
-
def _validate_input_new(self, indata, storage_params, force=False):
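+ # Drop read-only SOL005 state attributes that a client may echo back.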
+ indata.pop("onboardingState", None)
+ indata.pop("operationalState", None)
+ indata.pop("usageState", None)
indata = self.pyangbind_validation("nsts", indata, force)
return indata.copy()
filter_q = self._get_project_filter(session)
filter_q["id"] = nsd_id
if not self.db.get_list("nsds", filter_q):
- raise EngineException("Descriptor error at 'netslice-subnet':'nsd-ref'='{}' references a non "
- "existing nsd".format(nsd_id), http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "Descriptor error at 'netslice-subnet':'nsd-ref'='{}' references a non "
+ "existing nsd".format(nsd_id),
+ http_code=HTTPStatus.CONFLICT,
+ )
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
- super().check_conflict_on_edit(session, final_content, edit_content, _id)
+ final_content = super().check_conflict_on_edit(
+ session, final_content, edit_content, _id
+ )
self._check_descriptor_dependencies(session, final_content)
+ return final_content
def check_conflict_on_del(self, session, _id, db_content):
"""
_filter = self._get_project_filter(session)
_filter["_admin.nst-id"] = _id
if self.db.get_list("nsis", _filter):
- raise EngineException("there is at least one Netslice Instance using this descriptor",
- http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "there is at least one Netslice Instance using this descriptor",
+ http_code=HTTPStatus.CONFLICT,
+ )
+
+ def sol005_projection(self, data):
+ data["onboardingState"] = data["_admin"]["onboardingState"]
+ data["operationalState"] = data["_admin"]["operationalState"]
+ data["usageState"] = data["_admin"]["usageState"]
+
+ links = {}
+ links["self"] = {"href": "/nst/v1/netslice_templates/{}".format(data["_id"])}
+ links["nst"] = {"href": "/nst/v1/netslice_templates/{}/nst".format(data["_id"])}
+ data["_links"] = links
+
+ return super().sol005_projection(data)
class PduTopic(BaseTopic):
topic = "pdus"
topic_msg = "pdu"
+ quota_name = "pduds"
schema_new = pdu_new_schema
schema_edit = pdu_edit_schema
_filter = self._get_project_filter(session)
_filter["vdur.pdu-id"] = _id
if self.db.get_list("vnfrs", _filter):
- raise EngineException("There is at least one VNF using this PDU", http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "There is at least one VNF instance using this PDU",
+ http_code=HTTPStatus.CONFLICT,
+ )
class VnfPkgOpTopic(BaseTopic):
BaseTopic.__init__(self, db, fs, msg, auth)
def edit(self, session, _id, indata=None, kwargs=None, content=None):
- raise EngineException("Method 'edit' not allowed for topic '{}'".format(self.topic),
- HTTPStatus.METHOD_NOT_ALLOWED)
+ raise EngineException(
+ "Method 'edit' not allowed for topic '{}'".format(self.topic),
+ HTTPStatus.METHOD_NOT_ALLOWED,
+ )
def delete(self, session, _id, dry_run=False):
- raise EngineException("Method 'delete' not allowed for topic '{}'".format(self.topic),
- HTTPStatus.METHOD_NOT_ALLOWED)
+ raise EngineException(
+ "Method 'delete' not allowed for topic '{}'".format(self.topic),
+ HTTPStatus.METHOD_NOT_ALLOWED,
+ )
def delete_list(self, session, filter_q=None):
- raise EngineException("Method 'delete_list' not allowed for topic '{}'".format(self.topic),
- HTTPStatus.METHOD_NOT_ALLOWED)
+ raise EngineException(
+ "Method 'delete_list' not allowed for topic '{}'".format(self.topic),
+ HTTPStatus.METHOD_NOT_ALLOWED,
+ )
def new(self, rollback, session, indata=None, kwargs=None, headers=None):
"""
juju_bundle = kdu.get("juju-bundle")
break
else:
- raise EngineException("Not found vnfd[id='{}']:kdu[name='{}']".format(vnfpkg_id, kdu_name))
+ raise EngineException(
+ "Not found vnfd[id='{}']:kdu[name='{}']".format(vnfpkg_id, kdu_name)
+ )
if helm_chart:
indata["helm-chart"] = helm_chart
match = fullmatch(r"([^/]*)/([^/]*)", helm_chart)
match = fullmatch(r"([^/]*)/([^/]*)", juju_bundle)
repo_name = match.group(1) if match else None
else:
- raise EngineException("Found neither 'helm-chart' nor 'juju-bundle' in vnfd[id='{}']:kdu[name='{}']"
- .format(vnfpkg_id, kdu_name))
+ raise EngineException(
+ "Found neither 'helm-chart' nor 'juju-bundle' in vnfd[id='{}']:kdu[name='{}']".format(
+ vnfpkg_id, kdu_name
+ )
+ )
if repo_name:
del filter_q["_id"]
filter_q["name"] = repo_name
"links": {
"self": "/osm/vnfpkgm/v1/vnfpkg_op_occs/" + vnfpkgop_id,
"vnfpkg": "/osm/vnfpkgm/v1/vnf_packages/" + vnfpkg_id,
- }
+ },
}
- self.format_on_new(vnfpkgop_desc, session["project_id"], make_public=session["public"])
+ self.format_on_new(
+ vnfpkgop_desc, session["project_id"], make_public=session["public"]
+ )
ctime = vnfpkgop_desc["_admin"]["created"]
vnfpkgop_desc["statusEnteredTime"] = ctime
vnfpkgop_desc["startTime"] = ctime