X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Fdescriptor_topics.py;fp=osm_nbi%2Fdescriptor_topics.py;h=deae786f14ffdee736dd91fe792fa4eb4af29b91;hp=8efde1e1cff3bdeb0a909c2c4ffc1cb545f1386e;hb=c26740a54b2b1acf5663aab0b2c4753b9949504c;hpb=e395aa49c788ab9c776d16b49b75cf80a54af532 diff --git a/osm_nbi/descriptor_topics.py b/osm_nbi/descriptor_topics.py index 8efde1e..deae786 100644 --- a/osm_nbi/descriptor_topics.py +++ b/osm_nbi/descriptor_topics.py @@ -16,7 +16,6 @@ import tarfile import yaml import json -import importlib import copy # import logging @@ -26,6 +25,7 @@ from http import HTTPStatus from time import time from uuid import uuid4 from re import fullmatch +from zipfile import ZipFile from osm_nbi.validation import ( ValidationError, pdu_new_schema, @@ -232,12 +232,19 @@ class DescriptorTopic(BaseTopic): content_type and "application/gzip" in content_type or "application/x-gzip" in content_type - or "application/zip" in content_type ): compressed = "gzip" + if ( + content_type + and "application/zip" in content_type + ): + compressed = "zip" filename = headers.get("Content-Filename") - if not filename: - filename = "package.tar.gz" if compressed else "package" + if not filename and compressed: + filename = "package.tar.gz" if compressed == "gzip" else "package.zip" + elif not filename: + filename = "package" + # TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266 file_pkg = None error_text = "" @@ -353,6 +360,47 @@ class DescriptorTopic(BaseTopic): storage["descriptor"] = descriptor_file_name storage["zipfile"] = filename self.fs.file_extract(tar, temp_folder) + with self.fs.file_open( + (temp_folder, descriptor_file_name), "r" + ) as descriptor_file: + content = descriptor_file.read() + elif compressed == "zip": + zipfile = ZipFile(file_pkg) + descriptor_file_name = None + for package_file in zipfile.infolist(): + zipfilename = package_file.filename + file_path = zipfilename.split("/") + if ( + not file_path[0] or ".." in zipfilename + ): # if start with "/" means absolute path + raise EngineException( + "Absolute path or '..' are not allowed for package descriptor zip" + ) + + if ( + ( + zipfilename.endswith(".yaml") + or zipfilename.endswith(".json") + or zipfilename.endswith(".yml") + ) and ( + zipfilename.find("/") < 0 + or zipfilename.find("Definitions") >= 0 + ) + ): + storage["pkg-dir"] = "" + if descriptor_file_name: + raise EngineException( + "Found more than one descriptor file at package descriptor zip" + ) + descriptor_file_name = zipfilename + if not descriptor_file_name: + raise EngineException( + "Not found any descriptor file at package descriptor zip" + ) + storage["descriptor"] = descriptor_file_name + storage["zipfile"] = filename + self.fs.file_extract(zipfile, temp_folder) + with self.fs.file_open( (temp_folder, descriptor_file_name), "r" ) as descriptor_file: @@ -791,6 +839,8 @@ class VnfdTopic(DescriptorTopic): ): if not self._validate_package_folders( storage_params, "charms" + ) and not self._validate_package_folders( + storage_params, "Scripts/charms" ): raise EngineException( "Charm defined in vnf[id={}] but not present in " @@ -802,6 +852,8 @@ class VnfdTopic(DescriptorTopic): return if not self._validate_package_folders( storage_params, "cloud_init", vdu["cloud-init-file"] + ) and not self._validate_package_folders( + storage_params, "Scripts/cloud_init", vdu["cloud-init-file"] ): raise EngineException( "Cloud-init defined in vnf[id={}]:vdu[id={}] but not present in " @@ -826,14 +878,35 @@ class VnfdTopic(DescriptorTopic): day_1_2_config.get("execution-environment-list", []), lambda ee: "juju" in ee, ): - if not self._validate_package_folders(storage_params, "charms"): + if not self._validate_package_folders( + storage_params, "charms" + ) and not self._validate_package_folders( + storage_params, "Scripts/charms" + ): raise EngineException( "Charm defined in vnf[id={}] but not present in " "package".format(indata["id"]) ) def _validate_package_folders(self, storage_params, folder, file=None): - if not storage_params or not storage_params.get("pkg-dir"): + if not storage_params: + return False + elif not storage_params.get("pkg-dir"): + if self.fs.file_exists("{}_".format(storage_params["folder"]), "dir"): + f = "{}_/{}".format( + storage_params["folder"], folder + ) + else: + f = "{}/{}".format( + storage_params["folder"], folder + ) + if file: + return self.fs.file_exists("{}/{}".format(f, file), "file") + else: + f = f+"/" + if self.fs.file_exists(f, "dir"): + if self.fs.dir_ls(f): + return True return False else: if self.fs.file_exists("{}_".format(storage_params["folder"]), "dir"):