X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Ftests%2Ftest_ns.py;h=28b8fbe25d45f3f1e58ce2f5818e31cff4dd513d;hb=refs%2Fchanges%2F87%2F10687%2F2;hp=2a1b125091ca4134495262c64e0da99cb97d9f3e;hpb=922c41753ffbb4b526f2135a23c39f480c58e2cb;p=osm%2FLCM.git diff --git a/osm_lcm/tests/test_ns.py b/osm_lcm/tests/test_ns.py index 2a1b125..28b8fbe 100644 --- a/osm_lcm/tests/test_ns.py +++ b/osm_lcm/tests/test_ns.py @@ -342,6 +342,25 @@ class TestMyNS(asynctest.TestCase): self.assertEqual(return_value, expected_value) # print("scale_result: {}".format(self.db.get_one("nslcmops", {"_id": nslcmop_id}).get("detailed-status"))) + async def test_vca_status_refresh(self): + nsr_id = descriptors.test_ids["TEST-A"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"] + await self.my_ns.vca_status_refresh(nsr_id, nslcmop_id) + expected_value = dict() + return_value = dict() + vnf_descriptors = self.db.get_list("vnfds") + for i, _ in enumerate(vnf_descriptors): + for j, value in enumerate(vnf_descriptors[i]["df"]): + if "lcm-operations-configuration" in vnf_descriptors[i]["df"][j]: + if "day1-2" in value["lcm-operations-configuration"]["operate-vnf-op-config"]: + for k, v in enumerate(value["lcm-operations-configuration"]["operate-vnf-op-config"]["day1-2"]): + if "juju" in v["execution-environment-list"][k]: + expected_value = self.db.get_list("nsrs")[i]["vcaStatus"] + await self.my_ns._on_update_n2vc_db("nsrs", {"_id": nsr_id}, + "_admin.deployed.VCA.0", {}) + return_value = self.db.get_list("nsrs")[i]["vcaStatus"] + self.assertEqual(return_value, expected_value) + # Test _retry_or_skip_suboperation() # Expected result: # - if a suboperation's 'operationState' is marked as 'COMPLETED', SUBOPERATION_STATUS_SKIP is expected @@ -529,10 +548,12 @@ class TestMyNS(asynctest.TestCase): db_vnfr = self.db.get_one("vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": "multikdu"}) db_vnfrs = {"multikdu": db_vnfr} db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]}) - db_vnfds = {db_vnfd["_id"]: db_vnfd} + db_vnfds = [db_vnfd] task_register = {} logging_text = "KDU" - self.my_ns.k8sclusterhelm3.install = asynctest.CoroutineMock(return_value="k8s_id") + self.my_ns.k8sclusterhelm3.generate_kdu_instance_name = asynctest.mock.Mock() + self.my_ns.k8sclusterhelm3.generate_kdu_instance_name.return_value = "k8s_id" + self.my_ns.k8sclusterhelm3.install = asynctest.CoroutineMock() self.my_ns.k8sclusterhelm3.synchronize_repos = asynctest.CoroutineMock(return_value=("", "")) self.my_ns.k8sclusterhelm3.get_services = asynctest.CoroutineMock(return_value=([])) await self.my_ns.deploy_kdus(logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_register)