X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Ftests%2Ftest_ns.py;h=4d3997e5ef0cdabee84af9a7ccaacfb37d0283a2;hb=6509c6c40a160d05fff193ae0b761fe31b849c90;hp=e35afc6fc37e0ef18b28e58e0e284e7a6312113c;hpb=e95ed366604ea6ed0746272b77be2cd598aa6bda;p=osm%2FLCM.git diff --git a/osm_lcm/tests/test_ns.py b/osm_lcm/tests/test_ns.py index e35afc6..4d3997e 100644 --- a/osm_lcm/tests/test_ns.py +++ b/osm_lcm/tests/test_ns.py @@ -57,11 +57,12 @@ lcm_config = { "user": getenv("OSMLCM_VCA_USER", "admin"), "secret": getenv("OSMLCM_VCA_SECRET", "vca"), "public_key": getenv("OSMLCM_VCA_PUBKEY", None), - 'ca_cert': getenv("OSMLCM_VCA_CACERT", None) + 'ca_cert': getenv("OSMLCM_VCA_CACERT", None), + 'apiproxy': getenv("OSMLCM_VCA_APIPROXY", "192.168.1.1") }, "ro_config": { - "endpoint_url": "http://{}:{}/openmano".format(getenv("OSMLCM_RO_HOST", "ro"), - getenv("OSMLCM_RO_PORT", "9090")), + "uri": "http://{}:{}/openmano".format(getenv("OSMLCM_RO_HOST", "ro"), + getenv("OSMLCM_RO_PORT", "9090")), "tenant": getenv("OSMLCM_RO_TENANT", "osm"), "logger_name": "lcm.ROclient", "loglevel": "DEBUG", @@ -86,7 +87,7 @@ class TestMyNS(asynctest.TestCase): yield "app_name-{}".format(num_calls) num_calls += 1 - def _n2vc_CreateExecutionEnvironment(self, namespace, reuse_ee_id, db_dict): + def _n2vc_CreateExecutionEnvironment(self, namespace, reuse_ee_id, db_dict, *args, **kwargs): k_list = namespace.split(".") ee_id = k_list[1] + "." if len(k_list) >= 2: @@ -175,6 +176,8 @@ class TestMyNS(asynctest.TestCase): if not getenv("OSMLCMTEST_VCA_NOMOCK"): ns.N2VCJujuConnector = asynctest.MagicMock(ns.N2VCJujuConnector) + ns.N2VCJujuConnectorLCM = asynctest.MagicMock(ns.N2VCJujuConnectorLCM) + ns.LCMHelmConn = asynctest.MagicMock(ns.LCMHelmConn) # Create NsLCM class self.my_ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, lcm_config, self.loop) @@ -256,10 +259,53 @@ class TestMyNS(asynctest.TestCase): if not getenv("OSMLCMTEST_DB_NOMOCK"): self.assertTrue(self.db.set_one.called, "db.set_one not called") + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}) + self.assertEqual(db_nsr["_admin"].get("nsState"), "INSTANTIATED", "Not instantiated") + for vnfr in db_vnfrs_list: + self.assertEqual(vnfr["_admin"].get("nsState"), "INSTANTIATED", "Not instantiated") + + if not getenv("OSMLCMTEST_VCA_NOMOCK"): + # check intial-primitives called + self.assertTrue(self.my_ns.n2vc.exec_primitive.called, + "Exec primitive not called for initial config primitive") + for _call in self.my_ns.n2vc.exec_primitive.call_args_list: + self.assertIn(_call[1]["primitive_name"], ("config", "touch"), + "called exec primitive with a primitive different than config or touch") # TODO add more checks of called methods # TODO add a terminate + async def test_instantiate_ee_list(self): + # Using modern IM where configuration is in the new format of execution_environment_list + ee_descriptor_id = "charm_simple" + non_used_initial_primitive = { + "name": "not_to_be_called", + "seq": 3, + "execution-environment-ref": "not_used_ee" + } + ee_list = [ + { + "id": ee_descriptor_id, + "juju": {"charm": "simple"}, + + }, + ] + + self.db.set_one( + "vnfds", + q_filter={"_id": "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77"}, + update_dict={"vnf-configuration.execution-environment-list": ee_list, + "vnf-configuration.initial-config-primitive.0.execution-environment-ref": ee_descriptor_id, + "vnf-configuration.initial-config-primitive.1.execution-environment-ref": ee_descriptor_id, + "vnf-configuration.initial-config-primitive.2": non_used_initial_primitive, + "vnf-configuration.config-primitive.0.execution-environment-ref": ee_descriptor_id, + "vnf-configuration.config-primitive.0.execution-environment-primitive": "touch_charm", + }, + unset={"vnf-configuration.juju": None}) + await self.test_instantiate() + # this will check that the initial-congig-primitive 'not_to_be_called' is not called + def test_ns_params_2_RO(self): vims = self.db.get_list("vim_accounts") vim_id = vims[0]["_id"] @@ -311,6 +357,20 @@ class TestMyNS(asynctest.TestCase): self.assertEqual(return_value, expected_value) # print("scale_result: {}".format(self.db.get_one("nslcmops", {"_id": nslcmop_id}).get("detailed-status"))) + async def test_vca_status_refresh(self): + nsr_id = descriptors.test_ids["TEST-A"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"] + await self.my_ns.vca_status_refresh(nsr_id, nslcmop_id) + for vnf_descriptor in self.db.get_list("vnfds"): + expected_value = dict() + return_value = dict() + if vnf_descriptor.get("vnf-configuration"): + if "juju" in vnf_descriptor["vnf-configuration"]: + expected_value = self.db.get_list("nsrs")[0]["vcaStatus"] + await self.my_ns._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed.VCA.0", {}) + return_value = self.db.get_list("nsrs")[0]["vcaStatus"] + self.assertEqual(return_value, expected_value) + # Test _retry_or_skip_suboperation() # Expected result: # - if a suboperation's 'operationState' is marked as 'COMPLETED', SUBOPERATION_STATUS_SKIP is expected @@ -503,6 +563,7 @@ class TestMyNS(asynctest.TestCase): logging_text = "KDU" self.my_ns.k8sclusterhelm.install = asynctest.CoroutineMock(return_value="k8s_id") self.my_ns.k8sclusterhelm.synchronize_repos = asynctest.CoroutineMock(return_value=("", "")) + self.my_ns.k8sclusterhelm.get_services = asynctest.CoroutineMock(return_value=([])) await self.my_ns.deploy_kdus(logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_register) await asyncio.wait(list(task_register.keys()), timeout=100) db_nsr = self.db.get_list("nsrs")[1] @@ -567,6 +628,9 @@ class TestMyNS(asynctest.TestCase): self.assertEqual(db_nsr.get("currentOperationID"), None, "currentOperationID different than None") self.assertEqual(db_nsr.get("errorDescription "), None, "errorDescription different than None") self.assertEqual(db_nsr.get("errorDetail"), None, "errorDetail different than None") + db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}) + for vnfr in db_vnfrs_list: + self.assertEqual(vnfr["_admin"].get("nsState"), "NOT_INSTANTIATED", "Not instantiated") @asynctest.fail_on(active_handles=True) # all async tasks must be completed async def test_terminate_primitive(self):