import yaml
import json
import copy
+import os
+import shutil
# import logging
from hashlib import md5
class DescriptorTopic(BaseTopic):
def __init__(self, db, fs, msg, auth):
+
BaseTopic.__init__(self, db, fs, msg, auth)
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
"""
self.fs.file_delete(_id, ignore_non_exist=True)
self.fs.file_delete(_id + "_", ignore_non_exist=True) # remove temp folder
+ # Remove file revisions
+ if "revision" in db_content["_admin"]:
+ revision = db_content["_admin"]["revision"]
+ while revision > 0:
+ self.fs.file_delete(_id + ":" + str(revision), ignore_non_exist=True)
+ revision = revision - 1
+
@staticmethod
def get_one_by_id(db, session, topic, id):
# Avoid override in this case as the target is userDefinedData, but not vnfd,nsd descriptors
# indata = DescriptorTopic._validate_input_new(self, indata, project_id=session["force"])
- content = {"_admin": {"userDefinedData": indata}}
+ content = {"_admin": {
+ "userDefinedData": indata,
+ "revision": 0
+ }}
+
self.format_on_new(
content, session["project_id"], make_public=session["public"]
)
elif not filename:
filename = "package"
+ revision = 1
+ if "revision" in current_desc["_admin"]:
+ revision = current_desc["_admin"]["revision"] + 1
+
# TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266
file_pkg = None
error_text = ""
total = int(content_range[3])
else:
start = 0
- temp_folder = (
- _id + "_"
+ # Rather than using a temp folder, we will store the package in a folder based on
+ # the current revision.
+ proposed_revision_path = (
+ _id + ":" + str(revision)
) # all the content is upload here and if ok, it is rename from id_ to is folder
if start:
- if not self.fs.file_exists(temp_folder, "dir"):
+ if not self.fs.file_exists(proposed_revision_path, "dir"):
raise EngineException(
"invalid Transaction-Id header", HTTPStatus.NOT_FOUND
)
else:
- self.fs.file_delete(temp_folder, ignore_non_exist=True)
- self.fs.mkdir(temp_folder)
+ self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
+ self.fs.mkdir(proposed_revision_path)
storage = self.fs.get_params()
storage["folder"] = _id
- file_path = (temp_folder, filename)
+ file_path = (proposed_revision_path, filename)
if self.fs.file_exists(file_path, "file"):
file_size = self.fs.file_size(file_path)
else:
)
storage["descriptor"] = descriptor_file_name
storage["zipfile"] = filename
- self.fs.file_extract(tar, temp_folder)
+ self.fs.file_extract(tar, proposed_revision_path)
with self.fs.file_open(
- (temp_folder, descriptor_file_name), "r"
+ (proposed_revision_path, descriptor_file_name), "r"
) as descriptor_file:
content = descriptor_file.read()
elif compressed == "zip":
)
storage["descriptor"] = descriptor_file_name
storage["zipfile"] = filename
- self.fs.file_extract(zipfile, temp_folder)
+ self.fs.file_extract(zipfile, proposed_revision_path)
with self.fs.file_open(
- (temp_folder, descriptor_file_name), "r"
+ (proposed_revision_path, descriptor_file_name), "r"
) as descriptor_file:
content = descriptor_file.read()
else:
error_text = "Invalid yaml format "
indata = yaml.load(content, Loader=yaml.SafeLoader)
+ # Need to close the file package here so it can be copied from the
+ # revision to the current, unrevisioned record
+ if file_pkg:
+ file_pkg.close()
+ file_pkg = None
+
+ # Fetch both the incoming, proposed revision and the original revision so we
+ # can call a validate method to compare them
+ current_revision_path = _id + "/"
+ self.fs.sync(from_path=current_revision_path)
+ self.fs.sync(from_path=proposed_revision_path)
+
+ if revision > 1:
+ try:
+ self._validate_descriptor_changes(
+ descriptor_file_name,
+ current_revision_path,
+ proposed_revision_path)
+ except Exception as e:
+ shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
+ shutil.rmtree(self.fs.path + proposed_revision_path, ignore_errors=True)
+ # Only delete the new revision. We need to keep the original version in place
+ # as it has not been changed.
+ self.fs.file_delete(proposed_revision_path, ignore_non_exist=True)
+ raise e
+
+ # Copy the revision to the active package name by its original id
+ shutil.rmtree(self.fs.path + current_revision_path, ignore_errors=True)
+ os.rename(self.fs.path + proposed_revision_path, self.fs.path + current_revision_path)
+ self.fs.file_delete(current_revision_path, ignore_non_exist=True)
+ self.fs.mkdir(current_revision_path)
+ self.fs.reverse_sync(from_path=current_revision_path)
+ shutil.rmtree(self.fs.path + _id)
+
current_desc["_admin"]["storage"] = storage
current_desc["_admin"]["onboardingState"] = "ONBOARDED"
current_desc["_admin"]["operationalState"] = "ENABLED"
session, current_desc, indata, _id=_id
)
current_desc["_admin"]["modified"] = time()
+ current_desc["_admin"]["revision"] = revision
self.db.replace(self.topic, _id, current_desc)
- self.fs.dir_rename(temp_folder, _id)
+
+ # Store a copy of the package as a point in time revision
+ revision_desc = dict(current_desc)
+ revision_desc["_id"] = _id + ":" + str(revision_desc["_admin"]["revision"])
+ self.db.create(self.topic + "_revisions", revision_desc)
indata["_id"] = _id
self._send_msg("edited", indata)
return indata
+ def _validate_descriptor_changes(self,
+ descriptor_file_name,
+ old_descriptor_directory,
+ new_descriptor_directory):
+ # Todo: compare changes and throw a meaningful exception for the user to understand
+ # Example:
+ # raise EngineException(
+ # "Error in validating new descriptor: <NODE> cannot be modified",
+ # http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ # )
+ pass
class VnfdTopic(DescriptorTopic):
topic = "vnfds"
"""
super().delete_extra(session, _id, db_content, not_send_msg)
self.db.del_list("vnfpkgops", {"vnfPkgId": _id})
+ self.db.del_list(self.topic+"_revisions", {"_id": {"$regex": _id}})
def sol005_projection(self, data):
data["onboardingState"] = data["_admin"]["onboardingState"]
http_code=HTTPStatus.CONFLICT,
)
+ def delete_extra(self, session, _id, db_content, not_send_msg=None):
+ """
+ Deletes associate file system storage (via super)
+ Deletes associated vnfpkgops from database.
+ :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
+ :param _id: server internal id
+ :param db_content: The database content of the descriptor
+ :return: None
+ :raises: FsException in case of error while deleting associated storage
+ """
+ super().delete_extra(session, _id, db_content, not_send_msg)
+ self.db.del_list(self.topic+"_revisions", { "_id": { "$regex": _id}})
+
def sol005_projection(self, data):
data["nsdOnboardingState"] = data["_admin"]["onboardingState"]
data["nsdOperationalState"] = data["_admin"]["operationalState"]
)
if vnfd_id not in needed_vnfds:
vnfd = self._get_vnfd_from_db(vnfd_id, session)
+ if "revision" in vnfd["_admin"]:
+ vnfd["revision"] = vnfd["_admin"]["revision"]
+ vnfd.pop("_admin")
needed_vnfds[vnfd_id] = vnfd
nsr_descriptor["vnfd-id"].append(vnfd["_id"])
else:
_filter = self._get_project_filter(session)
_filter["id"] = vnfd_id
vnfd = self.db.get_one("vnfds", _filter, fail_on_empty=True, fail_on_more=True)
- vnfd.pop("_admin")
return vnfd
def _add_nsr_to_db(self, nsr_descriptor, rollback, session):
)
vnfd = self._get_vnfd_from_db(vnf_profile.get("vnfd-id"), session)
+ vnfd.pop("_admin")
for vdu in vnfd.get("vdu", ()):
flavor_data = {}
"connection-point": [],
"ip-address": None, # mgmt-interface filled by LCM
}
+
+ # Revision backwards compatility. Only specify the revision in the record if
+ # the original VNFD has a revision.
+ if "revision" in vnfd:
+ vnfr_descriptor["revision"] = vnfd["revision"]
+
+
vnf_k8s_namespace = ns_k8s_namespace
if vnf_params:
if vnf_params.get("k8s-namespace"):
"Invalid parameter member_vnf_index='{}' is not one of the "
"nsd:constituent-vnfd".format(member_vnf_index)
)
- vnfd = self.db.get_one("vnfds", {"_id": vnfr["vnfd-id"]}, fail_on_empty=False)
+
+ ## Backwards compatibility: if there is no revision, get it from the one and only VNFD entry
+ if "revision" in vnfr:
+ vnfd_revision = vnfr["vnfd-id"] + ":" + str(vnfr["revision"])
+ vnfd = self.db.get_one("vnfds_revisions", {"_id": vnfd_revision}, fail_on_empty=False)
+ else:
+ vnfd = self.db.get_one("vnfds", {"_id": vnfr["vnfd-id"]}, fail_on_empty=False)
+
if not vnfd:
raise EngineException(
"vnfd id={} has been deleted!. Operation cannot be performed".format(
import unittest
from unittest import TestCase
-from unittest.mock import Mock
+from unittest.mock import Mock, patch
from uuid import uuid4
from http import HTTPStatus
from copy import deepcopy
self.topic = VnfdTopic(self.db, self.fs, self.msg, self.auth)
self.topic.check_quota = Mock(return_value=None) # skip quota
- def test_new_vnfd(self):
+ @patch("osm_nbi.descriptor_topics.shutil")
+ @patch("osm_nbi.descriptor_topics.os.rename")
+ def test_new_vnfd(self, mock_rename, mock_shutil):
did = db_vnfd_content["_id"]
+ self.fs.path = ""
self.fs.get_params.return_value = {}
self.fs.file_exists.return_value = False
self.fs.file_open.side_effect = lambda path, mode: open(
self.assertEqual(
storage["descriptor"], "package", "Wrong storage descriptor"
)
+ self.assertEqual(
+ admin["revision"], 1, "Wrong revision number"
+ )
+
compare_desc(self, test_vnfd, db_args[2], "VNFD")
finally:
test_vnfd["vdu"][0]["cloud-init-file"] = tmp1
norm(str(e.exception)),
"Wrong exception text",
)
+ db_args = self.db.replace.call_args[0]
+ admin = db_args[2]["_admin"]
+ self.assertEqual(
+ admin["revision"], 1, "Wrong revision number"
+ )
+
finally:
del test_vnfd["extra-property"]
with self.subTest(i=3, t="Check Pyangbind Validation: property types"):
"Wrong DB NSD vnfd-id",
)
+ db_del_args = self.db.del_list.call_args[0]
+ self.assertEqual(
+ self.db.del_list.call_args[0][0],
+ self.topic.topic+"_revisions",
+ "Wrong DB topic",
+ )
+
+ self.assertEqual(
+ self.db.del_list.call_args[0][1]['_id']['$regex'],
+ did,
+ "Wrong ID for rexep delete",
+ )
+
self.db.set_one.assert_not_called()
fs_del_calls = self.fs.file_delete.call_args_list
self.assertEqual(fs_del_calls[0][0][0], did, "Wrong FS file id")
"Wrong exception text",
)
+ def test_new_vnfd_revision(self):
+ did = db_vnfd_content["_id"]
+ self.fs.get_params.return_value = {}
+ self.fs.file_exists.return_value = False
+ self.fs.file_open.side_effect = lambda path, mode: open(
+ "/tmp/" + str(uuid4()), "a+b"
+ )
+ test_vnfd = deepcopy(db_vnfd_content)
+ del test_vnfd["_id"]
+ del test_vnfd["_admin"]
+ self.db.create.return_value = did
+ rollback = []
+ did2, oid = self.topic.new(rollback, fake_session, {})
+ db_args = self.db.create.call_args[0]
+ self.assertEqual(db_args[1]['_admin']['revision'], 0,
+ "New package should be at revision 0")
+
+ @patch("osm_nbi.descriptor_topics.shutil")
+ @patch("osm_nbi.descriptor_topics.os.rename")
+ def test_update_vnfd(self, mock_rename, mock_shutil):
+ old_revision = 5
+ did = db_vnfd_content["_id"]
+ self.fs.path = ""
+ self.fs.get_params.return_value = {}
+ self.fs.file_exists.return_value = False
+ self.fs.file_open.side_effect = lambda path, mode: open(
+ "/tmp/" + str(uuid4()), "a+b"
+ )
+ new_vnfd = deepcopy(db_vnfd_content)
+ del new_vnfd["_id"]
+ self.db.create.return_value = did
+ rollback = []
+ did2, oid = self.topic.new(rollback, fake_session, {})
+ del new_vnfd["vdu"][0]["cloud-init-file"]
+ del new_vnfd["df"][0]["lcm-operations-configuration"]["operate-vnf-op-config"][
+ "day1-2"][0]["execution-environment-list"][0]["juju"]
+
+
+ old_vnfd = {
+ "_id": did,
+ "_admin": deepcopy(db_vnfd_content["_admin"])
+ }
+ old_vnfd["_admin"]["revision"] = old_revision
+ self.db.get_one.side_effect = [
+ old_vnfd,
+ None,
+ ]
+ self.topic.upload_content(
+ fake_session, did, new_vnfd, {}, {"Content-Type": []}
+ )
+
+ db_args = self.db.replace.call_args[0]
+ self.assertEqual(db_args[2]['_admin']['revision'], old_revision + 1,
+ "Revision should increment")
+
class Test_NsdTopic(TestCase):
@classmethod
self.topic = NsdTopic(self.db, self.fs, self.msg, self.auth)
self.topic.check_quota = Mock(return_value=None) # skip quota
- def test_new_nsd(self):
+ @patch("osm_nbi.descriptor_topics.shutil")
+ @patch("osm_nbi.descriptor_topics.os.rename")
+ def test_new_nsd(self, mock_rename, mock_shutil):
did = db_nsd_content["_id"]
self.fs.get_params.return_value = {}
self.fs.file_exists.return_value = False
"Wrong read-write project list",
)
try:
+ self.fs.path = ""
self.db.get_one.side_effect = [
{"_id": did, "_admin": db_nsd_content["_admin"]},
None,
storage["descriptor"], "package", "Wrong storage descriptor"
)
compare_desc(self, test_nsd, db_args[2], "NSD")
+ revision_args = self.db.create.call_args[0]
+ self.assertEqual(
+ revision_args[0], self.topic.topic + "_revisions", "Wrong topic"
+ )
+ self.assertEqual(
+ revision_args[1]["id"], db_args[2]["id"], "Wrong revision id"
+ )
+ self.assertEqual(
+ revision_args[1]["_id"],
+ db_args[2]["_id"] + ":1",
+ "Wrong revision _id"
+ )
+
finally:
pass
self.db.get_one.side_effect = (
from osm_common.dbmemory import DbMemory
from osm_common.fsbase import FsBase
from osm_common.msgbase import MsgBase
+from osm_common import dbbase
from http import HTTPStatus
from osm_nbi.instance_topics import NsLcmOpTopic, NsrTopic
from osm_nbi.tests.test_db_descriptors import (
)
+class TestNsLcmOpTopicWithMock(unittest.TestCase):
+ def setUp(self):
+ self.db = Mock(dbbase.DbBase())
+ self.fs = Mock(FsBase())
+ self.fs.get_params.return_value = {"./fake/folder"}
+ self.fs.file_open = mock_open()
+ self.msg = Mock(MsgBase())
+ # create class
+ self.nslcmop_topic = NsLcmOpTopic(self.db, self.fs, self.msg, None)
+
+ def test_get_vnfd_from_vnf_member_revision(self):
+ test_vnfr = yaml.load(db_vnfrs_text, Loader=yaml.Loader)[0]
+ test_vnfd = yaml.load(db_vnfds_text, Loader=yaml.Loader)
+ self.db.get_one.side_effect = [test_vnfr, test_vnfd]
+ vnfr = self.nslcmop_topic._get_vnfd_from_vnf_member_index("1", test_vnfr['_id'])
+ self.assertEqual(self.db.get_one.call_args_list[0][0][0], 'vnfrs', "Incorrect first DB lookup")
+ self.assertEqual(self.db.get_one.call_args_list[1][0][0], 'vnfds', "Incorrect second DB lookup")
+
+ def test_get_vnfd_from_vnf_member_no_revision(self):
+ test_vnfr = yaml.load(db_vnfrs_text, Loader=yaml.Loader)[0]
+ test_vnfr['revision'] = 3
+ test_vnfd = yaml.load(db_vnfds_text, Loader=yaml.Loader)
+ self.db.get_one.side_effect = [test_vnfr, test_vnfd]
+ vnfr = self.nslcmop_topic._get_vnfd_from_vnf_member_index("1", test_vnfr['_id'])
+ self.assertEqual(self.db.get_one.call_args_list[0][0][0], 'vnfrs', "Incorrect first DB lookup")
+ self.assertEqual(self.db.get_one.call_args_list[1][0][0], 'vnfds_revisions', "Incorrect second DB lookup")
+
+
class TestNsrTopic(unittest.TestCase):
def setUp(self):
self.db = DbMemory()