.pydevproject
.settings
+#vscode
+.vscode
+
#local stuff files that end in ".local" or folders called "local"
local
osm_nbi/local
ADD . /app/NBI
# Run app.py when the container launches
-CMD python3 -m osm_nbi.nbi
+CMD ["python3", "-m", "osm_nbi.nbi"]
if not session["force"] and edit_content.get("name"):
self.check_unique_name(session, edit_content["name"], _id=_id)
+ return final_content
+
def format_on_edit(self, final_content, edit_content):
"""
Modifies final_content inserting admin information upon edition
return oid
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
- super(CommonVimWimSdn, self).check_conflict_on_edit(session, final_content, edit_content, _id)
- super().check_conflict_on_edit(session, final_content, edit_content, _id)
+ final_content = super(CommonVimWimSdn, self).check_conflict_on_edit(session, final_content, edit_content, _id)
+ final_content = super().check_conflict_on_edit(session, final_content, edit_content, _id)
# Update Helm/Juju Repo lists
repos = {"helm-chart": [], "juju-bundle": []}
for proj in session.get("set_project", []):
if rlist not in final_content["_admin"]:
final_content["_admin"][rlist] = []
final_content["_admin"][rlist] += repos[k]
+ return final_content
def check_conflict_on_del(self, session, _id, db_content):
"""
raise EngineException("You cannot remove system_admin role from admin user",
http_code=HTTPStatus.FORBIDDEN)
+ return final_content
+
def check_conflict_on_del(self, session, _id, db_content):
"""
Check if deletion can be done because of dependencies if it is not force. To override
if not content:
content = self.show(session, _id)
indata = self._validate_input_edit(indata, content, force=session["force"])
- self.check_conflict_on_edit(session, content, indata, _id=_id)
+ content = self.check_conflict_on_edit(session, content, indata, _id=_id)
# self.format_on_edit(content, indata)
if not ("password" in indata or "username" in indata or indata.get("remove_project_role_mappings") or
# Check that project name is not used, regardless keystone already checks this
if project_name and self.auth.get_project_list(filter_q={"name": project_name}):
raise EngineException("project '{}' is already used".format(project_name), HTTPStatus.CONFLICT)
+ return final_content
def check_conflict_on_del(self, session, _id, db_content):
"""
if not content:
content = self.show(session, _id)
indata = self._validate_input_edit(indata, content, force=session["force"])
- self.check_conflict_on_edit(session, content, indata, _id=_id)
+ content = self.check_conflict_on_edit(session, content, indata, _id=_id)
self.format_on_edit(content, indata)
content_original = copy.deepcopy(content)
deep_update_rfc7396(content, indata)
if roles and roles[0][BaseTopic.id_field("roles", _id)] != _id:
raise EngineException("role name '{}' exists".format(role_name), HTTPStatus.CONFLICT)
+ return final_content
+
def check_conflict_on_del(self, session, _id, db_content):
"""
Check if deletion can be done because of dependencies if it is not force. To override
content = self.show(session, _id)
indata = self._validate_input_edit(indata, content, force=session["force"])
deep_update_rfc7396(content, indata)
- self.check_conflict_on_edit(session, content, indata, _id=_id)
+ content = self.check_conflict_on_edit(session, content, indata, _id=_id)
self.format_on_edit(content, indata)
self.auth.update_role(content)
except ValidationError as e:
:param final_content: data once modified. This method may change it.
:param edit_content: incremental data that contains the modifications to apply
:param _id: internal _id
- :return: None or raises EngineException
+ :return: final_content or raises EngineException
"""
if not self.multiproject:
- return
+ return final_content
# Change public status
if session["public"] is not None:
if session["public"] and "ANY" not in final_content["_admin"]["projects_read"]:
if p not in final_content["_admin"]["projects_read"]:
final_content["_admin"]["projects_read"].append(p)
+ return final_content
+
def check_unique_name(self, session, name, _id=None):
"""
Check that the name is unique for this project
if indata and session.get("set_project"):
raise EngineException("Cannot edit content and set to project (query string SET_PROJECT) at same time",
HTTPStatus.UNPROCESSABLE_ENTITY)
-
# TODO self._check_edition(session, indata, _id, force)
if not content:
content = self.show(session, _id)
-
indata = self._validate_input_edit(indata, content, force=session["force"])
-
deep_update_rfc7396(content, indata)
# To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
_id = content.get("_id") or _id
- self.check_conflict_on_edit(session, content, indata, _id=_id)
+ content = self.check_conflict_on_edit(session, content, indata, _id=_id)
op_id = self.format_on_edit(content, indata)
self.db.replace(self.topic, _id, content)
import yaml
import json
import importlib
+import copy
# import logging
from hashlib import md5
from osm_common.dbbase import DbException, deep_update_rfc7396
from osm_im.nst import nst as nst_im
from pyangbind.lib.serialise import pybindJSONDecoder
import pyangbind.lib.pybindJSON as pybindJSON
+from osm_nbi.utils import deep_update_dict
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
BaseTopic.__init__(self, db, fs, msg, auth)
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
- super().check_conflict_on_edit(session, final_content, edit_content, _id)
+ final_content = super().check_conflict_on_edit(session, final_content, edit_content, _id)
def _check_unique_id_name(descriptor, position=""):
for desc_key, desc_item in descriptor.items():
internal_keys[k] = final_content.pop(k)
storage_params = internal_keys["_admin"].get("storage")
serialized = self._validate_input_new(final_content, storage_params, session["force"])
+
# 1.2. modify final_content with a serialized version
- final_content.clear()
- final_content.update(serialized)
+ final_content = copy.deepcopy(serialized)
# 1.3. restore internal keys
for k, v in internal_keys.items():
final_content[k] = v
if session["force"]:
- return
+ return final_content
+
# 2. check that this id is not present
if "id" in edit_content:
_filter = self._get_project_filter(session)
+
_filter["id"] = final_content["id"]
_filter["_id.neq"] = _id
+
if self.db.get_one(self.topic, _filter, fail_on_empty=False):
raise EngineException("{} with id '{}' already exists for this project".format(self.topic[:-1],
final_content["id"]),
HTTPStatus.CONFLICT)
+ return final_content
+
@staticmethod
def format_on_new(content, project_id=None, make_public=False):
BaseTopic.format_on_new(content, project_id=project_id, make_public=make_public)
self._update_input_with_kwargs(indata, kwargs)
deep_update_rfc7396(current_desc, indata)
- self.check_conflict_on_edit(session, current_desc, indata, _id=_id)
+ current_desc = self.check_conflict_on_edit(session, current_desc, indata, _id=_id)
current_desc["_admin"]["modified"] = time()
self.db.replace(self.topic, _id, current_desc)
self.fs.dir_rename(temp_folder, _id)
raise EngineException("ERROR: Unsupported descriptor format. Please, use an ETSI SOL006 descriptor.",
http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
try:
- virtual_compute_descriptors = data.get('virtual-compute-desc')
- virtual_storage_descriptors = data.get('virtual-storage-desc')
myvnfd = etsi_nfv_vnfd.etsi_nfv_vnfd()
pybindJSONDecoder.load_ietf_json({'etsi-nfv-vnfd:vnfd': data}, None, None, obj=myvnfd,
path_helper=True, skip_unknown=force)
out = pybindJSON.dumps(myvnfd, mode="ietf")
desc_out = self._remove_envelop(yaml.safe_load(out))
desc_out = self._remove_yang_prefixes_from_descriptor(desc_out)
- if virtual_compute_descriptors:
- desc_out['virtual-compute-desc'] = virtual_compute_descriptors
- if virtual_storage_descriptors:
- desc_out['virtual-storage-desc'] = virtual_storage_descriptors
- return desc_out
+ return deep_update_dict(data, desc_out)
except Exception as e:
raise EngineException("Error in pyangbind validation: {}".format(str(e)),
http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
return clean_indata
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
- super().check_conflict_on_edit(session, final_content, edit_content, _id)
+ final_content = super().check_conflict_on_edit(session, final_content, edit_content, _id)
# set type of vnfd
contains_pdu = False
elif contains_vdu:
final_content["_admin"]["type"] = "vnfd"
# if neither vud nor pdu do not fill type
+ return final_content
def check_conflict_on_del(self, session, _id, db_content):
"""
http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
- super().check_conflict_on_edit(session, final_content, edit_content, _id)
+ final_content = super().check_conflict_on_edit(session, final_content, edit_content, _id)
self._check_descriptor_dependencies(session, final_content)
+ return final_content
+
def check_conflict_on_del(self, session, _id, db_content):
"""
Check that there is not any NSR that uses this NSD. Only NSRs belonging to this project are considered. Note
"existing nsd".format(nsd_id), http_code=HTTPStatus.CONFLICT)
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
- super().check_conflict_on_edit(session, final_content, edit_content, _id)
+ final_content = super().check_conflict_on_edit(session, final_content, edit_content, _id)
self._check_descriptor_dependencies(session, final_content)
+ return final_content
def check_conflict_on_del(self, session, _id, db_content):
"""
step = "validating input parameters"
ns_request = self._remove_envelop(indata)
self._update_input_with_kwargs(ns_request, kwargs)
- self._validate_input_new(ns_request, session["force"])
+ ns_request = self._validate_input_new(ns_request, session["force"])
step = "getting nsd id='{}' from database".format(ns_request.get("nsdId"))
nsd = self._get_nsd_from_db(ns_request["nsdId"], session)
slice_request = self._remove_envelop(indata)
# Override descriptor with query string kwargs
self._update_input_with_kwargs(slice_request, kwargs)
- self._validate_input_new(slice_request, session["force"])
+ slice_request = self._validate_input_new(slice_request, session["force"])
# look for nstd
step = "getting nstd id='{}' from database".format(slice_request.get("nstId"))
elif o == "--fail-fast":
fail_fast = True
elif o == "--test":
- # print("asdfadf", o, a, a.split(","))
for _test in a.split(","):
if _test not in test_classes:
print("Invalid test name '{}'. Use option '--list' to show available tests".format(_test),
del test_vnfd["vdu"][0]["cloud-init-file"]
del test_vnfd["vnf-configuration"][0]["juju"]
try:
- self.db.get_one.side_effect = [{"_id": did, "_admin": db_vnfd_content["_admin"]}, None]
+ self.db.get_one.side_effect = [{"_id": did, "_admin": deepcopy(db_vnfd_content["_admin"])}, None]
self.topic.upload_content(fake_session, did, test_vnfd, {}, {"Content-Type": []})
msg_args = self.msg.write.call_args[0]
test_vnfd["_id"] = did
self.assertEqual(db_args[0], self.topic.topic, "Wrong DB topic")
self.assertEqual(db_args[1], did, "Wrong DB VNFD id")
admin = db_args[2]["_admin"]
- db_admin = db_vnfd_content["_admin"]
+ db_admin = deepcopy(db_vnfd_content["_admin"])
self.assertEqual(admin["type"], "vnfd", "Wrong descriptor type")
self.assertEqual(admin["created"], db_admin["created"], "Wrong creation time")
self.assertGreater(admin["modified"], db_admin["created"], "Wrong modification time")
test_vnfd["vdu"][0]["cloud-init-file"] = tmp1
test_vnfd["vnf-configuration"][0]["juju"] = tmp2
self.db.get_one.side_effect = lambda table, filter, fail_on_empty=None, fail_on_more=None: \
- {"_id": did, "_admin": db_vnfd_content["_admin"]}
+ {"_id": did, "_admin": deepcopy(db_vnfd_content["_admin"])}
with self.subTest(i=2, t='Check Pyangbind Validation: additional properties'):
test_vnfd["extra-property"] = 0
try:
test_vnfd['vnf-configuration'][0]['config-primitive'] = tmp
with self.subTest(i=15, t='Check Input Validation: everything right'):
test_vnfd["id"] = "fake-vnfd-id"
- self.db.get_one.side_effect = [{"_id": did, "_admin": db_vnfd_content["_admin"]}, None]
+ self.db.get_one.side_effect = [{"_id": did, "_admin": deepcopy(db_vnfd_content["_admin"])}, None]
rc = self.topic.upload_content(fake_session, did, test_vnfd, {}, {"Content-Type": []})
self.assertTrue(rc, "Input Validation: Unexpected failure")
return
self.fs.dir_ls.return_value = True
with self.subTest(i=1, t='Normal Edition'):
now = time()
- self.db.get_one.side_effect = [vnfd_content, None]
+ self.db.get_one.side_effect = [deepcopy(vnfd_content), None]
data = {"id": "new-vnfd-id", "product-name": "new-vnfd-name"}
self.topic.edit(fake_session, did, data)
db_args = self.db.replace.call_args[0]
self.assertEqual(db_args[2]["product-name"], data["product-name"], "Wrong VNFD Name")
with self.subTest(i=2, t='Conflict on Edit'):
data = {"id": "fake-vnfd-id", "product-name": "new-vnfd-name"}
- self.db.get_one.side_effect = [vnfd_content, {"_id": str(uuid4()), "id": data["id"]}]
+ self.db.get_one.side_effect = [deepcopy(vnfd_content), {"_id": str(uuid4()), "id": data["id"]}]
with self.assertRaises(EngineException, msg="Accepted existing VNFD ID") as e:
self.topic.edit(fake_session, did, data)
self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
self.fs.dir_ls.return_value = True
with self.subTest(i=1, t='Normal Edition'):
now = time()
- self.db.get_one.side_effect = [nsd_content, None]
+ self.db.get_one.side_effect = [deepcopy(nsd_content), None]
self.db.get_list.return_value = [db_vnfd_content]
data = {"id": "new-nsd-id", "name": "new-nsd-name"}
self.topic.edit(fake_session, did, data)
nsd_descriptor['df'][0]['vnf-profile'][1]['vnfd-id'] = invalid_vnfd_id
with self.assertRaises(EngineException) as e:
self.db.get_list.return_value = []
- self.topic.check_conflict_on_edit(fake_session, nsd_descriptor, [], 'id')
+ nsd_descriptor = self.topic.check_conflict_on_edit(fake_session, nsd_descriptor, [], 'id')
self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
self.assertIn(norm("Descriptor error at 'vnfd-id'='{}' references a non "
"existing vnfd".format(invalid_vnfd_id)),
return index
else:
return -1
+
+
+def deep_update_dict(data, updated_data):
+ if isinstance(data, list):
+ processed_items_data = []
+ for index, item in enumerate(data):
+ processed_items_data.append(deep_update_dict(item, updated_data[index]))
+ return processed_items_data
+
+ if isinstance(data, dict):
+ for key in data.keys():
+ if key in updated_data:
+ if not isinstance(data[key], dict) and not isinstance(data[key], list):
+ data[key] = updated_data[key]
+ else:
+ data[key] = deep_update_dict(data[key], updated_data[key])
+ return data
+
+ return data