for k in ("_id", "_admin"):
if k in final_content:
internal_keys[k] = final_content.pop(k)
- serialized = self._validate_input_new(final_content, force)
+ storage_params = internal_keys["_admin"].get("storage")
+ serialized = self._validate_input_new(final_content, storage_params, force)
# 1.2. modify final_content with a serialized version
final_content.clear()
final_content.update(serialized)
if self.db.get_list("nsds", _filter):
raise EngineException("There is soame NSD that depends on this VNFD", http_code=HTTPStatus.CONFLICT)
- def _validate_input_new(self, indata, force=False):
+ def _validate_input_new(self, indata, storage_params, force=False):
indata = self.pyangbind_validation("vnfds", indata, force)
# Cross references validation in the descriptor
if indata.get("vdu"):
.format(vdu["id"], interface["name"],
interface["internal-connection-point-ref"]),
http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ # Validate that if descriptor contains charms, artifacts _admin.storage."pkg-dir" is not none
+ if vdu.get("vdu-configuration"):
+ if vdu["vdu-configuration"].get("juju"):
+ if not self._validate_package_folders(storage_params, 'charms'):
+ raise EngineException("Charm defined in vnf[id={}]:vdu[id={}] but not present in "
+ "package".format(indata["id"], vdu["id"]))
+ # Validate that if descriptor contains cloud-init, artifacts _admin.storage."pkg-dir" is not none
+ if vdu.get("cloud-init-file"):
+ if not self._validate_package_folders(storage_params, 'cloud_init', vdu["cloud-init-file"]):
+ raise EngineException("Cloud-init defined in vnf[id={}]:vdu[id={}] but not present in "
+ "package".format(indata["id"], vdu["id"]))
+ # Validate that if descriptor contains charms, artifacts _admin.storage."pkg-dir" is not none
+ if indata.get("vnf-configuration"):
+ if indata["vnf-configuration"].get("juju"):
+ if not self._validate_package_folders(storage_params, 'charms'):
+ raise EngineException("Charm defined in vnf[id={}] but not present in "
+ "package".format(indata["id"]))
for ivld in get_iterable(indata.get("internal-vld")):
for icp in get_iterable(ivld.get("internal-connection-point")):
icp_mark = False
"vnf-configuration:config-primitive:name"
.format(sgd["name"], sca["vnf-config-primitive-name-ref"]),
http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- # TODO validata that if contains cloud-init-file or charms, have artifacts _admin.storage."pkg-dir" is not none
return indata
def _validate_input_edit(self, indata, force=False):
# not needed to validate with pyangbind becuase it will be validated at check_conflict_on_edit
return indata
+ def _validate_package_folders(self, storage_params, folder, file=None):
+ if not storage_params or not storage_params.get("pkg-dir"):
+ return False
+ else:
+ if self.fs.file_exists("{}_".format(storage_params["folder"]), 'dir'):
+ f = "{}_/{}/{}".format(storage_params["folder"], storage_params["pkg-dir"], folder)
+ else:
+ f = "{}/{}/{}".format(storage_params["folder"], storage_params["pkg-dir"], folder)
+ if file:
+ return self.fs.file_exists("{}/{}".format(f, file), 'file')
+ else:
+ if self.fs.file_exists(f, 'dir'):
+ if self.fs.dir_ls(f):
+ return True
+ return False
+
class NsdTopic(DescriptorTopic):
topic = "nsds"
clean_indata = clean_indata['nsd:nsd'][0]
return clean_indata
- def _validate_input_new(self, indata, force=False):
+ def _validate_input_new(self, indata, storage_params, force=False):
indata = self.pyangbind_validation("nsds", indata, force)
# Cross references validation in the descriptor
# TODO validata that if contains cloud-init-file or charms, have artifacts _admin.storage."pkg-dir" is not none
# TODO validate with pyangbind, serialize
return indata
- def _validate_input_new(self, indata, force=False):
+ def _validate_input_new(self, indata, storage_params, force=False):
indata = self.pyangbind_validation("nsts", indata, force)
return indata.copy()