schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
)
- await self.update_states(op_state, vim_state)
except ActivityError as e:
error_details = str(e.cause.with_traceback(e.__traceback__))
self.logger.error(f"{WORKFLOW_VIM_CREATE} failed with {error_details}")
raise e
+ await self.update_states(op_state, vim_state)
+
async def update_states(
self,
op_state: UpdateVimOperationStateInput,
vim_state: UpdateVimStateInput,
):
- await workflow.execute_activity(
- activity=ACTIVITY_UPDATE_VIM_STATE,
- arg=vim_state,
- activity_id=f"{ACTIVITY_UPDATE_VIM_STATE}-{vim_state.vim_uuid}",
- task_queue=LCM_TASK_QUEUE,
- schedule_to_close_timeout=default_schedule_to_close_timeout,
- retry_policy=retry_policy,
- )
+ raised_exceptions = []
+ try:
+ await workflow.execute_activity(
+ activity=ACTIVITY_UPDATE_VIM_STATE,
+ arg=vim_state,
+ activity_id=f"{ACTIVITY_UPDATE_VIM_STATE}-{vim_state.vim_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ )
+ except ActivityError as e:
+ raised_exceptions.append(e)
+ self.logger.error(f"{WORKFLOW_VIM_CREATE} failed to update VIM state.")
+ try:
+ await workflow.execute_activity(
+ activity=ACTIVITY_UPDATE_VIM_OPERATION_STATE,
+ arg=op_state,
+ activity_id=f"{ACTIVITY_UPDATE_VIM_OPERATION_STATE}-{op_state.vim_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ )
+ except ActivityError as e:
+ self.logger.error(
+ f"{WORKFLOW_VIM_CREATE} failed to update VIM operation state."
+ )
+ raised_exceptions.append(e)
- await workflow.execute_activity(
- activity=ACTIVITY_UPDATE_VIM_OPERATION_STATE,
- arg=op_state,
- activity_id=f"{ACTIVITY_UPDATE_VIM_OPERATION_STATE}-{op_state.vim_uuid}",
- task_queue=LCM_TASK_QUEUE,
- schedule_to_close_timeout=default_schedule_to_close_timeout,
- retry_policy=retry_policy,
- )
+ # Only return de first exception. There is not much value in complicating the logic
+ # here to return all the exceptions if more than one is raised.
+ if raised_exceptions:
+ raise raised_exceptions[0]
@workflow.defn(name=WORKFLOW_VIM_UPDATE, sandboxed=_SANDBOXED)
raise TestException("Test exception")
-def assert_vim_state(call_args_list, vim_id, expected_states):
- """Asserts that the VIM state was set to the expected states."""
- assert call_args_list
- # Due to retry policy, in failure cases there are duplicated states
- called_states = set()
- for call in call_args_list:
- assert call.args[0].vim_uuid == vim_id
- called_states.add(call.args[0].operational_state)
- assert list(called_states) == expected_states
-
-
-def assert_vim_op_state(call_args_list, vim_id, expected_states):
- """Asserts that the VIM operation state was set to the expected states."""
- assert call_args_list
- # Due to retry policy, in failure cases there are duplicated states
- called_states = set()
- for call in call_args_list:
- assert call.args[0].vim_uuid == vim_id
- called_states.add(call.args[0].op_state)
- assert list(called_states) == expected_states
-
-
class TestVimWorkflowsBase(asynctest.TestCase):
task_queue_name = LCM_TASK_QUEUE
vim_id = "some-vim-uuid"
debug_mode=True,
)
- def check_vim_states_update(self, expected_vim_state, expected_vim_op_state):
- assert_vim_state(
- self.mock_update_vim_state_tracker.call_args_list,
- self.vim_id,
- expected_vim_state,
- )
- assert_vim_op_state(
- self.mock_update_vim_operation_state_tracker.call_args_list,
- self.vim_id,
- expected_vim_op_state,
- )
+ def check_vim_state_is_updated(self, expected_states):
+ """Asserts that the VIM state was set to each of the expected states (in order)."""
+ call_args_list = self.mock_update_vim_state_tracker.call_args_list
+ self.assertTrue(call_args_list)
+ called_states = list()
+ for call in call_args_list:
+ self.assertEqual(call.args[0].vim_uuid, self.vim_id)
+ called_states.append(call.args[0].operational_state)
+ self.assertEqual(called_states, expected_states)
+
+ def check_vim_op_state_is_updated(self, expected_states):
+ """Asserts that the VIM operation state was set to each of the expected states (in order)."""
+ call_args_list = self.mock_update_vim_operation_state_tracker.call_args_list
+ self.assertTrue(call_args_list)
+ called_states = list()
+ for call in call_args_list:
+ self.assertEqual(call.args[0].vim_uuid, self.vim_id)
+ called_states.append(call.args[0].op_state)
+ self.assertEqual(called_states, expected_states)
@parameterized_class(
]
)
class TestVimWorkflow(TestVimWorkflowsBase):
- async def test_nominal_case(self):
+ async def test_nominal_case_updates_vim_state_and_vim_op_state(self):
activities = [
mock_test_vim_connectivity,
self.mock_update_vim_state,
expected_vim_state = [VimState.ENABLED]
expected_vim_op_state = [VimOperationState.COMPLETED]
async with self.env, self.get_worker(activities):
- result = await self.client.execute_workflow(
+ await self.client.execute_workflow(
self.workflow_name,
arg=self.vim_operation_input,
id=self.worflow_id,
task_queue=self.task_queue_name,
)
- self.assertIsNone(result)
- self.check_vim_states_update(expected_vim_state, expected_vim_op_state)
+ self.check_vim_state_is_updated(expected_vim_state)
+ self.check_vim_op_state_is_updated(expected_vim_op_state)
- async def test_fail_update_vim_state(self):
+ async def test_fail_update_vim_state_activity__updates_vim_operation_state(self):
activities = [
mock_test_vim_connectivity,
self.mock_update_vim_state_raises,
self.mock_update_vim_operation_state,
]
- expected_vim_state = [VimState.ENABLED, VimState.ERROR]
+ retry_policy = 3
+ expected_vim_state = [VimState.ENABLED] * retry_policy
+ expected_vim_op_state = [VimOperationState.COMPLETED]
async with self.env, self.get_worker(activities):
with self.assertRaises(WorkflowFailureError):
- result = await self.client.execute_workflow(
+ await self.client.execute_workflow(
self.workflow_name,
arg=self.vim_operation_input,
id=self.worflow_id,
task_queue=self.task_queue_name,
)
- self.assertIsNone(result)
+ self.check_vim_state_is_updated(expected_vim_state)
+ self.check_vim_op_state_is_updated(expected_vim_op_state)
- assert_vim_state(
- self.mock_update_vim_state_tracker.call_args_list,
- self.vim_id,
- expected_vim_state,
- )
- self.mock_update_vim_operation_state_tracker.assert_not_called()
-
- async def test_fail_update_vim_op_state(self):
+ async def test_fail_update_vim_op_state_activity__updates_vim_state(self):
activities = [
mock_test_vim_connectivity,
self.mock_update_vim_state,
self.mock_update_vim_operation_state_raises,
]
- expected_vim_state = [VimState.ENABLED, VimState.ERROR]
- expected_vim_op_state = [VimOperationState.COMPLETED, VimOperationState.FAILED]
+ expected_vim_state = [VimState.ENABLED]
+ retry_policy = 3
+ expected_vim_op_state = [VimOperationState.COMPLETED] * retry_policy
async with self.env, self.get_worker(activities):
with self.assertRaises(WorkflowFailureError):
- result = await self.client.execute_workflow(
+ await self.client.execute_workflow(
self.workflow_name,
arg=self.vim_operation_input,
id=self.worflow_id,
task_queue=self.task_queue_name,
)
- self.assertIsNone(result)
- self.check_vim_states_update(expected_vim_state, expected_vim_op_state)
+ self.check_vim_state_is_updated(expected_vim_state)
+ self.check_vim_op_state_is_updated(expected_vim_op_state)
- async def test_activity_error(self):
+ async def test_connectivity_activity_failure__updates_vim_state_and_vim_op_state_to_failure(
+ self,
+ ):
activities = [
mock_test_vim_connectivity_raises,
self.mock_update_vim_state,
expected_vim_op_state = [VimOperationState.FAILED]
async with self.env, self.get_worker(activities):
with self.assertRaises(WorkflowFailureError):
- result = await self.client.execute_workflow(
+ await self.client.execute_workflow(
self.workflow_name,
arg=self.vim_operation_input,
id=self.worflow_id,
task_queue=self.task_queue_name,
)
- self.assertIsNone(result)
+ self.check_vim_state_is_updated(expected_vim_state)
+ self.check_vim_op_state_is_updated(expected_vim_op_state)
+
+ async def test_connectivity_and_vim_state_update_activity_failures(self):
+ activities = [
+ mock_test_vim_connectivity_raises,
+ self.mock_update_vim_state_raises,
+ self.mock_update_vim_operation_state,
+ ]
+ retry_policy = 3
+ expected_vim_state = [VimState.ERROR] * retry_policy
+ expected_vim_op_state = [VimOperationState.FAILED]
+ async with self.env, self.get_worker(activities):
+ with self.assertRaises(WorkflowFailureError):
+ await self.client.execute_workflow(
+ self.workflow_name,
+ arg=self.vim_operation_input,
+ id=self.worflow_id,
+ task_queue=self.task_queue_name,
+ )
+ self.check_vim_state_is_updated(expected_vim_state)
+ self.check_vim_op_state_is_updated(expected_vim_op_state)
- self.check_vim_states_update(expected_vim_state, expected_vim_op_state)
+ async def test_connectivity_and_vim_op_state_update_activity_failures(self):
+ activities = [
+ mock_test_vim_connectivity_raises,
+ self.mock_update_vim_state,
+ self.mock_update_vim_operation_state_raises,
+ ]
+ retry_policy = 3
+ expected_vim_state = [VimState.ERROR]
+ expected_vim_op_state = [VimOperationState.FAILED] * retry_policy
+ async with self.env, self.get_worker(activities):
+ with self.assertRaises(WorkflowFailureError):
+ await self.client.execute_workflow(
+ self.workflow_name,
+ arg=self.vim_operation_input,
+ id=self.worflow_id,
+ task_queue=self.task_queue_name,
+ )
+ self.check_vim_state_is_updated(expected_vim_state)
+ self.check_vim_op_state_is_updated(expected_vim_op_state)
+
+ async def test_connectivity_vim_state_update_and_vim_op_state_update_failures(self):
+ activities = [
+ mock_test_vim_connectivity_raises,
+ self.mock_update_vim_state_raises,
+ self.mock_update_vim_operation_state_raises,
+ ]
+ retry_policy = 3
+ expected_vim_state = [VimState.ERROR] * retry_policy
+ expected_vim_op_state = [VimOperationState.FAILED] * retry_policy
+ async with self.env, self.get_worker(activities):
+ with self.assertRaises(WorkflowFailureError):
+ await self.client.execute_workflow(
+ self.workflow_name,
+ arg=self.vim_operation_input,
+ id=self.worflow_id,
+ task_queue=self.task_queue_name,
+ )
+ self.check_vim_state_is_updated(expected_vim_state)
+ self.check_vim_op_state_is_updated(expected_vim_op_state)
class TestVimDeleteWorkflow(TestVimWorkflowsBase):