--- /dev/null
+#########################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asynctest
+from osm_common.dataclasses.temporal_dataclasses import (
+ DeleteVimInput,
+ TestVimConnectivityInput,
+ UpdateVimOperationStateInput,
+ UpdateVimStateInput,
+ VimOperationInput,
+ VimState,
+ VimOperationState,
+)
+from osm_common.temporal_constants import (
+ ACTIVITY_TEST_VIM_CONNECTIVITY,
+ ACTIVITY_UPDATE_VIM_OPERATION_STATE,
+ ACTIVITY_UPDATE_VIM_STATE,
+ ACTIVITY_DELETE_VIM,
+ LCM_TASK_QUEUE,
+ WORKFLOW_VIM_CREATE,
+ WORKFLOW_VIM_UPDATE,
+ WORKFLOW_VIM_DELETE,
+)
+from osm_lcm.temporal.vim_workflows import (
+ VimCreateWorkflow,
+ VimUpdateWorkflow,
+ VimDeleteWorkflow,
+)
+from parameterized import parameterized_class
+from temporalio import activity
+from temporalio.client import WorkflowFailureError
+from temporalio.testing import WorkflowEnvironment
+from temporalio.worker import Worker
+from unittest.mock import Mock
+
+
+class TestException(Exception):
+ pass
+
+
+@activity.defn(name=ACTIVITY_TEST_VIM_CONNECTIVITY)
+async def mock_test_vim_connectivity(
+ test_connectivity_input: TestVimConnectivityInput,
+) -> None:
+ pass
+
+
+@activity.defn(name=ACTIVITY_TEST_VIM_CONNECTIVITY)
+async def mock_test_vim_connectivity_raises(
+ test_connectivity_input: TestVimConnectivityInput,
+) -> None:
+ raise TestException("Test exception")
+
+
+@activity.defn(name=ACTIVITY_DELETE_VIM)
+async def mock_delete_vim_record_raises(data: DeleteVimInput) -> None:
+ raise TestException("Test exception")
+
+
+def assert_vim_state(call_args_list, vim_id, expected_states):
+ """Asserts that the VIM state was set to the expected states."""
+ assert call_args_list
+ # Due to retry policy, in failure cases there are duplicated states
+ called_states = set()
+ for call in call_args_list:
+ assert call.args[0].vim_uuid == vim_id
+ called_states.add(call.args[0].operational_state)
+ assert list(called_states) == expected_states
+
+
+def assert_vim_op_state(call_args_list, vim_id, expected_states):
+ """Asserts that the VIM operation state was set to the expected states."""
+ assert call_args_list
+ # Due to retry policy, in failure cases there are duplicated states
+ called_states = set()
+ for call in call_args_list:
+ assert call.args[0].vim_uuid == vim_id
+ called_states.add(call.args[0].op_state)
+ assert list(called_states) == expected_states
+
+
+class TestVimWorkflowsBase(asynctest.TestCase):
+ task_queue_name = LCM_TASK_QUEUE
+ vim_id = "some-vim-uuid"
+ worflow_id = vim_id
+ vim_operation_input = VimOperationInput(vim_id, "op_id")
+
+ @activity.defn(name=ACTIVITY_UPDATE_VIM_STATE)
+ async def mock_update_vim_state(self, data: UpdateVimStateInput) -> None:
+ self.mock_update_vim_state_tracker(data)
+
+ @activity.defn(name=ACTIVITY_UPDATE_VIM_STATE)
+ async def mock_update_vim_state_raises(self, data: UpdateVimStateInput) -> None:
+ self.mock_update_vim_state_tracker(data)
+ raise TestException("Test exception")
+
+ @activity.defn(name=ACTIVITY_UPDATE_VIM_OPERATION_STATE)
+ async def mock_update_vim_operation_state(
+ self, data: UpdateVimOperationStateInput
+ ) -> None:
+ self.mock_update_vim_operation_state_tracker(data)
+
+ @activity.defn(name=ACTIVITY_UPDATE_VIM_OPERATION_STATE)
+ async def mock_update_vim_operation_state_raises(
+ self, data: UpdateVimOperationStateInput
+ ) -> None:
+ self.mock_update_vim_operation_state_tracker(data)
+ raise TestException("Test exception")
+
+ @activity.defn(name=ACTIVITY_DELETE_VIM)
+ async def mock_delete_vim_record(self, data: DeleteVimInput) -> None:
+ self.mock_delete_vim_record_tracker(data)
+
+ async def setUp(self):
+ self.env = await WorkflowEnvironment.start_time_skipping()
+ self.client = self.env.client
+ self.mock_update_vim_state_tracker = Mock()
+ self.mock_update_vim_operation_state_tracker = Mock()
+ self.mock_delete_vim_record_tracker = Mock()
+
+ def get_worker(self, activities: list) -> Worker:
+ return Worker(
+ self.client,
+ task_queue=self.task_queue_name,
+ workflows=[VimCreateWorkflow, VimUpdateWorkflow, VimDeleteWorkflow],
+ activities=activities,
+ debug_mode=True,
+ )
+
+ def check_vim_states_update(self, expected_vim_state, expected_vim_op_state):
+ assert_vim_state(
+ self.mock_update_vim_state_tracker.call_args_list,
+ self.vim_id,
+ expected_vim_state,
+ )
+ assert_vim_op_state(
+ self.mock_update_vim_operation_state_tracker.call_args_list,
+ self.vim_id,
+ expected_vim_op_state,
+ )
+
+
+@parameterized_class(
+ [
+ {"workflow_name": WORKFLOW_VIM_CREATE},
+ {"workflow_name": WORKFLOW_VIM_UPDATE},
+ ]
+)
+class TestVimWorkflow(TestVimWorkflowsBase):
+ async def test_nominal_case(self):
+ activities = [
+ mock_test_vim_connectivity,
+ self.mock_update_vim_state,
+ self.mock_update_vim_operation_state,
+ ]
+ expected_vim_state = [VimState.ENABLED]
+ expected_vim_op_state = [VimOperationState.COMPLETED]
+ async with self.env, self.get_worker(activities):
+ result = await self.client.execute_workflow(
+ self.workflow_name,
+ arg=self.vim_operation_input,
+ id=self.worflow_id,
+ task_queue=self.task_queue_name,
+ )
+ self.assertIsNone(result)
+ self.check_vim_states_update(expected_vim_state, expected_vim_op_state)
+
+ async def test_fail_update_vim_state(self):
+ activities = [
+ mock_test_vim_connectivity,
+ self.mock_update_vim_state_raises,
+ self.mock_update_vim_operation_state,
+ ]
+ expected_vim_state = [VimState.ENABLED, VimState.ERROR]
+ async with self.env, self.get_worker(activities):
+ with self.assertRaises(WorkflowFailureError):
+ result = await self.client.execute_workflow(
+ self.workflow_name,
+ arg=self.vim_operation_input,
+ id=self.worflow_id,
+ task_queue=self.task_queue_name,
+ )
+ self.assertIsNone(result)
+
+ assert_vim_state(
+ self.mock_update_vim_state_tracker.call_args_list,
+ self.vim_id,
+ expected_vim_state,
+ )
+ self.mock_update_vim_operation_state_tracker.assert_not_called()
+
+ async def test_fail_update_vim_op_state(self):
+ activities = [
+ mock_test_vim_connectivity,
+ self.mock_update_vim_state,
+ self.mock_update_vim_operation_state_raises,
+ ]
+ expected_vim_state = [VimState.ENABLED, VimState.ERROR]
+ expected_vim_op_state = [VimOperationState.COMPLETED, VimOperationState.FAILED]
+ async with self.env, self.get_worker(activities):
+ with self.assertRaises(WorkflowFailureError):
+ result = await self.client.execute_workflow(
+ self.workflow_name,
+ arg=self.vim_operation_input,
+ id=self.worflow_id,
+ task_queue=self.task_queue_name,
+ )
+ self.assertIsNone(result)
+ self.check_vim_states_update(expected_vim_state, expected_vim_op_state)
+
+ async def test_activity_error(self):
+ activities = [
+ mock_test_vim_connectivity_raises,
+ self.mock_update_vim_state,
+ self.mock_update_vim_operation_state,
+ ]
+ expected_vim_state = [VimState.ERROR]
+ expected_vim_op_state = [VimOperationState.FAILED]
+ async with self.env, self.get_worker(activities):
+ with self.assertRaises(WorkflowFailureError):
+ result = await self.client.execute_workflow(
+ self.workflow_name,
+ arg=self.vim_operation_input,
+ id=self.worflow_id,
+ task_queue=self.task_queue_name,
+ )
+ self.assertIsNone(result)
+
+ self.check_vim_states_update(expected_vim_state, expected_vim_op_state)
+
+
+class TestVimDeleteWorkflow(TestVimWorkflowsBase):
+ async def test_vim_delete_nominal_case(self):
+ activities = [self.mock_delete_vim_record]
+ async with self.env, self.get_worker(activities):
+ result = await self.client.execute_workflow(
+ WORKFLOW_VIM_DELETE,
+ arg=self.vim_operation_input,
+ id=self.worflow_id,
+ task_queue=self.task_queue_name,
+ )
+ self.assertIsNone(result)
+ self.mock_delete_vim_record_tracker.assert_called_once()
+
+ async def test_vim_delete_exception(self):
+ activities = [mock_delete_vim_record_raises]
+ async with self.env, self.get_worker(activities):
+ with self.assertRaises(WorkflowFailureError):
+ result = await self.client.execute_workflow(
+ WORKFLOW_VIM_DELETE,
+ arg=self.vim_operation_input,
+ id=self.worflow_id,
+ task_queue=self.task_queue_name,
+ )
+ self.assertIsNone(result)