From 842c11fee5317c515129a8f9673bc97050f88bca Mon Sep 17 00:00:00 2001 From: Patricia Reinoso Date: Tue, 30 May 2023 11:50:58 +0000 Subject: [PATCH] Improve VIM workflows logic Avoid use an activity that failed to update the VIM status as FAILED. Change-Id: I91b1b2dbb042d0566508d507b991dd40bf6ba02c Signed-off-by: Patricia Reinoso --- osm_lcm/temporal/vim_workflows.py | 50 ++++++--- osm_lcm/tests/test_vim_workflows.py | 160 ++++++++++++++++++---------- 2 files changed, 135 insertions(+), 75 deletions(-) diff --git a/osm_lcm/temporal/vim_workflows.py b/osm_lcm/temporal/vim_workflows.py index e083967..cb7d636 100644 --- a/osm_lcm/temporal/vim_workflows.py +++ b/osm_lcm/temporal/vim_workflows.py @@ -68,7 +68,6 @@ class VimCreateWorkflow: schedule_to_close_timeout=default_schedule_to_close_timeout, retry_policy=retry_policy, ) - await self.update_states(op_state, vim_state) except ActivityError as e: error_details = str(e.cause.with_traceback(e.__traceback__)) @@ -98,28 +97,45 @@ class VimCreateWorkflow: self.logger.error(f"{WORKFLOW_VIM_CREATE} failed with {error_details}") raise e + await self.update_states(op_state, vim_state) + async def update_states( self, op_state: UpdateVimOperationStateInput, vim_state: UpdateVimStateInput, ): - await workflow.execute_activity( - activity=ACTIVITY_UPDATE_VIM_STATE, - arg=vim_state, - activity_id=f"{ACTIVITY_UPDATE_VIM_STATE}-{vim_state.vim_uuid}", - task_queue=LCM_TASK_QUEUE, - schedule_to_close_timeout=default_schedule_to_close_timeout, - retry_policy=retry_policy, - ) + raised_exceptions = [] + try: + await workflow.execute_activity( + activity=ACTIVITY_UPDATE_VIM_STATE, + arg=vim_state, + activity_id=f"{ACTIVITY_UPDATE_VIM_STATE}-{vim_state.vim_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ) + except ActivityError as e: + raised_exceptions.append(e) + self.logger.error(f"{WORKFLOW_VIM_CREATE} failed to update VIM state.") + try: + await workflow.execute_activity( + activity=ACTIVITY_UPDATE_VIM_OPERATION_STATE, + arg=op_state, + activity_id=f"{ACTIVITY_UPDATE_VIM_OPERATION_STATE}-{op_state.vim_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ) + except ActivityError as e: + self.logger.error( + f"{WORKFLOW_VIM_CREATE} failed to update VIM operation state." + ) + raised_exceptions.append(e) - await workflow.execute_activity( - activity=ACTIVITY_UPDATE_VIM_OPERATION_STATE, - arg=op_state, - activity_id=f"{ACTIVITY_UPDATE_VIM_OPERATION_STATE}-{op_state.vim_uuid}", - task_queue=LCM_TASK_QUEUE, - schedule_to_close_timeout=default_schedule_to_close_timeout, - retry_policy=retry_policy, - ) + # Only return de first exception. There is not much value in complicating the logic + # here to return all the exceptions if more than one is raised. + if raised_exceptions: + raise raised_exceptions[0] @workflow.defn(name=WORKFLOW_VIM_UPDATE, sandboxed=_SANDBOXED) diff --git a/osm_lcm/tests/test_vim_workflows.py b/osm_lcm/tests/test_vim_workflows.py index 0220fb7..e48b94c 100644 --- a/osm_lcm/tests/test_vim_workflows.py +++ b/osm_lcm/tests/test_vim_workflows.py @@ -70,28 +70,6 @@ async def mock_delete_vim_record_raises(data: DeleteVimInput) -> None: raise TestException("Test exception") -def assert_vim_state(call_args_list, vim_id, expected_states): - """Asserts that the VIM state was set to the expected states.""" - assert call_args_list - # Due to retry policy, in failure cases there are duplicated states - called_states = set() - for call in call_args_list: - assert call.args[0].vim_uuid == vim_id - called_states.add(call.args[0].operational_state) - assert list(called_states) == expected_states - - -def assert_vim_op_state(call_args_list, vim_id, expected_states): - """Asserts that the VIM operation state was set to the expected states.""" - assert call_args_list - # Due to retry policy, in failure cases there are duplicated states - called_states = set() - for call in call_args_list: - assert call.args[0].vim_uuid == vim_id - called_states.add(call.args[0].op_state) - assert list(called_states) == expected_states - - class TestVimWorkflowsBase(asynctest.TestCase): task_queue_name = LCM_TASK_QUEUE vim_id = "some-vim-uuid" @@ -140,17 +118,25 @@ class TestVimWorkflowsBase(asynctest.TestCase): debug_mode=True, ) - def check_vim_states_update(self, expected_vim_state, expected_vim_op_state): - assert_vim_state( - self.mock_update_vim_state_tracker.call_args_list, - self.vim_id, - expected_vim_state, - ) - assert_vim_op_state( - self.mock_update_vim_operation_state_tracker.call_args_list, - self.vim_id, - expected_vim_op_state, - ) + def check_vim_state_is_updated(self, expected_states): + """Asserts that the VIM state was set to each of the expected states (in order).""" + call_args_list = self.mock_update_vim_state_tracker.call_args_list + self.assertTrue(call_args_list) + called_states = list() + for call in call_args_list: + self.assertEqual(call.args[0].vim_uuid, self.vim_id) + called_states.append(call.args[0].operational_state) + self.assertEqual(called_states, expected_states) + + def check_vim_op_state_is_updated(self, expected_states): + """Asserts that the VIM operation state was set to each of the expected states (in order).""" + call_args_list = self.mock_update_vim_operation_state_tracker.call_args_list + self.assertTrue(call_args_list) + called_states = list() + for call in call_args_list: + self.assertEqual(call.args[0].vim_uuid, self.vim_id) + called_states.append(call.args[0].op_state) + self.assertEqual(called_states, expected_states) @parameterized_class( @@ -160,7 +146,7 @@ class TestVimWorkflowsBase(asynctest.TestCase): ] ) class TestVimWorkflow(TestVimWorkflowsBase): - async def test_nominal_case(self): + async def test_nominal_case_updates_vim_state_and_vim_op_state(self): activities = [ mock_test_vim_connectivity, self.mock_update_vim_state, @@ -169,59 +155,58 @@ class TestVimWorkflow(TestVimWorkflowsBase): expected_vim_state = [VimState.ENABLED] expected_vim_op_state = [VimOperationState.COMPLETED] async with self.env, self.get_worker(activities): - result = await self.client.execute_workflow( + await self.client.execute_workflow( self.workflow_name, arg=self.vim_operation_input, id=self.worflow_id, task_queue=self.task_queue_name, ) - self.assertIsNone(result) - self.check_vim_states_update(expected_vim_state, expected_vim_op_state) + self.check_vim_state_is_updated(expected_vim_state) + self.check_vim_op_state_is_updated(expected_vim_op_state) - async def test_fail_update_vim_state(self): + async def test_fail_update_vim_state_activity__updates_vim_operation_state(self): activities = [ mock_test_vim_connectivity, self.mock_update_vim_state_raises, self.mock_update_vim_operation_state, ] - expected_vim_state = [VimState.ENABLED, VimState.ERROR] + retry_policy = 3 + expected_vim_state = [VimState.ENABLED] * retry_policy + expected_vim_op_state = [VimOperationState.COMPLETED] async with self.env, self.get_worker(activities): with self.assertRaises(WorkflowFailureError): - result = await self.client.execute_workflow( + await self.client.execute_workflow( self.workflow_name, arg=self.vim_operation_input, id=self.worflow_id, task_queue=self.task_queue_name, ) - self.assertIsNone(result) + self.check_vim_state_is_updated(expected_vim_state) + self.check_vim_op_state_is_updated(expected_vim_op_state) - assert_vim_state( - self.mock_update_vim_state_tracker.call_args_list, - self.vim_id, - expected_vim_state, - ) - self.mock_update_vim_operation_state_tracker.assert_not_called() - - async def test_fail_update_vim_op_state(self): + async def test_fail_update_vim_op_state_activity__updates_vim_state(self): activities = [ mock_test_vim_connectivity, self.mock_update_vim_state, self.mock_update_vim_operation_state_raises, ] - expected_vim_state = [VimState.ENABLED, VimState.ERROR] - expected_vim_op_state = [VimOperationState.COMPLETED, VimOperationState.FAILED] + expected_vim_state = [VimState.ENABLED] + retry_policy = 3 + expected_vim_op_state = [VimOperationState.COMPLETED] * retry_policy async with self.env, self.get_worker(activities): with self.assertRaises(WorkflowFailureError): - result = await self.client.execute_workflow( + await self.client.execute_workflow( self.workflow_name, arg=self.vim_operation_input, id=self.worflow_id, task_queue=self.task_queue_name, ) - self.assertIsNone(result) - self.check_vim_states_update(expected_vim_state, expected_vim_op_state) + self.check_vim_state_is_updated(expected_vim_state) + self.check_vim_op_state_is_updated(expected_vim_op_state) - async def test_activity_error(self): + async def test_connectivity_activity_failure__updates_vim_state_and_vim_op_state_to_failure( + self, + ): activities = [ mock_test_vim_connectivity_raises, self.mock_update_vim_state, @@ -231,15 +216,74 @@ class TestVimWorkflow(TestVimWorkflowsBase): expected_vim_op_state = [VimOperationState.FAILED] async with self.env, self.get_worker(activities): with self.assertRaises(WorkflowFailureError): - result = await self.client.execute_workflow( + await self.client.execute_workflow( self.workflow_name, arg=self.vim_operation_input, id=self.worflow_id, task_queue=self.task_queue_name, ) - self.assertIsNone(result) + self.check_vim_state_is_updated(expected_vim_state) + self.check_vim_op_state_is_updated(expected_vim_op_state) + + async def test_connectivity_and_vim_state_update_activity_failures(self): + activities = [ + mock_test_vim_connectivity_raises, + self.mock_update_vim_state_raises, + self.mock_update_vim_operation_state, + ] + retry_policy = 3 + expected_vim_state = [VimState.ERROR] * retry_policy + expected_vim_op_state = [VimOperationState.FAILED] + async with self.env, self.get_worker(activities): + with self.assertRaises(WorkflowFailureError): + await self.client.execute_workflow( + self.workflow_name, + arg=self.vim_operation_input, + id=self.worflow_id, + task_queue=self.task_queue_name, + ) + self.check_vim_state_is_updated(expected_vim_state) + self.check_vim_op_state_is_updated(expected_vim_op_state) - self.check_vim_states_update(expected_vim_state, expected_vim_op_state) + async def test_connectivity_and_vim_op_state_update_activity_failures(self): + activities = [ + mock_test_vim_connectivity_raises, + self.mock_update_vim_state, + self.mock_update_vim_operation_state_raises, + ] + retry_policy = 3 + expected_vim_state = [VimState.ERROR] + expected_vim_op_state = [VimOperationState.FAILED] * retry_policy + async with self.env, self.get_worker(activities): + with self.assertRaises(WorkflowFailureError): + await self.client.execute_workflow( + self.workflow_name, + arg=self.vim_operation_input, + id=self.worflow_id, + task_queue=self.task_queue_name, + ) + self.check_vim_state_is_updated(expected_vim_state) + self.check_vim_op_state_is_updated(expected_vim_op_state) + + async def test_connectivity_vim_state_update_and_vim_op_state_update_failures(self): + activities = [ + mock_test_vim_connectivity_raises, + self.mock_update_vim_state_raises, + self.mock_update_vim_operation_state_raises, + ] + retry_policy = 3 + expected_vim_state = [VimState.ERROR] * retry_policy + expected_vim_op_state = [VimOperationState.FAILED] * retry_policy + async with self.env, self.get_worker(activities): + with self.assertRaises(WorkflowFailureError): + await self.client.execute_workflow( + self.workflow_name, + arg=self.vim_operation_input, + id=self.worflow_id, + task_queue=self.task_queue_name, + ) + self.check_vim_state_is_updated(expected_vim_state) + self.check_vim_op_state_is_updated(expected_vim_op_state) class TestVimDeleteWorkflow(TestVimWorkflowsBase): -- 2.25.1