| ####################################################################################### |
| # Copyright ETSI Contributors and Others. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| # implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| |
| import asynctest |
| from asyncio.exceptions import CancelledError |
| from copy import deepcopy |
| from osm_common.dataclasses.temporal_dataclasses import ( |
| ChangeVnfInstantiationStateInput, |
| ChangeVnfStateInput, |
| GetTaskQueueInput, |
| GetVimCloudInput, |
| GetVnfDetailsInput, |
| VnfInstantiateInput, |
| VnfInstantiationState, |
| VnfState, |
| ) |
| from osm_common.dbbase import DbException |
| from osm_common.temporal_constants import ( |
| LCM_TASK_QUEUE, |
| ) |
| from osm_lcm.temporal.vnf_activities import VnfDbActivity, VnfOperations |
| from temporalio.testing import ActivityEnvironment |
| from unittest.mock import Mock |
| |
| vnfr_uuid = "d08d2da5-2120-476c-8538-deaeb4e88b3e" |
| model_name = "a-model-name" |
| vnf_instantiate_input = VnfInstantiateInput(vnfr_uuid=vnfr_uuid, model_name=model_name) |
| cloud = "microk8s" |
| sample_vnfr = { |
| "_id": "9f472177-95c0-4335-b357-5cdc17a79965", |
| "id": "9f472177-95c0-4335-b357-5cdc17a79965", |
| "nsr-id-ref": "dcf4c922-5a73-41bf-a6ca-870c22d6336c", |
| "vnfd-ref": "jar_vnfd_scalable", |
| "vnfd-id": "f1b38eac-190c-485d-9a74-c6e169c929d8", |
| "vim-account-id": "9b0bedc3-ea8e-42fd-acc9-dd03f4dee73c", |
| } |
| sample_vnfd = { |
| "_id": "97784f19-d254-4252-946c-cf92d85443ca", |
| "id": "sol006-vnf", |
| "provider": "Canonical", |
| "product-name": "test-vnf", |
| "software-version": "1.0", |
| } |
| sample_vim_record = { |
| "_id": "a64f7c6c-bc27-4ec8-b664-5500a3324eca", |
| "name": "juju", |
| "vim_type": "paas", |
| "vim_url": "192.168.1.100:17070", |
| "vim_user": "admin", |
| "vim_password": "c16gylWEepEREN6vWw==", |
| "config": { |
| "paas_provider": "juju", |
| "cloud": cloud, |
| "cloud_credentials": "microk8s", |
| "authorized_keys": "$HOME/.local/share/juju/ssh/juju_id_rsa.pub", |
| "ca_cert_content": "-----BEGIN-----", |
| }, |
| } |
| |
| |
| class TestVnfDbActivity(asynctest.TestCase): |
| def setUp(self): |
| self.db = Mock() |
| self.env = ActivityEnvironment() |
| self.vnf_db_activity = VnfDbActivity(self.db) |
| |
| async def test_change_vnf_state_nominal_case(self): |
| vnf_state = VnfState.STOPPED |
| change_vnf_state_input = ChangeVnfStateInput(vnfr_uuid, vnf_state) |
| await self.env.run( |
| self.vnf_db_activity.change_vnf_state, change_vnf_state_input |
| ) |
| self.db.set_one.assert_called_with( |
| "vnfrs", {"_id": vnfr_uuid}, {"vnfState": vnf_state.name} |
| ) |
| |
| async def test_change_vnf_state_db_raises_exception(self): |
| change_vnf_state_input = ChangeVnfStateInput(vnfr_uuid, VnfState.STARTED) |
| self.db.set_one.side_effect = DbException("not found") |
| with self.assertRaises(DbException): |
| await self.env.run( |
| self.vnf_db_activity.change_vnf_state, change_vnf_state_input |
| ) |
| |
| async def test_change_vnf_instantiation_state_nominal_case(self): |
| instantiation_state = VnfInstantiationState.NOT_INSTANTIATED |
| change_instantiation_input = ChangeVnfInstantiationStateInput( |
| vnfr_uuid, instantiation_state |
| ) |
| await self.env.run( |
| self.vnf_db_activity.change_vnf_instantiation_state, |
| change_instantiation_input, |
| ) |
| self.db.set_one.assert_called_with( |
| "vnfrs", |
| {"_id": vnfr_uuid}, |
| {"instantiationState": instantiation_state.name}, |
| ) |
| |
| async def test_change_vnf_instantiation_state_db_raises_exception(self): |
| change_instantiation_input = ChangeVnfInstantiationStateInput( |
| vnfr_uuid, VnfInstantiationState.INSTANTIATED |
| ) |
| self.db.set_one.side_effect = DbException("not found") |
| with self.assertRaises(DbException): |
| await self.env.run( |
| self.vnf_db_activity.change_vnf_state, change_instantiation_input |
| ) |
| |
| async def test_set_vnf_model_nominal_case(self): |
| await self.env.run(self.vnf_db_activity.set_vnf_model, vnf_instantiate_input) |
| self.db.set_one.assert_called_with( |
| "vnfrs", {"_id": vnfr_uuid}, {"namespace": model_name} |
| ) |
| |
| async def test_set_vnf_model_db_raises_exception(self): |
| self.db.set_one.side_effect = DbException("not found") |
| with self.assertRaises(DbException): |
| await self.env.run( |
| self.vnf_db_activity.set_vnf_model, vnf_instantiate_input |
| ) |
| |
| |
| class TestGetTaskQueue(asynctest.TestCase): |
| async def setUp(self): |
| self.db = Mock() |
| self.vnf_operations_instance = VnfOperations(self.db) |
| self.env = ActivityEnvironment() |
| self.get_task_queue_input = GetTaskQueueInput(vnfr_uuid=sample_vnfr["id"]) |
| |
| async def test_activity_succeeded(self): |
| self.db.get_one.side_effect = [sample_vnfr, sample_vim_record] |
| activity_result = await self.env.run( |
| self.vnf_operations_instance.get_task_queue, |
| self.get_task_queue_input, |
| ) |
| self.assertEqual(activity_result.task_queue, LCM_TASK_QUEUE) |
| |
| async def test_db_raises_exception(self): |
| self.db.get_one.side_effect = DbException("not found") |
| with self.assertRaises(DbException): |
| await self.env.run( |
| self.vnf_operations_instance.get_task_queue, |
| self.get_task_queue_input, |
| ) |
| |
| async def test_invalid_task_queue(self): |
| vim_record = deepcopy(sample_vim_record) |
| vim_record["vim_type"] = "some-vim-type" |
| self.db.get_one.side_effect = [sample_vnfr, vim_record] |
| with self.assertRaises(KeyError): |
| await self.env.run( |
| self.vnf_operations_instance.get_task_queue, |
| self.get_task_queue_input, |
| ) |
| |
| async def test_activity_cancelled(self): |
| self.env._cancelled = True |
| self.db.get_one.side_effect = [sample_vnfr, sample_vim_record] |
| with self.assertRaises(CancelledError): |
| activity_result = await self.env.run( |
| self.vnf_operations_instance.get_task_queue, |
| self.get_task_queue_input, |
| ) |
| self.assertEqual(activity_result, None) |
| |
| |
| class TestVnfDetails(asynctest.TestCase): |
| async def setUp(self): |
| self.db = Mock() |
| self.vnf_operations_instance = VnfOperations(self.db) |
| self.env = ActivityEnvironment() |
| |
| async def test_activity_succeeded(self): |
| self.db.get_one.side_effect = [sample_vnfr, sample_vnfd] |
| activity_result = await self.env.run( |
| self.vnf_operations_instance.get_vnf_details, |
| GetVnfDetailsInput(vnfr_uuid=sample_vnfr["id"]), |
| ) |
| self.assertEqual(activity_result.vnfd, sample_vnfd) |
| self.assertEqual(activity_result.vnfr, sample_vnfr) |
| |
| async def test_activity_failed_db_exception(self): |
| self.db.get_one.side_effect = DbException("Can not connect to Database.") |
| with self.assertRaises(DbException) as err: |
| activity_result = await self.env.run( |
| self.vnf_operations_instance.get_vnf_details, |
| GetVnfDetailsInput(vnfr_uuid=sample_vnfr["id"]), |
| ) |
| self.assertEqual(activity_result, None) |
| self.assertEqual( |
| str(err.exception), "database exception Can not connect to Database." |
| ) |
| |
| async def test_activity_failed_key_error(self): |
| vnfr = deepcopy(sample_vnfr) |
| vnfr.pop("vnfd-id") |
| self.db.get_one.side_effect = [vnfr, sample_vnfd] |
| with self.assertRaises(KeyError) as err: |
| activity_result = await self.env.run( |
| self.vnf_operations_instance.get_vnf_details, |
| GetVnfDetailsInput(vnfr_uuid=sample_vnfr["id"]), |
| ) |
| self.assertEqual(activity_result, None) |
| self.assertEqual(str(err.exception.args[0]), "vnfd-id") |
| |
| async def test_activity_cancelled(self): |
| self.env._cancelled = True |
| self.db.get_one.side_effect = [sample_vnfr, sample_vnfd] |
| with self.assertRaises(CancelledError): |
| activity_result = await self.env.run( |
| self.vnf_operations_instance.get_vnf_details, |
| GetVnfDetailsInput(vnfr_uuid=sample_vnfr["id"]), |
| ) |
| self.assertEqual(activity_result, None) |
| |
| |
| class TestGetVimCloud(asynctest.TestCase): |
| async def setUp(self): |
| self.db = Mock() |
| self.vnf_operations_instance = VnfOperations(self.db) |
| self.env = ActivityEnvironment() |
| |
| async def test_activity_succeeded(self): |
| self.db.get_one.side_effect = [sample_vnfr, sample_vim_record] |
| activity_result = await self.env.run( |
| self.vnf_operations_instance.get_vim_cloud, |
| GetVimCloudInput(vnfr_uuid=sample_vnfr["id"]), |
| ) |
| self.assertEqual(activity_result.cloud, cloud) |
| |
| async def test_activity_vim_record_without_cloud(self): |
| vim_record = deepcopy(sample_vim_record) |
| vim_record["config"].pop("cloud") |
| self.db.get_one.side_effect = [sample_vnfr, vim_record] |
| activity_result = await self.env.run( |
| self.vnf_operations_instance.get_vim_cloud, |
| GetVimCloudInput(vnfr_uuid=sample_vnfr["id"]), |
| ) |
| self.assertEqual(activity_result.cloud, "") |
| |
| async def test_activity_failed_db_exception(self): |
| self.db.get_one.side_effect = DbException("Can not connect to Database.") |
| with self.assertRaises(DbException) as err: |
| activity_result = await self.env.run( |
| self.vnf_operations_instance.get_vim_cloud, |
| GetVimCloudInput(vnfr_uuid=sample_vnfr["id"]), |
| ) |
| self.assertEqual(activity_result, None) |
| self.assertEqual( |
| str(err.exception), "database exception Can not connect to Database." |
| ) |
| |
| async def test_activity_failed_key_error(self): |
| vim_record = deepcopy(sample_vim_record) |
| vim_record.pop("config") |
| self.db.get_one.side_effect = [sample_vnfr, vim_record] |
| with self.assertRaises(KeyError) as err: |
| activity_result = await self.env.run( |
| self.vnf_operations_instance.get_vim_cloud, |
| GetVimCloudInput(vnfr_uuid=sample_vnfr["id"]), |
| ) |
| self.assertEqual(activity_result, None) |
| self.assertEqual(str(err.exception.args[0]), "config") |
| |
| async def test_activity_cancelled(self): |
| self.env._cancelled = True |
| self.db.get_one.side_effect = [sample_vnfr, sample_vim_record] |
| with self.assertRaises(CancelledError): |
| activity_result = await self.env.run( |
| self.vnf_operations_instance.get_vim_cloud, |
| GetVimCloudInput(vnfr_uuid=sample_vnfr["id"]), |
| ) |
| self.assertEqual(activity_result, None) |
| |
| |
| if __name__ == "__main__": |
| asynctest.main() |