class DescriptorTopic(BaseTopic):
def __init__(self, db, fs, msg, auth):
-
BaseTopic.__init__(self, db, fs, msg, auth)
+ def _validate_input_new(self, indata, storage_params, force=False):
+ return indata
+
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
final_content = super().check_conflict_on_edit(
session, final_content, edit_content, _id
if self.db.get_one(self.topic, _filter, fail_on_empty=False):
raise EngineException(
"{} with id '{}' already exists for this project".format(
- self.topic[:-1], final_content["id"]
+ (str(self.topic))[:-1], final_content["id"]
),
HTTPStatus.CONFLICT,
)
self.fs.file_delete(_id + ":" + str(revision), ignore_non_exist=True)
revision = revision - 1
-
@staticmethod
def get_one_by_id(db, session, topic, id):
# find owned by this project
# Avoid override in this case as the target is userDefinedData, but not vnfd,nsd descriptors
# indata = DescriptorTopic._validate_input_new(self, indata, project_id=session["force"])
- content = {"_admin": {
- "userDefinedData": indata,
- "revision": 0
- }}
+ content = {"_admin": {"userDefinedData": indata, "revision": 0}}
self.format_on_new(
content, session["project_id"], make_public=session["public"]
or "application/x-gzip" in content_type
):
compressed = "gzip"
- if (
- content_type
- and "application/zip" in content_type
- ):
+ if content_type and "application/zip" in content_type:
compressed = "zip"
filename = headers.get("Content-Filename")
if not filename and compressed:
)
if (
- (
- zipfilename.endswith(".yaml")
- or zipfilename.endswith(".json")
- or zipfilename.endswith(".yml")
- ) and (
- zipfilename.find("/") < 0
- or zipfilename.find("Definitions") >= 0
- )
+ zipfilename.endswith(".yaml")
+ or zipfilename.endswith(".json")
+ or zipfilename.endswith(".yml")
+ ) and (
+ zipfilename.find("/") < 0
+ or zipfilename.find("Definitions") >= 0
):
storage["pkg-dir"] = ""
if descriptor_file_name:
indata = json.load(content)
else:
error_text = "Invalid yaml format "
- indata = yaml.load(content, Loader=yaml.SafeLoader)
+ indata = yaml.safe_load(content)
# Need to close the file package here so it can be copied from the
# revision to the current, unrevisioned record
proposed_revision_path,
)
except Exception as e:
- shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
- shutil.rmtree(self.fs.path + proposed_revision_path, ignore_errors=True)
+ shutil.rmtree(
+ self.fs.path + current_revision_path, ignore_errors=True
+ )
+ shutil.rmtree(
+ self.fs.path + proposed_revision_path, ignore_errors=True
+ )
# Only delete the new revision. We need to keep the original version in place
# as it has not been changed.
self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
raise e
-
indata = self._remove_envelop(indata)
# Override descriptor with query string kwargs
# Copy the revision to the active package name by its original id
shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
- os.rename(self.fs.path + proposed_revision_path, self.fs.path + current_revision_path)
+ os.rename(
+ self.fs.path + proposed_revision_path,
+ self.fs.path + current_revision_path,
+ )
self.fs.file_delete(current_revision_path, ignore_non_exist=True)
self.fs.mkdir(current_revision_path)
self.fs.reverse_sync(from_path=current_revision_path)
descriptor_id,
descriptor_file_name,
old_descriptor_directory,
- new_descriptor_directory
+ new_descriptor_directory,
):
# Example:
# raise EngineException(
# )
pass
+
class VnfdTopic(DescriptorTopic):
topic = "vnfds"
topic_msg = "vnfd"
return False
elif not storage_params.get("pkg-dir"):
if self.fs.file_exists("{}_".format(storage_params["folder"]), "dir"):
- f = "{}_/{}".format(
- storage_params["folder"], folder
- )
+ f = "{}_/{}".format(storage_params["folder"], folder)
else:
- f = "{}/{}".format(
- storage_params["folder"], folder
- )
+ f = "{}/{}".format(storage_params["folder"], folder)
if file:
return self.fs.file_exists("{}/{}".format(f, file), "file")
else:
"""
super().delete_extra(session, _id, db_content, not_send_msg)
self.db.del_list("vnfpkgops", {"vnfPkgId": _id})
- self.db.del_list(self.topic+"_revisions", {"_id": {"$regex": _id}})
+ self.db.del_list(self.topic + "_revisions", {"_id": {"$regex": _id}})
def sol005_projection(self, data):
data["onboardingState"] = data["_admin"]["onboardingState"]
"""
for df in vnfd.get("df", {}):
for policy in ["scaling-aspect", "healing-aspect"]:
- if (df.get(policy, {})):
+ if df.get(policy, {}):
df.pop(policy)
for vdu in vnfd.get("vdu", {}):
for alarm_policy in ["alarm", "monitoring-parameter"]:
- if (vdu.get(alarm_policy, {})):
+ if vdu.get(alarm_policy, {}):
vdu.pop(alarm_policy)
return vnfd
with self.fs.file_open(
(old_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
) as old_descriptor_file:
-
with self.fs.file_open(
(new_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
) as new_descriptor_file:
-
old_content = yaml.safe_load(old_descriptor_file.read())
new_content = yaml.safe_load(new_descriptor_file.read())
:raises: FsException in case of error while deleting associated storage
"""
super().delete_extra(session, _id, db_content, not_send_msg)
- self.db.del_list(self.topic+"_revisions", { "_id": { "$regex": _id}})
+ self.db.del_list(self.topic + "_revisions", {"_id": {"$regex": _id}})
@staticmethod
def extract_day12_primitives(nsd: dict) -> dict:
with self.fs.file_open(
(old_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
) as old_descriptor_file:
-
with self.fs.file_open(
(new_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
) as new_descriptor_file:
-
old_content = yaml.safe_load(old_descriptor_file.read())
new_content = yaml.safe_load(new_descriptor_file.read())