Improve VIM workflows logic 70/13470/9
authorPatricia Reinoso <patricia.reinoso@canonical.com>
Tue, 30 May 2023 11:50:58 +0000 (11:50 +0000)
committerbeierlm <mark.beierl@canonical.com>
Mon, 5 Jun 2023 18:58:31 +0000 (20:58 +0200)
Avoid use an activity that failed to
update the VIM status as FAILED.

Change-Id: I91b1b2dbb042d0566508d507b991dd40bf6ba02c
Signed-off-by: Patricia Reinoso <patricia.reinoso@canonical.com>
osm_lcm/temporal/vim_workflows.py
osm_lcm/tests/test_vim_workflows.py

index e083967..cb7d636 100644 (file)
@@ -68,7 +68,6 @@ class VimCreateWorkflow:
                 schedule_to_close_timeout=default_schedule_to_close_timeout,
                 retry_policy=retry_policy,
             )
-            await self.update_states(op_state, vim_state)
 
         except ActivityError as e:
             error_details = str(e.cause.with_traceback(e.__traceback__))
@@ -98,28 +97,45 @@ class VimCreateWorkflow:
             self.logger.error(f"{WORKFLOW_VIM_CREATE} failed with {error_details}")
             raise e
 
+        await self.update_states(op_state, vim_state)
+
     async def update_states(
         self,
         op_state: UpdateVimOperationStateInput,
         vim_state: UpdateVimStateInput,
     ):
-        await workflow.execute_activity(
-            activity=ACTIVITY_UPDATE_VIM_STATE,
-            arg=vim_state,
-            activity_id=f"{ACTIVITY_UPDATE_VIM_STATE}-{vim_state.vim_uuid}",
-            task_queue=LCM_TASK_QUEUE,
-            schedule_to_close_timeout=default_schedule_to_close_timeout,
-            retry_policy=retry_policy,
-        )
+        raised_exceptions = []
+        try:
+            await workflow.execute_activity(
+                activity=ACTIVITY_UPDATE_VIM_STATE,
+                arg=vim_state,
+                activity_id=f"{ACTIVITY_UPDATE_VIM_STATE}-{vim_state.vim_uuid}",
+                task_queue=LCM_TASK_QUEUE,
+                schedule_to_close_timeout=default_schedule_to_close_timeout,
+                retry_policy=retry_policy,
+            )
+        except ActivityError as e:
+            raised_exceptions.append(e)
+            self.logger.error(f"{WORKFLOW_VIM_CREATE} failed to update VIM state.")
+        try:
+            await workflow.execute_activity(
+                activity=ACTIVITY_UPDATE_VIM_OPERATION_STATE,
+                arg=op_state,
+                activity_id=f"{ACTIVITY_UPDATE_VIM_OPERATION_STATE}-{op_state.vim_uuid}",
+                task_queue=LCM_TASK_QUEUE,
+                schedule_to_close_timeout=default_schedule_to_close_timeout,
+                retry_policy=retry_policy,
+            )
+        except ActivityError as e:
+            self.logger.error(
+                f"{WORKFLOW_VIM_CREATE} failed to update VIM operation state."
+            )
+            raised_exceptions.append(e)
 
-        await workflow.execute_activity(
-            activity=ACTIVITY_UPDATE_VIM_OPERATION_STATE,
-            arg=op_state,
-            activity_id=f"{ACTIVITY_UPDATE_VIM_OPERATION_STATE}-{op_state.vim_uuid}",
-            task_queue=LCM_TASK_QUEUE,
-            schedule_to_close_timeout=default_schedule_to_close_timeout,
-            retry_policy=retry_policy,
-        )
+        # Only return de first exception. There is not much value in complicating the logic
+        # here to return all the exceptions if more than one is raised.
+        if raised_exceptions:
+            raise raised_exceptions[0]
 
 
 @workflow.defn(name=WORKFLOW_VIM_UPDATE, sandboxed=_SANDBOXED)
index 0220fb7..e48b94c 100644 (file)
@@ -70,28 +70,6 @@ async def mock_delete_vim_record_raises(data: DeleteVimInput) -> None:
     raise TestException("Test exception")
 
 
-def assert_vim_state(call_args_list, vim_id, expected_states):
-    """Asserts that the VIM state was set to the expected states."""
-    assert call_args_list
-    # Due to retry policy, in failure cases there are duplicated states
-    called_states = set()
-    for call in call_args_list:
-        assert call.args[0].vim_uuid == vim_id
-        called_states.add(call.args[0].operational_state)
-    assert list(called_states) == expected_states
-
-
-def assert_vim_op_state(call_args_list, vim_id, expected_states):
-    """Asserts that the VIM operation state was set to the expected states."""
-    assert call_args_list
-    # Due to retry policy, in failure cases there are duplicated states
-    called_states = set()
-    for call in call_args_list:
-        assert call.args[0].vim_uuid == vim_id
-        called_states.add(call.args[0].op_state)
-    assert list(called_states) == expected_states
-
-
 class TestVimWorkflowsBase(asynctest.TestCase):
     task_queue_name = LCM_TASK_QUEUE
     vim_id = "some-vim-uuid"
@@ -140,17 +118,25 @@ class TestVimWorkflowsBase(asynctest.TestCase):
             debug_mode=True,
         )
 
-    def check_vim_states_update(self, expected_vim_state, expected_vim_op_state):
-        assert_vim_state(
-            self.mock_update_vim_state_tracker.call_args_list,
-            self.vim_id,
-            expected_vim_state,
-        )
-        assert_vim_op_state(
-            self.mock_update_vim_operation_state_tracker.call_args_list,
-            self.vim_id,
-            expected_vim_op_state,
-        )
+    def check_vim_state_is_updated(self, expected_states):
+        """Asserts that the VIM state was set to each of the expected states (in order)."""
+        call_args_list = self.mock_update_vim_state_tracker.call_args_list
+        self.assertTrue(call_args_list)
+        called_states = list()
+        for call in call_args_list:
+            self.assertEqual(call.args[0].vim_uuid, self.vim_id)
+            called_states.append(call.args[0].operational_state)
+        self.assertEqual(called_states, expected_states)
+
+    def check_vim_op_state_is_updated(self, expected_states):
+        """Asserts that the VIM operation state was set to each of the expected states (in order)."""
+        call_args_list = self.mock_update_vim_operation_state_tracker.call_args_list
+        self.assertTrue(call_args_list)
+        called_states = list()
+        for call in call_args_list:
+            self.assertEqual(call.args[0].vim_uuid, self.vim_id)
+            called_states.append(call.args[0].op_state)
+        self.assertEqual(called_states, expected_states)
 
 
 @parameterized_class(
@@ -160,7 +146,7 @@ class TestVimWorkflowsBase(asynctest.TestCase):
     ]
 )
 class TestVimWorkflow(TestVimWorkflowsBase):
-    async def test_nominal_case(self):
+    async def test_nominal_case_updates_vim_state_and_vim_op_state(self):
         activities = [
             mock_test_vim_connectivity,
             self.mock_update_vim_state,
@@ -169,59 +155,58 @@ class TestVimWorkflow(TestVimWorkflowsBase):
         expected_vim_state = [VimState.ENABLED]
         expected_vim_op_state = [VimOperationState.COMPLETED]
         async with self.env, self.get_worker(activities):
-            result = await self.client.execute_workflow(
+            await self.client.execute_workflow(
                 self.workflow_name,
                 arg=self.vim_operation_input,
                 id=self.worflow_id,
                 task_queue=self.task_queue_name,
             )
-            self.assertIsNone(result)
-        self.check_vim_states_update(expected_vim_state, expected_vim_op_state)
+        self.check_vim_state_is_updated(expected_vim_state)
+        self.check_vim_op_state_is_updated(expected_vim_op_state)
 
-    async def test_fail_update_vim_state(self):
+    async def test_fail_update_vim_state_activity__updates_vim_operation_state(self):
         activities = [
             mock_test_vim_connectivity,
             self.mock_update_vim_state_raises,
             self.mock_update_vim_operation_state,
         ]
-        expected_vim_state = [VimState.ENABLED, VimState.ERROR]
+        retry_policy = 3
+        expected_vim_state = [VimState.ENABLED] * retry_policy
+        expected_vim_op_state = [VimOperationState.COMPLETED]
         async with self.env, self.get_worker(activities):
             with self.assertRaises(WorkflowFailureError):
-                result = await self.client.execute_workflow(
+                await self.client.execute_workflow(
                     self.workflow_name,
                     arg=self.vim_operation_input,
                     id=self.worflow_id,
                     task_queue=self.task_queue_name,
                 )
-                self.assertIsNone(result)
+        self.check_vim_state_is_updated(expected_vim_state)
+        self.check_vim_op_state_is_updated(expected_vim_op_state)
 
-        assert_vim_state(
-            self.mock_update_vim_state_tracker.call_args_list,
-            self.vim_id,
-            expected_vim_state,
-        )
-        self.mock_update_vim_operation_state_tracker.assert_not_called()
-
-    async def test_fail_update_vim_op_state(self):
+    async def test_fail_update_vim_op_state_activity__updates_vim_state(self):
         activities = [
             mock_test_vim_connectivity,
             self.mock_update_vim_state,
             self.mock_update_vim_operation_state_raises,
         ]
-        expected_vim_state = [VimState.ENABLED, VimState.ERROR]
-        expected_vim_op_state = [VimOperationState.COMPLETED, VimOperationState.FAILED]
+        expected_vim_state = [VimState.ENABLED]
+        retry_policy = 3
+        expected_vim_op_state = [VimOperationState.COMPLETED] * retry_policy
         async with self.env, self.get_worker(activities):
             with self.assertRaises(WorkflowFailureError):
-                result = await self.client.execute_workflow(
+                await self.client.execute_workflow(
                     self.workflow_name,
                     arg=self.vim_operation_input,
                     id=self.worflow_id,
                     task_queue=self.task_queue_name,
                 )
-                self.assertIsNone(result)
-        self.check_vim_states_update(expected_vim_state, expected_vim_op_state)
+        self.check_vim_state_is_updated(expected_vim_state)
+        self.check_vim_op_state_is_updated(expected_vim_op_state)
 
-    async def test_activity_error(self):
+    async def test_connectivity_activity_failure__updates_vim_state_and_vim_op_state_to_failure(
+        self,
+    ):
         activities = [
             mock_test_vim_connectivity_raises,
             self.mock_update_vim_state,
@@ -231,15 +216,74 @@ class TestVimWorkflow(TestVimWorkflowsBase):
         expected_vim_op_state = [VimOperationState.FAILED]
         async with self.env, self.get_worker(activities):
             with self.assertRaises(WorkflowFailureError):
-                result = await self.client.execute_workflow(
+                await self.client.execute_workflow(
                     self.workflow_name,
                     arg=self.vim_operation_input,
                     id=self.worflow_id,
                     task_queue=self.task_queue_name,
                 )
-                self.assertIsNone(result)
+        self.check_vim_state_is_updated(expected_vim_state)
+        self.check_vim_op_state_is_updated(expected_vim_op_state)
+
+    async def test_connectivity_and_vim_state_update_activity_failures(self):
+        activities = [
+            mock_test_vim_connectivity_raises,
+            self.mock_update_vim_state_raises,
+            self.mock_update_vim_operation_state,
+        ]
+        retry_policy = 3
+        expected_vim_state = [VimState.ERROR] * retry_policy
+        expected_vim_op_state = [VimOperationState.FAILED]
+        async with self.env, self.get_worker(activities):
+            with self.assertRaises(WorkflowFailureError):
+                await self.client.execute_workflow(
+                    self.workflow_name,
+                    arg=self.vim_operation_input,
+                    id=self.worflow_id,
+                    task_queue=self.task_queue_name,
+                )
+        self.check_vim_state_is_updated(expected_vim_state)
+        self.check_vim_op_state_is_updated(expected_vim_op_state)
 
-        self.check_vim_states_update(expected_vim_state, expected_vim_op_state)
+    async def test_connectivity_and_vim_op_state_update_activity_failures(self):
+        activities = [
+            mock_test_vim_connectivity_raises,
+            self.mock_update_vim_state,
+            self.mock_update_vim_operation_state_raises,
+        ]
+        retry_policy = 3
+        expected_vim_state = [VimState.ERROR]
+        expected_vim_op_state = [VimOperationState.FAILED] * retry_policy
+        async with self.env, self.get_worker(activities):
+            with self.assertRaises(WorkflowFailureError):
+                await self.client.execute_workflow(
+                    self.workflow_name,
+                    arg=self.vim_operation_input,
+                    id=self.worflow_id,
+                    task_queue=self.task_queue_name,
+                )
+        self.check_vim_state_is_updated(expected_vim_state)
+        self.check_vim_op_state_is_updated(expected_vim_op_state)
+
+    async def test_connectivity_vim_state_update_and_vim_op_state_update_failures(self):
+        activities = [
+            mock_test_vim_connectivity_raises,
+            self.mock_update_vim_state_raises,
+            self.mock_update_vim_operation_state_raises,
+        ]
+        retry_policy = 3
+        expected_vim_state = [VimState.ERROR] * retry_policy
+        expected_vim_op_state = [VimOperationState.FAILED] * retry_policy
+        async with self.env, self.get_worker(activities):
+            with self.assertRaises(WorkflowFailureError):
+                await self.client.execute_workflow(
+                    self.workflow_name,
+                    arg=self.vim_operation_input,
+                    id=self.worflow_id,
+                    task_queue=self.task_queue_name,
+                )
+        self.check_vim_state_is_updated(expected_vim_state)
+        self.check_vim_op_state_is_updated(expected_vim_op_state)
 
 
 class TestVimDeleteWorkflow(TestVimWorkflowsBase):