import os
import shutil
import functools
+import re
# import logging
from deepdiff import DeepDiff
from uuid import uuid4
from re import fullmatch
from zipfile import ZipFile
+from urllib.parse import urlparse
from osm_nbi.validation import (
ValidationError,
pdu_new_schema,
pdu_edit_schema,
validate_input,
vnfpkgop_new_schema,
+ ns_config_template,
+ vnf_schema,
+ vld_schema,
+ additional_params_for_vnf,
+)
+from osm_nbi.base_topic import (
+ BaseTopic,
+ EngineException,
+ get_iterable,
+ detect_descriptor_usage,
)
-from osm_nbi.base_topic import BaseTopic, EngineException, get_iterable
from osm_im import etsi_nfv_vnfd, etsi_nfv_nsd
from osm_im.nst import nst as nst_im
from pyangbind.lib.serialise import pybindJSONDecoder
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+valid_helm_chart_re = re.compile(
+ r"^[a-z0-9]([-a-z0-9]*[a-z0-9]/)?([a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
+)
+
class DescriptorTopic(BaseTopic):
def __init__(self, db, fs, msg, auth):
+ super().__init__(db, fs, msg, auth)
- BaseTopic.__init__(self, db, fs, msg, auth)
+ def _validate_input_new(self, indata, storage_params, force=False):
+ return indata
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
final_content = super().check_conflict_on_edit(
if self.db.get_one(self.topic, _filter, fail_on_empty=False):
raise EngineException(
"{} with id '{}' already exists for this project".format(
- self.topic[:-1], final_content["id"]
+ (str(self.topic))[:-1], final_content["id"]
),
HTTPStatus.CONFLICT,
)
self.fs.file_delete(_id + ":" + str(revision), ignore_non_exist=True)
revision = revision - 1
-
@staticmethod
def get_one_by_id(db, session, topic, id):
# find owned by this project
# Avoid override in this case as the target is userDefinedData, but not vnfd,nsd descriptors
# indata = DescriptorTopic._validate_input_new(self, indata, project_id=session["force"])
- content = {"_admin": {
- "userDefinedData": indata,
- "revision": 0
- }}
+ content = {"_admin": {"userDefinedData": indata, "revision": 0}}
self.format_on_new(
content, session["project_id"], make_public=session["public"]
or "application/x-gzip" in content_type
):
compressed = "gzip"
- if (
- content_type
- and "application/zip" in content_type
- ):
+ if content_type and "application/zip" in content_type:
compressed = "zip"
filename = headers.get("Content-Filename")
if not filename and compressed:
# TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266
file_pkg = None
error_text = ""
+ fs_rollback = []
+
try:
if content_range_text:
content_range = (
else:
self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
self.fs.mkdir(proposed_revision_path)
+ fs_rollback.append(proposed_revision_path)
storage = self.fs.get_params()
- storage["folder"] = _id
+ storage["folder"] = proposed_revision_path
file_path = (proposed_revision_path, filename)
if self.fs.file_exists(file_path, "file"):
HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE,
)
file_pkg = self.fs.file_open(file_path, "a+b")
+
if isinstance(indata, dict):
indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
file_pkg.write(indata_text.encode(encoding="utf-8"))
)
if (
- (
- zipfilename.endswith(".yaml")
- or zipfilename.endswith(".json")
- or zipfilename.endswith(".yml")
- ) and (
- zipfilename.find("/") < 0
- or zipfilename.find("Definitions") >= 0
- )
+ zipfilename.endswith(".yaml")
+ or zipfilename.endswith(".json")
+ or zipfilename.endswith(".yml")
+ ) and (
+ zipfilename.find("/") < 0
+ or zipfilename.find("Definitions") >= 0
):
storage["pkg-dir"] = ""
if descriptor_file_name:
indata = json.load(content)
else:
error_text = "Invalid yaml format "
- indata = yaml.load(content, Loader=yaml.SafeLoader)
+ indata = yaml.safe_load(content)
# Need to close the file package here so it can be copied from the
# revision to the current, unrevisioned record
if revision > 1:
try:
self._validate_descriptor_changes(
+ _id,
descriptor_file_name,
current_revision_path,
- proposed_revision_path)
+ proposed_revision_path,
+ )
except Exception as e:
- shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
- shutil.rmtree(self.fs.path + proposed_revision_path, ignore_errors=True)
+ shutil.rmtree(
+ self.fs.path + current_revision_path, ignore_errors=True
+ )
+ shutil.rmtree(
+ self.fs.path + proposed_revision_path, ignore_errors=True
+ )
# Only delete the new revision. We need to keep the original version in place
# as it has not been changed.
self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
raise e
- # Copy the revision to the active package name by its original id
- shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
- os.rename(self.fs.path + proposed_revision_path, self.fs.path + current_revision_path)
- self.fs.file_delete(current_revision_path, ignore_non_exist=True)
- self.fs.mkdir(current_revision_path)
- self.fs.reverse_sync(from_path=current_revision_path)
- shutil.rmtree(self.fs.path + _id)
-
- current_desc["_admin"]["storage"] = storage
- current_desc["_admin"]["onboardingState"] = "ONBOARDED"
- current_desc["_admin"]["operationalState"] = "ENABLED"
-
indata = self._remove_envelop(indata)
# Override descriptor with query string kwargs
if kwargs:
self._update_input_with_kwargs(indata, kwargs)
+ current_desc["_admin"]["storage"] = storage
+ current_desc["_admin"]["onboardingState"] = "ONBOARDED"
+ current_desc["_admin"]["operationalState"] = "ENABLED"
+ current_desc["_admin"]["modified"] = time()
+ current_desc["_admin"]["revision"] = revision
+
deep_update_rfc7396(current_desc, indata)
current_desc = self.check_conflict_on_edit(
session, current_desc, indata, _id=_id
)
- current_desc["_admin"]["modified"] = time()
- current_desc["_admin"]["revision"] = revision
+
+ # Copy the revision to the active package name by its original id
+ shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
+ os.rename(
+ self.fs.path + proposed_revision_path,
+ self.fs.path + current_revision_path,
+ )
+ self.fs.file_delete(current_revision_path, ignore_non_exist=True)
+ self.fs.mkdir(current_revision_path)
+ self.fs.reverse_sync(from_path=current_revision_path)
+
+ shutil.rmtree(self.fs.path + _id)
+
self.db.replace(self.topic, _id, current_desc)
# Store a copy of the package as a point in time revision
revision_desc = dict(current_desc)
revision_desc["_id"] = _id + ":" + str(revision_desc["_admin"]["revision"])
self.db.create(self.topic + "_revisions", revision_desc)
+ fs_rollback = []
indata["_id"] = _id
self._send_msg("edited", indata)
finally:
if file_pkg:
file_pkg.close()
+ for file in fs_rollback:
+ self.fs.file_delete(file, ignore_non_exist=True)
def get_file(self, session, _id, path=None, accept_header=None):
"""
)
storage = content["_admin"]["storage"]
if path is not None and path != "$DESCRIPTOR": # artifacts
- if not storage.get("pkg-dir"):
+ if not storage.get("pkg-dir") and not storage.get("folder"):
raise EngineException(
"Packages does not contains artifacts",
http_code=HTTPStatus.BAD_REQUEST,
# to preserve current expected behaviour
if "userDefinedData" in indata:
data = indata.pop("userDefinedData")
- if type(data) == dict:
+ if isinstance(data, dict):
indata["_admin"]["userDefinedData"] = data
else:
raise EngineException(
return indata
def _validate_descriptor_changes(
    self,
    descriptor_id: str,
    descriptor_file_name: str,
    old_descriptor_directory: str,
    new_descriptor_directory: str,
):
    """
    Hook: compare an in-use descriptor against a proposed revision and raise
    when the revision contains disallowed changes.

    Base implementation is a no-op (accepts any change); subclasses override
    it with DeepDiff-based validation.

    :param descriptor_id: internal _id of the descriptor being updated
    :param descriptor_file_name: name of the descriptor file inside the package
    :param old_descriptor_directory: directory of the descriptor in use
    :param new_descriptor_directory: directory of the proposed revision
    :return: None
    """
    # Example:
    # raise EngineException(
    #     "Error in validating new descriptor: <NODE> cannot be modified",
    # )
    pass
+
class VnfdTopic(DescriptorTopic):
topic = "vnfds"
topic_msg = "vnfd"
return
_filter = self._get_project_filter(session)
-
# check vnfrs using this vnfd
_filter["vnfd-id"] = _id
+
if self.db.get_list("vnfrs", _filter):
raise EngineException(
"There is at least one VNF instance using this descriptor",
# check NSD referencing this VNFD
del _filter["vnfd-id"]
_filter["vnfd-id"] = descriptor_id
+
if self.db.get_list("nsds", _filter):
raise EngineException(
"There is at least one NS package referencing this descriptor",
self.validate_internal_virtual_links(indata)
self.validate_monitoring_params(indata)
self.validate_scaling_group_descriptor(indata)
+ self.validate_healing_group_descriptor(indata)
+ self.validate_alarm_group_descriptor(indata)
+ self.validate_storage_compute_descriptor(indata)
+ self.validate_helm_chart(indata)
return indata
@staticmethod
def validate_helm_chart(indata):
    """
    Check every KDU 'helm-chart' value of the VNFD.

    A value is accepted when it matches the allowed chart-name pattern
    (valid_helm_chart_re) or when it is a URL carrying both a scheme and
    a network location.

    :param indata: VNFD descriptor content
    :raises EngineException: 422 when a helm-chart value is neither
    """

    def _looks_like_url(candidate):
        parsed = urlparse(candidate)
        return bool(parsed.scheme) and bool(parsed.netloc)

    for kdu in indata.get("kdu", []):
        chart = kdu.get("helm-chart")
        if not chart:
            # KDU without a helm-chart entry: nothing to validate
            continue
        if valid_helm_chart_re.match(chart) or _looks_like_url(chart):
            continue
        raise EngineException(
            "helm-chart '{}' is not valid".format(chart),
            http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
        )
+
@staticmethod
def validate_mgmt_interface_connection_point(indata):
if not indata.get("vdu"):
return False
elif not storage_params.get("pkg-dir"):
if self.fs.file_exists("{}_".format(storage_params["folder"]), "dir"):
- f = "{}_/{}".format(
- storage_params["folder"], folder
- )
+ f = "{}_/{}".format(storage_params["folder"], folder)
else:
- f = "{}/{}".format(
- storage_params["folder"], folder
- )
+ f = "{}/{}".format(storage_params["folder"], folder)
if file:
return self.fs.file_exists("{}/{}".format(f, file), "file")
else:
@staticmethod
def validate_scaling_group_descriptor(indata):
all_monitoring_params = set()
+ all_vdu_ids = set()
+ for df in get_iterable(indata.get("df")):
+ for il in get_iterable(df.get("instantiation-level")):
+ for vl in get_iterable(il.get("vdu-level")):
+ all_vdu_ids.add(vl.get("vdu-id"))
+
for ivld in get_iterable(indata.get("int-virtual-link-desc")):
for mp in get_iterable(ivld.get("monitoring-parameters")):
all_monitoring_params.add(mp.get("id"))
for mp in get_iterable(df.get("monitoring-parameter")):
all_monitoring_params.add(mp.get("id"))
+ for df in get_iterable(indata.get("df")):
+ for sa in get_iterable(df.get("scaling-aspect")):
+ for deltas in get_iterable(
+ sa.get("aspect-delta-details").get("deltas")
+ ):
+ for vds in get_iterable(deltas.get("vdu-delta")):
+ sa_vdu_id = vds.get("id")
+ if sa_vdu_id and sa_vdu_id not in all_vdu_ids:
+ raise EngineException(
+ "df[id='{}']:scaling-aspect[id='{}']:aspect-delta-details"
+ "[delta='{}']: "
+ "vdu-id='{}' not defined in vdu".format(
+ df["id"],
+ sa["id"],
+ deltas["id"],
+ sa_vdu_id,
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+
for df in get_iterable(indata.get("df")):
for sa in get_iterable(df.get("scaling-aspect")):
for sp in get_iterable(sa.get("scaling-policy")):
http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
)
@staticmethod
def validate_healing_group_descriptor(indata):
    """
    Check that every healing-policy 'vdu-id' of each deployment flavour
    references a VDU declared in some instantiation-level vdu-level.

    :param indata: VNFD descriptor content
    :raises EngineException: 422 when a referenced vdu-id is not defined
    """
    # Collect every vdu-id declared across all flavours' instantiation levels
    defined_vdu_ids = {
        vdu_level.get("vdu-id")
        for df in get_iterable(indata.get("df"))
        for inst_level in get_iterable(df.get("instantiation-level"))
        for vdu_level in get_iterable(inst_level.get("vdu-level"))
    }

    for df in get_iterable(indata.get("df")):
        for aspect in get_iterable(df.get("healing-aspect")):
            for policy in get_iterable(aspect.get("healing-policy")):
                referenced_vdu = policy.get("vdu-id")
                if referenced_vdu and referenced_vdu not in defined_vdu_ids:
                    raise EngineException(
                        "df[id='{}']:healing-aspect[id='{}']:healing-policy"
                        "[name='{}']: "
                        "vdu-id='{}' not defined in vdu".format(
                            df["id"],
                            aspect["id"],
                            policy["event-name"],
                            referenced_vdu,
                        ),
                        http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
                    )
+
@staticmethod
def validate_alarm_group_descriptor(indata):
    """
    Check that each VDU alarm's 'vnf-monitoring-param-ref' points at a
    monitoring parameter declared somewhere in the VNFD (under
    int-virtual-link-desc, a vdu, or a df).

    :param indata: VNFD descriptor content
    :raises EngineException: 422 when the reference is undefined
    """
    # Gather monitoring-parameter ids from the three possible scopes
    known_params = set()
    for ivld in get_iterable(indata.get("int-virtual-link-desc")):
        known_params.update(
            mp.get("id") for mp in get_iterable(ivld.get("monitoring-parameters"))
        )
    for vdu in get_iterable(indata.get("vdu")):
        known_params.update(
            mp.get("id") for mp in get_iterable(vdu.get("monitoring-parameter"))
        )
    for df in get_iterable(indata.get("df")):
        known_params.update(
            mp.get("id") for mp in get_iterable(df.get("monitoring-parameter"))
        )

    for vdu in get_iterable(indata.get("vdu")):
        for alarm in get_iterable(vdu.get("alarm")):
            param_ref = alarm.get("vnf-monitoring-param-ref")
            if param_ref and param_ref not in known_params:
                raise EngineException(
                    "vdu[id='{}']:alarm[id='{}']:"
                    "vnf-monitoring-param-ref='{}' not defined in any monitoring-param".format(
                        vdu["id"],
                        alarm["alarm-id"],
                        param_ref,
                    ),
                    http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
                )
+
@staticmethod
def validate_storage_compute_descriptor(indata):
    """
    Check that every VDU reference to 'virtual-storage-desc' and
    'virtual-compute-desc' matches a descriptor declared in the VNFD.

    :param indata: VNFD descriptor content
    :raises EngineException: 422 when a referenced descriptor is undefined
    """
    all_vsd_ids = {
        vsd.get("id") for vsd in get_iterable(indata.get("virtual-storage-desc"))
    }
    all_vcd_ids = {
        vcd.get("id") for vcd in get_iterable(indata.get("virtual-compute-desc"))
    }

    for vdu in get_iterable(indata.get("vdu")):
        # get_iterable guards a VDU without virtual-storage-desc; the
        # previous code iterated the raw .get() result and raised
        # TypeError when the key was absent.
        for vsd_ref in get_iterable(vdu.get("virtual-storage-desc")):
            if vsd_ref and vsd_ref not in all_vsd_ids:
                raise EngineException(
                    "vdu[virtual-storage-desc='{}'] "
                    "not defined in vnfd".format(vsd_ref),
                    http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
                )

        vcd_ref = vdu.get("virtual-compute-desc")
        if vcd_ref and vcd_ref not in all_vcd_ids:
            raise EngineException(
                "vdu[virtual-compute-desc='{}'] "
                "not defined in vnfd".format(vcd_ref),
                http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
            )
+
def delete_extra(self, session, _id, db_content, not_send_msg=None):
"""
Deletes associate file system storage (via super)
"""
super().delete_extra(session, _id, db_content, not_send_msg)
self.db.del_list("vnfpkgops", {"vnfPkgId": _id})
- self.db.del_list(self.topic+"_revisions", {"_id": {"$regex": _id}})
+ self.db.del_list(self.topic + "_revisions", {"_id": {"$regex": _id}})
def sol005_projection(self, data):
data["onboardingState"] = data["_admin"]["onboardingState"]
Returns:
vnfd (dict): VNFD which does not include policies
"""
- # TODO: Extract the policy related parts from the VNFD
+ for df in vnfd.get("df", {}):
+ for policy in ["scaling-aspect", "healing-aspect"]:
+ if df.get(policy, {}):
+ df.pop(policy)
+ for vdu in vnfd.get("vdu", {}):
+ for alarm_policy in ["alarm", "monitoring-parameter"]:
+ if vdu.get(alarm_policy, {}):
+ vdu.pop(alarm_policy)
return vnfd
@staticmethod
def _validate_descriptor_changes(
self,
+ descriptor_id: str,
descriptor_file_name: str,
old_descriptor_directory: str,
new_descriptor_directory: str,
Args:
old_descriptor_directory (str): Directory of descriptor which is in-use
- new_descriptor_directory (str): Directory of directory which is proposed to update (new revision)
+ new_descriptor_directory (str): Directory of descriptor which is proposed to update (new revision)
Returns:
None
EngineException: In case of error when there are unallowed changes
"""
try:
+ # If VNFD does not exist in DB or it is not in use by any NS,
+ # validation is not required.
+ vnfd = self.db.get_one("vnfds", {"_id": descriptor_id})
+ if not vnfd or not detect_descriptor_usage(vnfd, "vnfds", self.db):
+ return
+
+ # Get the old and new descriptor contents in order to compare them.
with self.fs.file_open(
(old_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
) as old_descriptor_file:
with self.fs.file_open(
- (new_descriptor_directory, descriptor_file_name), "r"
+ (new_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
) as new_descriptor_file:
- old_content = yaml.load(
- old_descriptor_file.read(), Loader=yaml.SafeLoader
- )
- new_content = yaml.load(
- new_descriptor_file.read(), Loader=yaml.SafeLoader
- )
+ old_content = yaml.safe_load(old_descriptor_file.read())
+ new_content = yaml.safe_load(new_descriptor_file.read())
+
+ # If software version has changed, we do not need to validate
+ # the differences anymore.
if old_content and new_content:
if self.find_software_version(
old_content
) != self.find_software_version(new_content):
return
+
disallowed_change = DeepDiff(
self.remove_modifiable_items(old_content),
self.remove_modifiable_items(new_content),
)
+
if disallowed_change:
changed_nodes = functools.reduce(
lambda a, b: a + " , " + b,
).keys()
],
)
+
raise EngineException(
f"Error in validating new descriptor: {changed_nodes} cannot be modified, "
"there are disallowed changes in the vnf descriptor.",
topic_msg = "nsd"
def __init__(self, db, fs, msg, auth):
- DescriptorTopic.__init__(self, db, fs, msg, auth)
+ super().__init__(db, fs, msg, auth)
def pyangbind_validation(self, item, data, force=False):
if self._descriptor_data_is_in_old_format(data):
# TODO validate that if it contains cloud-init-file or charms, artifacts _admin.storage."pkg-dir" is not none
for vld in get_iterable(indata.get("virtual-link-desc")):
self.validate_vld_mgmt_network_with_virtual_link_protocol_data(vld, indata)
+ for fg in get_iterable(indata.get("vnffgd")):
+ self.validate_vnffgd_data(fg, indata)
self.validate_vnf_profiles_vnfd_id(indata)
http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
)
@staticmethod
def validate_vnffgd_data(fg, indata):
    """
    Validate one VNF Forwarding Graph descriptor of the NSD:
    - every 'nfp-position-element-id' referenced by an nfpd position must
      match a declared nfp-position-element of the graph
    - every 'constituent-base-element-id' must match a vnf-profile-id

    :param fg: one vnffgd entry of the NSD
    :param indata: full NSD content (kept for interface compatibility)
    :raises EngineException: 422 on a dangling reference
    """
    position_ids = [
        fgposition["id"]
        for fgposition in get_iterable(fg.get("nfp-position-element"))
    ]
    all_vnf_ids = set(get_iterable(fg.get("vnf-profile-id")))

    for nfpd in get_iterable(fg.get("nfpd")):
        for position in get_iterable(nfpd.get("position-desc-id")):
            # The previous code compared the `position` dict against the
            # literal key name (always False, dead branch) and indexed
            # element [0] unguarded, crashing when 'nfp-position-element-id'
            # was missing or empty. Validate every referenced id instead.
            for position_element_id in get_iterable(
                position.get("nfp-position-element-id")
            ):
                if position_element_id not in position_ids:
                    raise EngineException(
                        "Error at vnffgd nfpd[id='{}']:nfp-position-element-id='{}' "
                        "does not match any nfp-position-element".format(
                            nfpd["id"], position_element_id
                        ),
                        http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
                    )

            for cp in get_iterable(position.get("cp-profile-id")):
                for cpe in get_iterable(cp.get("constituent-profile-elements")):
                    constituent_base_element_id = cpe.get(
                        "constituent-base-element-id"
                    )
                    if (
                        constituent_base_element_id
                        and constituent_base_element_id not in all_vnf_ids
                    ):
                        raise EngineException(
                            "Error at vnffgd constituent_profile[id='{}']:vnfd-id='{}' "
                            "does not match any constituent-base-element-id".format(
                                cpe["id"], constituent_base_element_id
                            ),
                            http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
                        )
+
@staticmethod
def validate_vnf_profiles_vnfd_id(indata):
all_vnfd_ids = set(get_iterable(indata.get("vnfd-id")))
# to preserve current expected behaviour
if "userDefinedData" in indata:
data = indata.pop("userDefinedData")
- if type(data) == dict:
+ if isinstance(data, dict):
indata["_admin"]["userDefinedData"] = data
else:
raise EngineException(
:raises: FsException in case of error while deleting associated storage
"""
super().delete_extra(session, _id, db_content, not_send_msg)
- self.db.del_list(self.topic+"_revisions", { "_id": { "$regex": _id}})
+ self.db.del_list(self.topic + "_revisions", {"_id": {"$regex": _id}})
@staticmethod
def extract_day12_primitives(nsd: dict) -> dict:
def _validate_descriptor_changes(
self,
+ descriptor_id: str,
descriptor_file_name: str,
old_descriptor_directory: str,
new_descriptor_directory: str,
Args:
old_descriptor_directory: Directory of descriptor which is in-use
- new_descriptor_directory: Directory of directory which is proposed to update (new revision)
+ new_descriptor_directory: Directory of descriptor which is proposed to update (new revision)
Returns:
None
"""
try:
+ # If NSD does not exist in DB, or it is not in use by any NS,
+ # validation is not required.
+ nsd = self.db.get_one("nsds", {"_id": descriptor_id}, fail_on_empty=False)
+ if not nsd or not detect_descriptor_usage(nsd, "nsds", self.db):
+ return
+
+ # Get the old and new descriptor contents in order to compare them.
with self.fs.file_open(
- (old_descriptor_directory, descriptor_file_name), "r"
+ (old_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
) as old_descriptor_file:
with self.fs.file_open(
(new_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
) as new_descriptor_file:
- old_content = yaml.load(
- old_descriptor_file.read(), Loader=yaml.SafeLoader
- )
- new_content = yaml.load(
- new_descriptor_file.read(), Loader=yaml.SafeLoader
- )
+ old_content = yaml.safe_load(old_descriptor_file.read())
+ new_content = yaml.safe_load(new_descriptor_file.read())
+
if old_content and new_content:
disallowed_change = DeepDiff(
self.remove_modifiable_items(old_content),
self.remove_modifiable_items(new_content),
)
+
if disallowed_change:
changed_nodes = functools.reduce(
lambda a, b: a + ", " + b,
).keys()
],
)
+
raise EngineException(
f"Error in validating new descriptor: {changed_nodes} cannot be modified, "
"there are disallowed changes in the ns descriptor. ",
rollback.append({"topic": self.topic, "_id": vnfpkgop_id})
self.msg.write(self.topic_msg, operation, vnfpkgop_desc)
return vnfpkgop_id, None
+
+
class NsConfigTemplateTopic(DescriptorTopic):
    """Topic handling NS configuration templates (stored instantiation parameters)."""

    topic = "ns_config_template"
    topic_msg = "nsd"
    schema_new = ns_config_template
    # Schema used to validate each section of the template "config" content
    instantiation_params = {
        "vnf": vnf_schema,
        "vld": vld_schema,
        "additionalParamsForVnf": additional_params_for_vnf,
    }

    def __init__(self, db, fs, msg, auth):
        super().__init__(db, fs, msg, auth)

    def check_conflict_on_del(self, session, _id, db_content):
        """
        Check that there is not any NSR that uses this NS CONFIG TEMPLATE.
        Only NSRs belonging to this project are considered.

        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: ns config template internal id
        :param db_content: The database content of the _id
        :return: None or raises EngineException with the conflict
        """
        if session["force"]:
            return
        if not db_content.get("nsdId"):
            # empty template: nsd not uploaded yet, nothing can reference it
            return

        # check NS CONFIG TEMPLATE used by any NS of the system
        if self.db.get_list(
            "nsrs", {"instantiate_params.nsConfigTemplateId": _id}
        ):
            raise EngineException(
                "There is at least one NS instance using this template",
                http_code=HTTPStatus.CONFLICT,
            )

    def check_unique_template_name(self, edit_content, _id, session):
        """
        Check whether the (new) name of the template is unique.

        Editing a template keeping its own name is allowed; reusing the name
        of another template raises EngineException.
        """
        name = edit_content.get("name")
        if not name:
            return
        existing = self.db.get_one(
            "ns_config_template", {"name": name}, fail_on_empty=False
        )
        if existing is None:
            return
        if existing.get("_id") != _id:
            raise EngineException("{} of the template already exist".format(name))

    def check_conflict_on_edit(self, session, final_content, edit_content, _id):
        """
        Validate the edited template content: schema of partially-filled
        templates, name uniqueness, immutability of nsdId and the per-section
        instantiation parameters under "config".

        :raises EngineException: 422 wrapping any validation failure
        """
        final_content = super().check_conflict_on_edit(
            session, final_content, edit_content, _id
        )
        # Guard against a missing record: the previous code called .get() on
        # a possible None result and raised AttributeError.
        stored_template = self.db.get_one(
            "ns_config_template", {"_id": _id}, fail_on_empty=False
        )
        # While the template is not yet fully filled (name, nsdId and config
        # all present), validate the edit against the full creation schema.
        if not (
            stored_template
            and stored_template.get("name")
            and stored_template.get("nsdId")
            and stored_template.get("config")
        ):
            validate_input(edit_content, self.schema_new)

        try:
            for key in edit_content:
                if key == "name":
                    self.check_unique_template_name(edit_content, _id, session)
                elif key == "nsdId":
                    # nsdId can be set once but never modified afterwards
                    if stored_template and stored_template.get("nsdId"):
                        raise EngineException("Nsd id cannot be edited")
                elif key == "config":
                    # Validate each config section against its own schema;
                    # distinct loop names avoid shadowing the outer `key`.
                    for param, param_content in edit_content["config"].items():
                        validate_input(param_content, self.instantiation_params[param])
            return final_content
        except Exception as e:
            raise EngineException(
                "Error in instantiation parameters validation: {}".format(str(e)),
                http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
            )