+
+
+class NsConfigTemplateTopic(DescriptorTopic):
+ topic = "ns_config_template"
+ topic_msg = "nsd"
+ schema_new = ns_config_template
+ instantiation_params = {
+ "vnf": vnf_schema,
+ "vld": vld_schema,
+ "additionalParamsForVnf": additional_params_for_vnf,
+ }
+
+ def __init__(self, db, fs, msg, auth):
+ super().__init__(db, fs, msg, auth)
+
+ def check_conflict_on_del(self, session, _id, db_content):
+ """
+ Check that there is not any NSR that uses this NS CONFIG TEMPLATE. Only NSRs belonging to this project are considered.
+ :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+ :param _id: ns config template internal id
+ :param db_content: The database content of the _id
+ :return: None or raises EngineException with the conflict
+ """
+ if session["force"]:
+ return
+ descriptor = db_content
+ descriptor_id = descriptor.get("nsdId")
+ if not descriptor_id: # empty nsd not uploaded
+ return
+
+ # check NS CONFIG TEMPLATE used by NS
+ ns_config_template_id = _id
+
+ if self.db.get_list(
+ "nsrs", {"instantiate_params.nsConfigTemplateId": ns_config_template_id}
+ ):
+ raise EngineException(
+ "There is at least one NS instance using this template",
+ http_code=HTTPStatus.CONFLICT,
+ )
+
+ def check_unique_template_name(self, edit_content, _id, session):
+ """
+ Check whether the name of the template is unique or not
+ """
+
+ if edit_content.get("name"):
+ name = edit_content.get("name")
+ db_content = self.db.get_one(
+ "ns_config_template", {"name": name}, fail_on_empty=False
+ )
+ if db_content is not None:
+ if db_content.get("_id") == _id:
+ if db_content.get("name") == name:
+ return
+ elif db_content.get("_id") != _id:
+ raise EngineException(
+ "{} of the template already exist".format(name)
+ )
+ else:
+ return
+
+ def check_conflict_on_edit(self, session, final_content, edit_content, _id):
+ """
+ Check the input data format
+ And the edit content data too.
+ """
+ final_content = super().check_conflict_on_edit(
+ session, final_content, edit_content, _id
+ )
+ db_content_id = self.db.get_one(
+ "ns_config_template", {"_id": _id}, fail_on_empty=False
+ )
+ if not (
+ db_content_id.get("name")
+ and db_content_id.get("nsdId")
+ and db_content_id.get("config")
+ ):
+ validate_input(edit_content, self.schema_new)
+
+ try:
+ for key, value in edit_content.items():
+ if key == "name":
+ self.check_unique_template_name(edit_content, _id, session)
+ elif key == "nsdId":
+ ns_config_template = self.db.get_one(
+ "ns_config_template", {"_id": _id}, fail_on_empty=False
+ )
+ if not ns_config_template.get("nsdId"):
+ pass
+ else:
+ raise EngineException("Nsd id cannot be edited")
+ elif key == "config":
+ edit_content_param = edit_content.get("config")
+ for key, value in edit_content_param.items():
+ param = key
+ param_content = value
+ validate_input(param_content, self.instantiation_params[param])
+ return final_content
+ except Exception as e:
+ raise EngineException(
+ "Error in instantiation parameters validation: {}".format(str(e)),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )