if not getenv("OSMLCMTEST_DB_NOMOCK"):
self.assertTrue(self.db.set_one.called, "db.set_one not called")
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
+ self.assertEqual(db_nsr["_admin"].get("nsState"), "INSTANTIATED", "Not instantiated")
+ for vnfr in db_vnfrs_list:
+ self.assertEqual(vnfr["_admin"].get("nsState"), "INSTANTIATED", "Not instantiated")
+
if not getenv("OSMLCMTEST_VCA_NOMOCK"):
# check intial-primitives called
self.assertTrue(self.my_ns.n2vc.exec_primitive.called,
for _call in self.my_ns.n2vc.exec_primitive.call_args_list:
self.assertIn(_call[1]["primitive_name"], ("config", "touch"),
"called exec primitive with a primitive different than config or touch")
+
# TODO add more checks of called methods
# TODO add a terminate
logging_text = "KDU"
self.my_ns.k8sclusterhelm.install = asynctest.CoroutineMock(return_value="k8s_id")
self.my_ns.k8sclusterhelm.synchronize_repos = asynctest.CoroutineMock(return_value=("", ""))
+ self.my_ns.k8sclusterhelm.get_services = asynctest.CoroutineMock(return_value=([]))
await self.my_ns.deploy_kdus(logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_register)
await asyncio.wait(list(task_register.keys()), timeout=100)
db_nsr = self.db.get_list("nsrs")[1]
self.assertEqual(db_nsr.get("currentOperationID"), None, "currentOperationID different than None")
self.assertEqual(db_nsr.get("errorDescription "), None, "errorDescription different than None")
self.assertEqual(db_nsr.get("errorDetail"), None, "errorDetail different than None")
+ db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
+ for vnfr in db_vnfrs_list:
+ self.assertEqual(vnfr["_admin"].get("nsState"), "NOT_INSTANTIATED", "Not instantiated")
@asynctest.fail_on(active_handles=True) # all async tasks must be completed
async def test_terminate_primitive(self):