X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Ftests%2Ftest_ns.py;h=fe2b652215cf9f14771eefd11ae1825cb99ea319;hb=80ad921cf8875a4fa018b9a99275115b9f26d834;hp=e35afc6fc37e0ef18b28e58e0e284e7a6312113c;hpb=e95ed366604ea6ed0746272b77be2cd598aa6bda;p=osm%2FLCM.git diff --git a/osm_lcm/tests/test_ns.py b/osm_lcm/tests/test_ns.py index e35afc6..fe2b652 100644 --- a/osm_lcm/tests/test_ns.py +++ b/osm_lcm/tests/test_ns.py @@ -57,11 +57,12 @@ lcm_config = { "user": getenv("OSMLCM_VCA_USER", "admin"), "secret": getenv("OSMLCM_VCA_SECRET", "vca"), "public_key": getenv("OSMLCM_VCA_PUBKEY", None), - 'ca_cert': getenv("OSMLCM_VCA_CACERT", None) + 'ca_cert': getenv("OSMLCM_VCA_CACERT", None), + 'apiproxy': getenv("OSMLCM_VCA_APIPROXY", "192.168.1.1") }, "ro_config": { - "endpoint_url": "http://{}:{}/openmano".format(getenv("OSMLCM_RO_HOST", "ro"), - getenv("OSMLCM_RO_PORT", "9090")), + "uri": "http://{}:{}/openmano".format(getenv("OSMLCM_RO_HOST", "ro"), + getenv("OSMLCM_RO_PORT", "9090")), "tenant": getenv("OSMLCM_RO_TENANT", "osm"), "logger_name": "lcm.ROclient", "loglevel": "DEBUG", @@ -86,7 +87,7 @@ class TestMyNS(asynctest.TestCase): yield "app_name-{}".format(num_calls) num_calls += 1 - def _n2vc_CreateExecutionEnvironment(self, namespace, reuse_ee_id, db_dict): + def _n2vc_CreateExecutionEnvironment(self, namespace, reuse_ee_id, db_dict, *args, **kwargs): k_list = namespace.split(".") ee_id = k_list[1] + "." if len(k_list) >= 2: @@ -175,6 +176,8 @@ class TestMyNS(asynctest.TestCase): if not getenv("OSMLCMTEST_VCA_NOMOCK"): ns.N2VCJujuConnector = asynctest.MagicMock(ns.N2VCJujuConnector) + ns.N2VCJujuConnectorLCM = asynctest.MagicMock(ns.N2VCJujuConnectorLCM) + ns.LCMHelmConn = asynctest.MagicMock(ns.LCMHelmConn) # Create NsLCM class self.my_ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, lcm_config, self.loop) @@ -256,10 +259,46 @@ class TestMyNS(asynctest.TestCase): if not getenv("OSMLCMTEST_DB_NOMOCK"): self.assertTrue(self.db.set_one.called, "db.set_one not called") - + if not getenv("OSMLCMTEST_VCA_NOMOCK"): + # check intial-primitives called + self.assertTrue(self.my_ns.n2vc.exec_primitive.called, + "Exec primitive not called for initial config primitive") + for _call in self.my_ns.n2vc.exec_primitive.call_args_list: + self.assertIn(_call[1]["primitive_name"], ("config", "touch"), + "called exec primitive with a primitive different than config or touch") # TODO add more checks of called methods # TODO add a terminate + async def test_instantiate_ee_list(self): + # Using modern IM where configuration is in the new format of execution_environment_list + ee_descriptor_id = "charm_simple" + non_used_initial_primitive = { + "name": "not_to_be_called", + "seq": 3, + "execution-environment-ref": "not_used_ee" + } + ee_list = [ + { + "id": ee_descriptor_id, + "juju": {"charm": "simple"}, + + }, + ] + + self.db.set_one( + "vnfds", + q_filter={"_id": "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77"}, + update_dict={"vnf-configuration.execution-environment-list": ee_list, + "vnf-configuration.initial-config-primitive.0.execution-environment-ref": ee_descriptor_id, + "vnf-configuration.initial-config-primitive.1.execution-environment-ref": ee_descriptor_id, + "vnf-configuration.initial-config-primitive.2": non_used_initial_primitive, + "vnf-configuration.config-primitive.0.execution-environment-ref": ee_descriptor_id, + "vnf-configuration.config-primitive.0.execution-environment-primitive": "touch_charm", + }, + unset={"vnf-configuration.juju": None}) + await self.test_instantiate() + # this will check that the initial-congig-primitive 'not_to_be_called' is not called + def test_ns_params_2_RO(self): vims = self.db.get_list("vim_accounts") vim_id = vims[0]["_id"] @@ -503,6 +542,7 @@ class TestMyNS(asynctest.TestCase): logging_text = "KDU" self.my_ns.k8sclusterhelm.install = asynctest.CoroutineMock(return_value="k8s_id") self.my_ns.k8sclusterhelm.synchronize_repos = asynctest.CoroutineMock(return_value=("", "")) + self.my_ns.k8sclusterhelm.get_services = asynctest.CoroutineMock(return_value=([])) await self.my_ns.deploy_kdus(logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_register) await asyncio.wait(list(task_register.keys()), timeout=100) db_nsr = self.db.get_list("nsrs")[1]