schema_version
) or self.config_to_encrypt.get("default")
if edit_content.get("config") and config_to_encrypt_keys:
-
for p in config_to_encrypt_keys:
if edit_content["config"].get(p):
final_content["config"][p] = self.db.encrypt(
mapping["role"],
mapping["role_name"],
):
-
if mapping in mappings_to_remove: # do not remove
mappings_to_remove.remove(mapping)
break # do not add, it is already at user
(r for r in records if r["name"] == "system_admin"), None
):
with open(self.roles_to_operations_file, "r") as stream:
- roles_to_operations_yaml = yaml.load(stream, Loader=yaml.Loader)
+ roles_to_operations_yaml = yaml.safe_load(stream)
role_names = []
for role_with_operations in roles_to_operations_yaml["roles"]:
:return: returns the id of the user in keystone.
"""
try:
-
if (
user_info.get("domain_name")
and user_info["domain_name"] in self.user_domain_ro_list
indata = json.load(content)
else:
error_text = "Invalid yaml format "
- indata = yaml.load(content, Loader=yaml.SafeLoader)
+ indata = yaml.safe_load(content)
# Need to close the file package here so it can be copied from the
# revision to the current, unrevisioned record
with self.fs.file_open(
(old_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
) as old_descriptor_file:
-
with self.fs.file_open(
(new_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
) as new_descriptor_file:
-
old_content = yaml.safe_load(old_descriptor_file.read())
new_content = yaml.safe_load(new_descriptor_file.read())
with self.fs.file_open(
(old_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
) as old_descriptor_file:
-
with self.fs.file_open(
(new_descriptor_directory.rstrip("/"), descriptor_file_name), "r"
) as new_descriptor_file:
-
old_content = yaml.safe_load(old_descriptor_file.read())
new_content = yaml.safe_load(new_descriptor_file.read())
# "resources_to_operations file missing")
#
# with open(resources_to_operations_file, 'r') as f:
- # resources_to_operations = yaml.load(f, Loader=yaml.Loader)
+ # resources_to_operations = yaml.safeload(f)
#
# self.operations = []
#
vnfd_id_2update = indata["changeVnfPackageData"]["vnfdId"]
if vnf_instance_id not in nsr["constituent-vnfr-ref"]:
-
raise EngineException(
f"Error in validating ns-update request: vnf {vnf_instance_id} does not "
f"belong to NS {ns_instance_id}",
# Check the given vnfd-id belongs to given vnf instance
if constituent_vnfd_id and (vnfd_id_2update != constituent_vnfd_id):
-
raise EngineException(
f"Error in validating ns-update request: vnfd-id {vnfd_id_2update} does not "
f"match with the vnfd-id: {constituent_vnfd_id} of VNF instance: {vnf_instance_id}",
cherrypy.request.headers.pop("Content-File-MD5", None)
elif "application/yaml" in cherrypy.request.headers["Content-Type"]:
error_text = "Invalid yaml format "
- indata = yaml.load(
- cherrypy.request.body, Loader=yaml.SafeLoader
- )
+ indata = yaml.safe_load(cherrypy.request.body)
cherrypy.request.headers.pop("Content-File-MD5", None)
elif (
"application/binary" in cherrypy.request.headers["Content-Type"]
# "Only 'Content-Type' of type 'application/json' or
# 'application/yaml' for input format are available")
error_text = "Invalid yaml format "
- indata = yaml.load(
- cherrypy.request.body, Loader=yaml.SafeLoader
- )
+ indata = yaml.safe_load(cherrypy.request.body)
cherrypy.request.headers.pop("Content-File-MD5", None)
else:
error_text = "Invalid yaml format "
- indata = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+ indata = yaml.safe_load(cherrypy.request.body)
cherrypy.request.headers.pop("Content-File-MD5", None)
if not indata:
indata = {}
kwargs[k] = None
elif format_yaml:
try:
- kwargs[k] = yaml.load(v, Loader=yaml.SafeLoader)
+ kwargs[k] = yaml.safe_load(v)
except Exception:
pass
elif (
v[index] = None
elif format_yaml:
try:
- v[index] = yaml.load(v[index], Loader=yaml.SafeLoader)
+ v[index] = yaml.safe_load(v[index])
except Exception:
pass
return self._format_out(str(alarm_list))
# to handle patch request for alarm update
elif cherrypy.request.method == "PATCH":
- data = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+ data = yaml.safe_load(cherrypy.request.body)
try:
# check if uuid is valid
self.engine.db.get_one("alarms", {"uuid": data.get("uuid")})
return_text = "<html><pre>{} ->\n".format(main_topic)
try:
if cherrypy.request.method == "POST":
- to_send = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+ to_send = yaml.safe_load(cherrypy.request.body)
for k, v in to_send.items():
self.engine.msg.write(main_topic, k, v)
return_text += " {}: {}\n".format(k, v)
elif cherrypy.request.method == "GET":
for k, v in kwargs.items():
- v_dict = yaml.load(v, Loader=yaml.SafeLoader)
+ v_dict = yaml.safe_load(v)
self.engine.msg.write(main_topic, k, v_dict)
return_text += " {}: {}\n".format(k, v_dict)
except Exception as e:
class NotificationBase:
-
response_models = None
# Common HTTP payload header for all notifications.
payload_header = {"Content-Type": "application/json", "Accept": "application/json"}
class NsLcmNotification(NotificationBase):
-
# SOL005 response model for nslcm notifications
response_models = {
"NsLcmOperationOccurrenceNotification": {
class NewVnfInstance(BaseMethod):
-
# sample ns descriptor
sample_nsd = {
"nsd": {
self.logger.debug("Starting")
while not self.to_terminate:
try:
-
self.loop.run_until_complete(
asyncio.ensure_future(self.start_kafka(), loop=self.loop)
)
]
def run(self, engine, test_osm, manual_check, test_params=None):
-
vim_bad = self.vim.copy()
vim_bad.pop("name")
keys=None,
timeout=0,
):
-
r = engine.test(
"GET VNFR IDs",
"GET",
ns_data.update(self.ns_params)
if test_params and test_params.get("ns-config"):
if isinstance(test_params["ns-config"], str):
- ns_data.update(yaml.load(test_params["ns-config"]), Loader=yaml.Loader)
+ ns_data.update(yaml.safe_load(test_params["ns-config"]))
else:
ns_data.update(test_params["ns-config"])
self.instantiate(engine, ns_data)
"2": ["ls -lrt /home/ubuntu/first-touch-2"],
}
self.descriptor_edit = {
- "vnfd0": yaml.load(
+ "vnfd0": yaml.safe_load(
"""
scaling-group-descriptor:
- name: "scale_dataVM"
"$[0]":
default-value: "<touch_filename2>"
""",
- Loader=yaml.Loader,
)
}
self.ns_params = {
}
if test_params and test_params.get("ns-config"):
if isinstance(test_params["ns-config"], str):
- ns_data.update(yaml.load(test_params["ns-config"]), Loader=yaml.Loader)
+ ns_data.update(yaml.safe_load(test_params["ns-config"]))
else:
ns_data.update(test_params["ns-config"])
test_user_id = engine.last_id if res else None
if test_project_id and test_user_id:
-
# Get user access
engine.token = None
engine.user = test_username
test_vim_ids += [engine.last_id if res else None]
if test_vim_ids[0]:
-
# Download descriptor files (if required)
test_dir = "/tmp/" + test_username + "/"
test_url = "https://osm-download.etsi.org/ftp/osm-6.0-six/7th-hackfest/packages/"
file.write(res.content)
if all([os.path.exists(test_dir + p) for p in desc_filenames]):
-
# Test VNFD Quotas
res = engine.test(
"Create test VNFD #1",
test_vnfd_ids[i] = None
if test_vnfd_ids[0] and test_vnfd_ids[1]:
-
# Test NSD Quotas
res = engine.test(
"Create test NSD #1",
test_nsd_ids[i] = None
if test_nsd_ids[0] and test_nsd_ids[1]:
-
# Test NSR Quotas
res = engine.test(
"Create test NSR #1",
self.nslcmop_topic = NsLcmOpTopic(self.db, self.fs, self.msg, None)
self.nslcmop_topic.check_quota = Mock(return_value=None) # skip quota
- self.db.create_list(
- "vim_accounts", yaml.load(db_vim_accounts_text, Loader=yaml.Loader)
- )
- self.db.create_list("nsds", yaml.load(db_nsds_text, Loader=yaml.Loader))
- self.db.create_list("vnfds", yaml.load(db_vnfds_text, Loader=yaml.Loader))
- self.db.create_list("vnfrs", yaml.load(db_vnfrs_text, Loader=yaml.Loader))
- self.db.create_list("nsrs", yaml.load(db_nsrs_text, Loader=yaml.Loader))
+ self.db.create_list("vim_accounts", yaml.safe_load(db_vim_accounts_text))
+ self.db.create_list("nsds", yaml.safe_load(db_nsds_text))
+ self.db.create_list("vnfds", yaml.safe_load(db_vnfds_text))
+ self.db.create_list("vnfrs", yaml.safe_load(db_vnfrs_text))
+ self.db.create_list("nsrs", yaml.safe_load(db_nsrs_text))
self.db.create = Mock(return_value="created_id")
self.nsd = self.db.get_list("nsds")[0]
self.nsd_id = self.nsd["_id"]
self.nslcmop_topic = NsLcmOpTopic(self.db, self.fs, self.msg, None)
def test_get_vnfd_from_vnf_member_revision(self):
- test_vnfr = yaml.load(db_vnfrs_text, Loader=yaml.Loader)[0]
- test_vnfd = yaml.load(db_vnfds_text, Loader=yaml.Loader)
+ test_vnfr = yaml.safe_load(db_vnfrs_text)[0]
+ test_vnfd = yaml.safe_load(db_vnfds_text)
self.db.get_one.side_effect = [test_vnfr, test_vnfd]
_ = self.nslcmop_topic._get_vnfd_from_vnf_member_index("1", test_vnfr["_id"])
self.assertEqual(
)
def test_get_vnfd_from_vnf_member_no_revision(self):
- test_vnfr = yaml.load(db_vnfrs_text, Loader=yaml.Loader)[0]
+ test_vnfr = yaml.safe_load(db_vnfrs_text)[0]
test_vnfr["revision"] = 3
- test_vnfd = yaml.load(db_vnfds_text, Loader=yaml.Loader)
+ test_vnfd = yaml.safe_load(db_vnfds_text)
self.db.get_one.side_effect = [test_vnfr, test_vnfd]
_ = self.nslcmop_topic._get_vnfd_from_vnf_member_index("1", test_vnfr["_id"])
self.assertEqual(
session = {}
with self.subTest(i=1, t="VNF instance does not belong to NS"):
- test_vnfr = yaml.load(db_vnfrs_text, Loader=yaml.Loader)
+ test_vnfr = yaml.safe_load(db_vnfrs_text)
test_vnfr[0]["revision"] = 2
- test_nsr = yaml.load(db_nsrs_text, Loader=yaml.Loader)
+ test_nsr = yaml.safe_load(db_nsrs_text)
test_nsr[0]["constituent-vnfr-ref"][
0
] = "99d90b0c-faff-4b9f-bccd-017f33985984"
)
with self.subTest(i=2, t="Ns update request validated with no exception"):
- test_vnfr = yaml.load(db_vnfrs_text, Loader=yaml.Loader)
+ test_vnfr = yaml.safe_load(db_vnfrs_text)
test_vnfr[0]["revision"] = 2
- test_nsr = yaml.load(db_nsrs_text, Loader=yaml.Loader)
+ test_nsr = yaml.safe_load(db_nsrs_text)
self.db.create_list("vnfrs", test_vnfr)
self.db.create_list("nsrs", test_nsr)
nsrs = self.db.get_list("nsrs")[1]
)
with self.subTest(i=4, t="wrong vnfdid is given as an update parameter"):
- test_vnfr = yaml.load(db_vnfrs_text, Loader=yaml.Loader)
+ test_vnfr = yaml.safe_load(db_vnfrs_text)
test_vnfr[0]["revision"] = 2
- test_nsr = yaml.load(db_nsrs_text, Loader=yaml.Loader)
+ test_nsr = yaml.safe_load(db_nsrs_text)
self.db.create_list("vnfrs", test_vnfr)
self.db.create_list("nsrs", test_nsr)
nsrs = self.db.get_list("nsrs")[2]
with self.subTest(
i=5, t="Ns update REMOVE_VNF request validated with no exception"
):
- test_vnfr = yaml.load(db_vnfrs_text, Loader=yaml.Loader)
+ test_vnfr = yaml.safe_load(db_vnfrs_text)
test_vnfr[0]["revision"] = 2
- test_nsr = yaml.load(db_nsrs_text, Loader=yaml.Loader)
+ test_nsr = yaml.safe_load(db_nsrs_text)
self.db.create_list("vnfrs", test_vnfr)
self.db.create_list("nsrs", test_nsr)
nsrs = self.db.get_list("nsrs")[1]
self.nsr_topic = NsrTopic(self.db, self.fs, self.msg, None)
self.nsr_topic.check_quota = Mock(return_value=None) # skip quota
- self.db.create_list(
- "vim_accounts", yaml.load(db_vim_accounts_text, Loader=yaml.Loader)
- )
- self.db.create_list("nsds", yaml.load(db_nsds_text, Loader=yaml.Loader))
- self.db.create_list("vnfds", yaml.load(db_vnfds_text, Loader=yaml.Loader))
+ self.db.create_list("vim_accounts", yaml.safe_load(db_vim_accounts_text))
+ self.db.create_list("nsds", yaml.safe_load(db_nsds_text))
+ self.db.create_list("vnfds", yaml.safe_load(db_vnfds_text))
self.db.create = Mock(return_value="created_id")
self.nsd = self.db.get_list("nsds")[0]
self.nsd_id = self.nsd["_id"]
}
filter_q = {}
for refresh_status in ("true", "false"):
- self.db.create_list("nsrs", yaml.load(db_nsrs_text, Loader=yaml.Loader))
+ self.db.create_list("nsrs", yaml.safe_load(db_nsrs_text))
actual_nsr = self.db.get_list("nsrs")[0]
nsr_id = actual_nsr["_id"]
filter_q["vcaStatus-refresh"] = refresh_status
}
filter_q = {"vcaStatus-refresh": "true"}
time_delta = 120
- self.db.create_list("nsrs", yaml.load(db_nsrs_text, Loader=yaml.Loader))
+ self.db.create_list("nsrs", yaml.safe_load(db_nsrs_text))
nsr = self.db.get_list("nsrs")[0]
# When vcaStatus-refresh is true
)
def test_delete_ns(self):
- self.db.create_list("nsrs", yaml.load(db_nsrs_text, Loader=yaml.Loader))
+ self.db.create_list("nsrs", yaml.safe_load(db_nsrs_text))
self.nsr = self.db.get_list("nsrs")[0]
self.nsr_id = self.nsr["_id"]
self.db_set_one = self.db.set_one
self.msg = Mock(MsgBase())
self.vnfinstances = VnfInstances(self.db, self.fs, self.msg, None)
self.nsrtopic = NsrTopic(self.db, self.fs, self.msg, None)
- self.db.create_list(
- "vim_accounts", yaml.load(db_vim_accounts_text, Loader=yaml.Loader)
- )
- self.db.create_list("vnfds", yaml.load(db_vnfm_vnfd_text, Loader=yaml.Loader))
+ self.db.create_list("vim_accounts", yaml.safe_load(db_vim_accounts_text))
+ self.db.create_list("vnfds", yaml.safe_load(db_vnfm_vnfd_text))
self.vnfd = self.db.get_list("vnfds")[0]
self.vnfd_id = self.vnfd["id"]
self.vnfd_project = self.vnfd["_admin"]["projects_read"][0]
"method": "write",
}
filter_q = {}
- self.db.create_list("vnfrs", yaml.load(db_vnfrs_text, Loader=yaml.Loader))
+ self.db.create_list("vnfrs", yaml.safe_load(db_vnfrs_text))
actual_vnfr = self.db.get_list("vnfrs")[0]
id = actual_vnfr["_id"]
expected_vnfr = self.vnfinstances.show(session, id, filter_q)
"project_id": [self.vnfd_project],
"method": "delete",
}
- self.db.create_list("vnfrs", yaml.load(db_vnfrs_text, Loader=yaml.Loader))
- self.db.create_list("nsrs", yaml.load(db_nsrs_text, Loader=yaml.Loader))
- self.db.create_list("nsds", yaml.load(db_nsds_text, Loader=yaml.Loader))
+ self.db.create_list("vnfrs", yaml.safe_load(db_vnfrs_text))
+ self.db.create_list("nsrs", yaml.safe_load(db_nsrs_text))
+ self.db.create_list("nsds", yaml.safe_load(db_nsds_text))
self.vnfr = self.db.get_list("vnfrs")[0]
self.vnfr_id = self.vnfr["_id"]
self.vnflcmop_topic = VnfLcmOpTopic(self.db, self.fs, self.msg, None)
self.vnflcmop_topic.check_quota = Mock(return_value=None) # skip quota
- self.db.create_list(
- "vim_accounts", yaml.load(db_vim_accounts_text, Loader=yaml.Loader)
- )
- self.db.create_list("nsds", yaml.load(db_nsds_text, Loader=yaml.Loader))
- self.db.create_list("vnfds", yaml.load(db_vnfm_vnfd_text, Loader=yaml.Loader))
- self.db.create_list("vnfrs", yaml.load(db_vnfrs_text, Loader=yaml.Loader))
- self.db.create_list("nsrs", yaml.load(db_nsrs_text, Loader=yaml.Loader))
+ self.db.create_list("vim_accounts", yaml.safe_load(db_vim_accounts_text))
+ self.db.create_list("nsds", yaml.safe_load(db_nsds_text))
+ self.db.create_list("vnfds", yaml.safe_load(db_vnfm_vnfd_text))
+ self.db.create_list("vnfrs", yaml.safe_load(db_vnfrs_text))
+ self.db.create_list("nsrs", yaml.safe_load(db_nsrs_text))
self.vnfd = self.db.get_list("vnfds")[0]
self.vnfd_id = self.vnfd["_id"]
"project_id": [self.vnfd_project],
"method": "write",
}
- self.db.create_list("nslcmops", yaml.load(db_nslcmops_text, Loader=yaml.Loader))
+ self.db.create_list("nslcmops", yaml.safe_load(db_nslcmops_text))
filter_q = {}
actual_lcmop = self.db.get_list("nslcmops")[0]
id = actual_lcmop["_id"]
def setUp(self):
self.db = DbMemory()
self.pmjobs_topic = PmJobsTopic(self.db, host="prometheus", port=9091)
- self.db.create_list("nsds", yaml.load(db_nsds_text, Loader=yaml.Loader))
- self.db.create_list("vnfds", yaml.load(db_vnfds_text, Loader=yaml.Loader))
- self.db.create_list("vnfrs", yaml.load(db_vnfrs_text, Loader=yaml.Loader))
- self.db.create_list("nsrs", yaml.load(db_nsrs_text, Loader=yaml.Loader))
+ self.db.create_list("nsds", yaml.safe_load(db_nsds_text))
+ self.db.create_list("vnfds", yaml.safe_load(db_vnfds_text))
+ self.db.create_list("vnfrs", yaml.safe_load(db_vnfrs_text))
+ self.db.create_list("nsrs", yaml.safe_load(db_nsrs_text))
self.nsr = self.db.get_list("nsrs")[0]
self.nsr_id = self.nsr["_id"]
project_id = self.nsr["_admin"]["projects_write"]
for metric in metric_list:
endpoint = re.sub(r"metric_name", metric, site)
if metric == "cpu_utilization":
- response = yaml.load(cpu_utilization, Loader=yaml.Loader)
+ response = yaml.safe_load(cpu_utilization)
elif metric == "users":
- response = yaml.load(users, Loader=yaml.Loader)
+ response = yaml.safe_load(users)
elif metric == "load":
- response = yaml.load(load, Loader=yaml.Loader)
+ response = yaml.safe_load(load)
else:
- response = yaml.load(empty, Loader=yaml.Loader)
+ response = yaml.safe_load(empty)
mock_res.get(endpoint, payload=response)
async def test_prom_metric_request(self):
with self.subTest("Test case1 failed in test_prom"):
- prom_response = yaml.load(prom_res, Loader=yaml.Loader)
+ prom_response = yaml.safe_load(prom_res)
with aioresponses() as mock_res:
self.set_get_mock_res(mock_res, self.nsr_id, self.metric_check_list)
result = await self.pmjobs_topic._prom_metric_request(
def test_show(self):
with self.subTest("Test case1 failed in test_show"):
- show_response = yaml.load(show_res, Loader=yaml.Loader)
+ show_response = yaml.safe_load(show_res)
with aioresponses() as mock_res:
self.set_get_mock_res(mock_res, self.nsr_id, self.metric_check_list)
result = self.pmjobs_topic.show(self.session, self.nsr_id)
coverage report --omit='*tests*'
coverage html -d ./cover --omit='*tests*'
coverage xml -o coverage.xml --omit=*tests*
-whitelist_externals = sh
+allowlist_externals = sh
#######################################################################################