# no yes -> error
# onefile yes no -> zip
# X yes -> text
-
- if accept_text and (not storage.get('pkg-dir') or path == "$DESCRIPTOR"):
+ contain_many_files = False
+ if storage.get('pkg-dir'):
+ # check if there are more than one file in the package, ignoring checksums.txt.
+ pkg_files = self.fs.dir_ls((storage['folder'], storage['pkg-dir']))
+ if len(pkg_files) >= 3 or (len(pkg_files) == 2 and 'checksums.txt' not in pkg_files):
+ contain_many_files = True
+ if accept_text and (not contain_many_files or path == "$DESCRIPTOR"):
return self.fs.file_open((storage['folder'], storage['descriptor']), "r"), "text/plain"
- elif storage.get('pkg-dir') and not accept_zip:
+ elif contain_many_files and not accept_zip:
raise EngineException("Packages that contains several files need to be retrieved with 'application/zip'"
"Accept header", http_code=HTTPStatus.NOT_ACCEPTABLE)
else:
http_code=HTTPStatus.CONFLICT)
def _validate_input_new(self, indata, storage_params, force=False):
+ indata.pop("onboardingState", None)
+ indata.pop("operationalState", None)
+ indata.pop("usageState", None)
+
+ indata.pop("links", None)
+
indata = self.pyangbind_validation("vnfds", indata, force)
# Cross references validation in the descriptor
if indata.get("vdu"):
"""
super().delete_extra(session, _id, db_content, not_send_msg)
self.db.del_list("vnfpkgops", {"vnfPkgId": _id})
+
+ def sol005_projection(self, data):
+ data["onboardingState"] = data["_admin"]["onboardingState"]
+ data["operationalState"] = data["_admin"]["operationalState"]
+ data["usageState"] = data["_admin"]["usageState"]
+
+ links = {}
+ links["self"] = {"href": "/vnfpkgm/v1/vnf_packages/{}".format(data["_id"])}
+ links["vnfd"] = {"href": "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(data["_id"])}
+ links["packageContent"] = {"href": "/vnfpkgm/v1/vnf_packages/{}/package_content".format(data["_id"])}
+ data["_links"] = links
+
+ return super().sol005_projection(data)
class NsdTopic(DescriptorTopic):
return clean_indata
def _validate_input_new(self, indata, storage_params, force=False):
+ indata.pop("nsdOnboardingState", None)
+ indata.pop("nsdOperationalState", None)
+ indata.pop("nsdUsageState", None)
+
+ indata.pop("links", None)
+
indata = self.pyangbind_validation("nsds", indata, force)
# Cross references validation in the descriptor
# TODO validata that if contains cloud-init-file or charms, have artifacts _admin.storage."pkg-dir" is not none
if self.db.get_list("nsts", _filter):
raise EngineException("There is at least one NetSlice Template referencing this descriptor",
http_code=HTTPStatus.CONFLICT)
+
+ def sol005_projection(self, data):
+ data["nsdOnboardingState"] = data["_admin"]["onboardingState"]
+ data["nsdOperationalState"] = data["_admin"]["operationalState"]
+ data["nsdUsageState"] = data["_admin"]["usageState"]
+
+ links = {}
+ links["self"] = {"href": "/nsd/v1/ns_descriptors/{}".format(data["_id"])}
+ links["nsd_content"] = {"href": "/nsd/v1/ns_descriptors/{}/nsd_content".format(data["_id"])}
+ data["_links"] = links
+
+ return super().sol005_projection(data)
class NstTopic(DescriptorTopic):
return clean_indata
def _validate_input_new(self, indata, storage_params, force=False):
+ indata.pop("onboardingState", None)
+ indata.pop("operationalState", None)
+ indata.pop("usageState", None)
indata = self.pyangbind_validation("nsts", indata, force)
return indata.copy()
raise EngineException("there is at least one Netslice Instance using this descriptor",
http_code=HTTPStatus.CONFLICT)
+ def sol005_projection(self, data):
+ data["onboardingState"] = data["_admin"]["onboardingState"]
+ data["operationalState"] = data["_admin"]["operationalState"]
+ data["usageState"] = data["_admin"]["usageState"]
+
+ links = {}
+ links["self"] = {"href": "/nst/v1/netslice_templates/{}".format(data["_id"])}
+ links["nst"] = {"href": "/nst/v1/netslice_templates/{}/nst".format(data["_id"])}
+ data["_links"] = links
+
+ return super().sol005_projection(data)
+
class PduTopic(BaseTopic):
topic = "pdus"