def setUp(self):
self.db = DbMemory()
self.pmjobs_topic = PmJobsTopic(self.db, host="prometheus", port=9091)
- self.db.create_list("nsds", yaml.load(db_nsds_text, Loader=yaml.Loader))
- self.db.create_list("vnfds", yaml.load(db_vnfds_text, Loader=yaml.Loader))
- self.db.create_list("vnfrs", yaml.load(db_vnfrs_text, Loader=yaml.Loader))
- self.db.create_list("nsrs", yaml.load(db_nsrs_text, Loader=yaml.Loader))
+ self.db.create_list("nsds", yaml.safe_load(db_nsds_text))
+ self.db.create_list("vnfds", yaml.safe_load(db_vnfds_text))
+ self.db.create_list("vnfrs", yaml.safe_load(db_vnfrs_text))
+ self.db.create_list("nsrs", yaml.safe_load(db_nsrs_text))
self.nsr = self.db.get_list("nsrs")[0]
self.nsr_id = self.nsr["_id"]
project_id = self.nsr["_admin"]["projects_write"]
for metric in metric_list:
endpoint = re.sub(r"metric_name", metric, site)
if metric == "cpu_utilization":
- response = yaml.load(cpu_utilization, Loader=yaml.Loader)
+ response = yaml.safe_load(cpu_utilization)
elif metric == "users":
- response = yaml.load(users, Loader=yaml.Loader)
+ response = yaml.safe_load(users)
elif metric == "load":
- response = yaml.load(load, Loader=yaml.Loader)
+ response = yaml.safe_load(load)
else:
- response = yaml.load(empty, Loader=yaml.Loader)
+ response = yaml.safe_load(empty)
mock_res.get(endpoint, payload=response)
async def test_prom_metric_request(self):
with self.subTest("Test case1 failed in test_prom"):
- prom_response = yaml.load(prom_res, Loader=yaml.Loader)
+ prom_response = yaml.safe_load(prom_res)
with aioresponses() as mock_res:
self.set_get_mock_res(mock_res, self.nsr_id, self.metric_check_list)
result = await self.pmjobs_topic._prom_metric_request(
def test_show(self):
with self.subTest("Test case1 failed in test_show"):
- show_response = yaml.load(show_res, Loader=yaml.Loader)
+ show_response = yaml.safe_load(show_res)
with aioresponses() as mock_res:
self.set_get_mock_res(mock_res, self.nsr_id, self.metric_check_list)
result = self.pmjobs_topic.show(self.session, self.nsr_id)