From c91db8eea2b0a8e187a3a39f8357460ee2d2a861 Mon Sep 17 00:00:00 2001 From: gatici Date: Thu, 27 Jul 2023 13:50:42 +0300 Subject: [PATCH] OSMENG-1171 DeleteNsRecords Workflow Change-Id: I6a2923879fc35872fe6a3f079423535944ba4777 Signed-off-by: gatici --- osm_lcm/temporal/ns_workflows.py | 93 ++++++++- osm_lcm/tests/test_ns_workflows.py | 298 ++++++++++++++++++++++++++--- 2 files changed, 361 insertions(+), 30 deletions(-) diff --git a/osm_lcm/temporal/ns_workflows.py b/osm_lcm/temporal/ns_workflows.py index ea08f9d..570e6bb 100644 --- a/osm_lcm/temporal/ns_workflows.py +++ b/osm_lcm/temporal/ns_workflows.py @@ -15,24 +15,41 @@ # limitations under the License. import asyncio +from datetime import timedelta import traceback from osm_common.temporal.activities.paas import CreateModel from osm_common.temporal.activities.ns import ( + DeleteNsRecord, GetVnfDetails, GetNsRecord, UpdateNsState, ) from osm_common.temporal.workflows.lcm import LcmOperationWorkflow -from osm_common.temporal.workflows.ns import NsInstantiateWorkflow -from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow +from osm_common.temporal.workflows.ns import ( + NsInstantiateWorkflow, + NsDeleteRecordsWorkflow, +) +from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow, VnfDeleteWorkflow +from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE from osm_common.temporal.states import NsState from temporalio import workflow +from temporalio.common import RetryPolicy from temporalio.converter import value_to_type -from temporalio.exceptions import ActivityError, ChildWorkflowError +from temporalio.exceptions import ( + ActivityError, + ApplicationError, + ChildWorkflowError, + FailureError, +) + +_SANDBOXED = False +retry_policy = RetryPolicy(maximum_attempts=3) +no_retry_policy = RetryPolicy(maximum_attempts=1) +default_schedule_to_close_timeout = timedelta(minutes=10) -@workflow.defn(name=NsInstantiateWorkflow.__name__, sandboxed=False) +@workflow.defn(name=NsInstantiateWorkflow.__name__, sandboxed=_SANDBOXED) class NsInstantiateWorkflowImpl(LcmOperationWorkflow): @workflow.run async def wrap_nslcmop(self, workflow_input: NsInstantiateWorkflow.Input) -> None: @@ -152,3 +169,71 @@ class NsInstantiateWorkflowImpl(LcmOperationWorkflow): if vnf_config.get("member-vnf-index") == vnf_member_index_ref: return vnf_config return {} + + +@workflow.defn(name=NsDeleteRecordsWorkflow.__name__, sandboxed=_SANDBOXED) +class NsDeleteRecordsWorkflowImpl(NsDeleteRecordsWorkflow): + @workflow.run + async def run(self, workflow_input: NsDeleteRecordsWorkflow.Input) -> None: + ns_uuid = workflow_input.ns_uuid + try: + nsr = value_to_type( + GetNsRecord.Output, + await workflow.execute_activity( + activity=GetNsRecord.__name__, + arg=GetNsRecord.Input(ns_uuid), + activity_id=f"{GetNsRecord.__name__}-{ns_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ), + ).nsr + instantiation_state = nsr.get("nsState") + if instantiation_state != NsState.NOT_INSTANTIATED.name: + raise ApplicationError( + f"NS must be in {NsState.NOT_INSTANTIATED.name} state", + non_retryable=True, + ) + vnf_details = value_to_type( + GetVnfDetails.Output, + await workflow.execute_activity( + activity=GetVnfDetails.__name__, + arg=GetVnfDetails.Input(ns_uuid=ns_uuid), + activity_id=f"{GetVnfDetails.__name__}-{ns_uuid}", + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ), + ) + + await asyncio.gather( + *( + workflow.execute_child_workflow( + workflow=VnfDeleteWorkflow.__name__, + arg=VnfDeleteWorkflow.Input( + vnfr_uuid=vnfr_uuid, + ), + id=f"{VnfDeleteWorkflow.__name__}-{vnfr_uuid}", + ) + for vnfr_uuid, _ in vnf_details.vnf_details + ) + ) + await workflow.execute_activity( + activity=DeleteNsRecord.__name__, + arg=DeleteNsRecord.Input(ns_uuid), + activity_id=f"{DeleteNsRecord.__name__}-{ns_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ) + + except FailureError as e: + if not hasattr(e, "cause") or e.cause is None: + self.logger.error( + f"{NsDeleteRecordsWorkflow.__name__} failed with {str(e)}" + ) + raise e + err_details = str(e.cause.with_traceback(e.__traceback__)) + self.logger.error( + f"{NsDeleteRecordsWorkflow.__name__} failed with {err_details}" + ) + raise e diff --git a/osm_lcm/tests/test_ns_workflows.py b/osm_lcm/tests/test_ns_workflows.py index 56852df..77a05bf 100644 --- a/osm_lcm/tests/test_ns_workflows.py +++ b/osm_lcm/tests/test_ns_workflows.py @@ -16,7 +16,9 @@ import asynctest import copy from datetime import timedelta +from typing import Any from unittest.mock import Mock, patch +import uuid from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE from osm_common.temporal.activities.lcm import ( @@ -26,14 +28,17 @@ from osm_common.temporal.activities.ns import ( GetNsRecord, GetVnfDetails, UpdateNsState, + DeleteNsRecord, ) from osm_common.temporal.activities.paas import ( CreateModel, ) -from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow +from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow, VnfDeleteWorkflow from osm_common.temporal.workflows.lcm import LcmOperationWorkflow from osm_common.temporal.states import LcmOperationState, NsState from osm_lcm.temporal.ns_workflows import ( + NsDeleteRecordsWorkflow, + NsDeleteRecordsWorkflowImpl, NsInstantiateWorkflow, NsInstantiateWorkflowImpl, ) @@ -42,6 +47,7 @@ from temporalio.client import WorkflowFailureError from temporalio.common import RetryPolicy from temporalio.exceptions import ( ActivityError, + ApplicationError, ChildWorkflowError, RetryState, TimeoutError, @@ -106,6 +112,20 @@ class TestException(Exception): pass +class MockActivity: + def __init__(self, *args: Any, **kwargs: Any): + self.tracker = Mock(return_value=None) + + async def __call__(self, *args: Any, **kwargs: Any) -> Any: + return self.tracker(*args, **kwargs) + + def __setattr__(self, __name: str, __value: Any) -> None: + if __name in ("return_value", "side_effect"): + self.tracker.__setattr__(__name, __value) + else: + super().__setattr__(__name, __value) + + @activity.defn(name=CreateModel.__name__) async def mock_create_model(create_model_input: CreateModel.Input) -> None: pass @@ -117,17 +137,17 @@ async def mock_create_model_raises(create_model_input: CreateModel.Input) -> Non @activity.defn(name=GetVnfDetails.__name__) -async def mock_get_vnf_details( - get_vnf_details_input: GetVnfDetails.Input, -) -> GetVnfDetails.Output: - return GetVnfDetails.Output(vnf_details=vnf_details) +class MockGetVnfDetails(MockActivity): + def __init__(self, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) + self.tracker.return_value = GetVnfDetails.Output(vnf_details=vnf_details) @activity.defn(name=GetVnfDetails.__name__) -async def mock_get_vnf_details_raises( - et_vnf_details_input: GetVnfDetails.Input, -) -> GetVnfDetails.Output: - raise TestException(f"{GetVnfDetails.__name__} failed.") +class MockGetVnfDetailsRaises(MockActivity): + def __init__(self, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) + self.tracker.side_effect = TestException(f"{GetVnfDetails.__name__} failed.") @activity.defn(name=GetNsRecord.__name__) @@ -144,6 +164,24 @@ async def mock_get_ns_record_raise_exception( raise TestException(f"{GetNsRecord.__name__} failed.") +@activity.defn(name=GetNsRecord.__name__) +async def mock_get_ns_record_not_instantiated_state( + get_ns_record_input: GetNsRecord.Input, +) -> GetNsRecord.Output: + nsr = copy.deepcopy(sample_nsr) + nsr["nsState"] = NsState.NOT_INSTANTIATED.name + return GetNsRecord.Output(nsr=nsr) + + +@activity.defn(name=GetNsRecord.__name__) +async def mock_get_ns_record_instantiated_state( + get_ns_record_input: GetNsRecord.Input, +) -> GetNsRecord.Output: + nsr = copy.deepcopy(sample_nsr) + nsr["nsState"] = NsState.INSTANTIATED.name + return GetNsRecord.Output(nsr=nsr) + + @workflow.defn(name=VnfInstantiateWorkflow.__name__, sandboxed=SANDBOXED) class MockVnfInstantiateWorkflow: @workflow.run @@ -167,6 +205,29 @@ class MockVnfInstantiateWorkflowFailed: ) +@workflow.defn(name=VnfDeleteWorkflow.__name__, sandboxed=SANDBOXED) +class MockVnfDeleteWorkflow: + @workflow.run + async def run(self, input: VnfDeleteWorkflow.Input) -> None: + pass + + +@workflow.defn(name=VnfDeleteWorkflow.__name__, sandboxed=SANDBOXED) +class MockVnfDeleteWorkflowFailed: + @workflow.run + async def run(self, input: VnfDeleteWorkflow.Input) -> None: + raise ChildWorkflowError( + message=f"{VnfDeleteWorkflow.__name__} child workflow failed.", + namespace="default", + workflow_id="123", + run_id="1", + workflow_type=VnfDeleteWorkflow.__name__, + initiated_event_id=0, + started_event_id=0, + retry_state=RetryState.NON_RETRYABLE_FAILURE, + ) + + @patch( "osm_lcm.temporal.ns_workflows.NsInstantiateWorkflowImpl._get_namespace", new=mock_get_namespace, @@ -222,7 +283,7 @@ class TestNsInstantiateWorkflow(asynctest.TestCase): async def test_instantiate_workflow__successful__update_lcm_op_state(self): activities = [ mock_create_model, - mock_get_vnf_details, + MockGetVnfDetails(), mock_get_ns_record, self.mock_update_ns_state, self.mock_update_lcm_operation_state, @@ -239,7 +300,7 @@ class TestNsInstantiateWorkflow(asynctest.TestCase): async def test_instantiate_workflow__successful__update_ns_state(self): activities = [ mock_create_model, - mock_get_vnf_details, + MockGetVnfDetails(), mock_get_ns_record, self.mock_update_ns_state, self.mock_update_lcm_operation_state, @@ -257,7 +318,7 @@ class TestNsInstantiateWorkflow(asynctest.TestCase): ): activities = [ mock_create_model_raises, - mock_get_vnf_details, + MockGetVnfDetails(), mock_get_ns_record, self.mock_update_ns_state, self.mock_update_lcm_operation_state, @@ -276,7 +337,7 @@ class TestNsInstantiateWorkflow(asynctest.TestCase): ): activities = [ mock_create_model_raises, - mock_get_vnf_details, + MockGetVnfDetails(), mock_get_ns_record, self.mock_update_ns_state, self.mock_update_lcm_operation_state, @@ -296,7 +357,7 @@ class TestNsInstantiateWorkflow(asynctest.TestCase): ): activities = [ mock_create_model_raises, - mock_get_vnf_details, + MockGetVnfDetails(), mock_get_ns_record, self.mock_update_ns_state, self.mock_update_lcm_operation_state, @@ -313,7 +374,7 @@ class TestNsInstantiateWorkflow(asynctest.TestCase): ): activities = [ mock_create_model_raises, - mock_get_vnf_details, + MockGetVnfDetails(), mock_get_ns_record, self.mock_update_ns_state, self.mock_update_lcm_operation_state, @@ -333,7 +394,7 @@ class TestNsInstantiateWorkflow(asynctest.TestCase): ): activities = [ mock_create_model, - mock_get_vnf_details, + MockGetVnfDetails(), mock_get_ns_record_raise_exception, self.mock_update_ns_state, self.mock_update_lcm_operation_state, @@ -353,7 +414,7 @@ class TestNsInstantiateWorkflow(asynctest.TestCase): ): activities = [ mock_create_model, - mock_get_vnf_details, + MockGetVnfDetails(), mock_get_ns_record_raise_exception, self.mock_update_ns_state, self.mock_update_lcm_operation_state, @@ -370,7 +431,7 @@ class TestNsInstantiateWorkflow(asynctest.TestCase): ): activities = [ mock_create_model, - mock_get_vnf_details_raises, + MockGetVnfDetailsRaises(), mock_get_ns_record, self.mock_update_ns_state, self.mock_update_lcm_operation_state, @@ -390,7 +451,7 @@ class TestNsInstantiateWorkflow(asynctest.TestCase): ): activities = [ mock_create_model, - mock_get_vnf_details_raises, + MockGetVnfDetailsRaises(), mock_get_ns_record, self.mock_update_ns_state, self.mock_update_lcm_operation_state, @@ -409,7 +470,7 @@ class TestNsInstantiateWorkflow(asynctest.TestCase): # Because of workflow task timeout policy, workflow times out activities = [ mock_create_model, - mock_get_vnf_details, + MockGetVnfDetails(), mock_get_ns_record, self.mock_update_ns_state, self.mock_update_lcm_operation_state, @@ -429,7 +490,7 @@ class TestNsInstantiateWorkflow(asynctest.TestCase): # Workflow task failure activities = [ mock_create_model, - mock_get_vnf_details, + MockGetVnfDetails(), mock_get_ns_record, self.mock_update_ns_state, self.mock_update_lcm_operation_state, @@ -451,7 +512,7 @@ class TestNsInstantiateWorkflow(asynctest.TestCase): # Workflow task failure activities = [ mock_create_model, - mock_get_vnf_details, + MockGetVnfDetails(), mock_get_ns_record, self.mock_update_ns_state, self.mock_update_lcm_operation_state, @@ -472,7 +533,7 @@ class TestNsInstantiateWorkflow(asynctest.TestCase): ): activities = [ mock_create_model, - mock_get_vnf_details, + MockGetVnfDetails(), mock_get_ns_record, self.mock_update_ns_state, self.mock_update_lcm_operation_state, @@ -491,7 +552,7 @@ class TestNsInstantiateWorkflow(asynctest.TestCase): ): activities = [ mock_create_model, - mock_get_vnf_details, + MockGetVnfDetails(), mock_get_ns_record, self.mock_update_ns_state, self.mock_update_lcm_operation_state, @@ -513,7 +574,7 @@ class TestNsInstantiateWorkflow(asynctest.TestCase): ): activities = [ mock_create_model, - mock_get_vnf_details, + MockGetVnfDetails(), mock_get_ns_record, self.mock_update_ns_state, self.mock_update_lcm_operation_state, @@ -534,7 +595,7 @@ class TestNsInstantiateWorkflow(asynctest.TestCase): ): activities = [ mock_create_model, - mock_get_vnf_details, + MockGetVnfDetails(), mock_get_ns_record, self.mock_update_ns_state, self.mock_update_lcm_operation_state, @@ -604,3 +665,188 @@ class TestGetVnfConfig(asynctest.TestCase): vnf_member_index_ref_2, sample_nsr ) self.assertEqual(result, {}) + + +class TestNsDeleteRecordsWorkflow(asynctest.TestCase): + async def setUp(self): + self.env = await WorkflowEnvironment.start_time_skipping() + self.client = self.env.client + self.task_queue = LCM_TASK_QUEUE + self.wf_input = NsDeleteRecordsWorkflow.Input(ns_uuid=ns_uuid) + self.mock_delete_ns_record_tracker = Mock() + + def get_worker(self, task_queue: str, workflows: list, activities: list) -> Worker: + return Worker( + self.client, + task_queue=task_queue, + workflows=workflows, + activities=activities, + debug_mode=DEBUG_MODE, + ) + + async def execute_workflow(self, wf_input): + return await self.client.execute_workflow( + NsDeleteRecordsWorkflowImpl.run, + arg=wf_input, + id=uuid.uuid4().hex, + task_queue=self.task_queue, + task_timeout=TASK_TIMEOUT, + execution_timeout=EXECUTION_TIMEOUT, + ) + + def assert_ns_record_is_called(self): + self.assertEqual( + self.mock_delete_ns_record_tracker.call_args_list[0].args[0], + DeleteNsRecord.Input(ns_uuid=ns_uuid), + ) + + @activity.defn(name=DeleteNsRecord.__name__) + async def mock_delete_ns_record( + self, + delete_ns_record_input: DeleteNsRecord.Input, + ) -> None: + self.mock_delete_ns_record_tracker(delete_ns_record_input) + + @activity.defn(name=DeleteNsRecord.__name__) + async def mock_delete_ns_record_raises_exception( + self, + delete_ns_record_input: DeleteNsRecord.Input, + ) -> None: + self.mock_delete_ns_record_tracker(delete_ns_record_input) + raise TestException(f"{DeleteNsRecord.__name__} failed.") + + async def test_delete_ns_records_workflow__successful(self): + workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflow] + activities = [ + MockGetVnfDetails(), + mock_get_ns_record_not_instantiated_state, + self.mock_delete_ns_record, + ] + async with self.env, self.get_worker(self.task_queue, workflows, activities): + await self.execute_workflow(self.wf_input) + self.assert_ns_record_is_called() + + @patch( + "temporalio.workflow.execute_child_workflow", + wraps=workflow.execute_child_workflow, + ) + async def test_ns_terminate_workflow__successful__executes_vnf_delete_child_workflows_concurrently( + self, + mock_execute_child_workflow: asynctest.CoroutineMock, + ): + # Choose a number of child workflows that would take longer to execute than the execution timeout + num_child_workflows = EXECUTION_TIMEOUT.seconds * 2 + 1 + mock_get_vnf_details = MockGetVnfDetails() + mock_get_vnf_details.return_value = GetVnfDetails.Output( + vnf_details=[ + (f"vnfr-{uuid.uuid4().hex}", vnf_member_index_ref_1) + for _ in range(num_child_workflows) + ] + ) + workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflow] + activities = [ + mock_get_vnf_details, + mock_get_ns_record_not_instantiated_state, + self.mock_delete_ns_record, + ] + async with self.env, self.get_worker(self.task_queue, workflows, activities): + await self.execute_workflow(self.wf_input) + self.assert_ns_record_is_called() + self.assertEqual(mock_execute_child_workflow.call_count, num_child_workflows) + # Check that VnfDeleteWorkflow is executed + for i in range(0, num_child_workflows): + call_args = mock_execute_child_workflow.call_args_list[i].kwargs["workflow"] + self.assertEqual(call_args, VnfDeleteWorkflow.__name__) + + async def test_delete_ns_records_workflow__instantiated_state__fails(self): + workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflow] + activities = [ + MockGetVnfDetails(), + mock_get_ns_record_instantiated_state, + self.mock_delete_ns_record, + ] + async with self.env, self.get_worker(self.task_queue, workflows, activities): + with self.assertRaises(WorkflowFailureError) as err: + await self.execute_workflow(self.wf_input) + self.assertIsInstance(err.exception.cause, ApplicationError) + self.assertEqual( + str(err.exception.cause), + "NS must be in NOT_INSTANTIATED state", + ) + self.mock_delete_ns_record_tracker.assert_not_called() + + async def test_delete_ns_records_workflow__get_vnf_details_fails__workflow_fails( + self, + ): + workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflow] + activities = [ + MockGetVnfDetailsRaises(), + mock_get_ns_record_not_instantiated_state, + self.mock_delete_ns_record, + ] + async with self.env, self.get_worker(self.task_queue, workflows, activities): + with self.assertRaises(WorkflowFailureError) as err: + await self.execute_workflow(self.wf_input) + self.assertIsInstance(err.exception.cause, ActivityError) + self.assertEqual( + str(err.exception.cause.cause), + f"TestException: {GetVnfDetails.__name__} failed.", + ) + self.mock_delete_ns_record_tracker.assert_not_called() + + async def test_delete_ns_records_workflow__get_ns_record_fails__workflow_fails( + self, + ): + workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflow] + activities = [ + MockGetVnfDetails(), + mock_get_ns_record_raise_exception, + self.mock_delete_ns_record, + ] + async with self.env, self.get_worker(self.task_queue, workflows, activities): + with self.assertRaises(WorkflowFailureError) as err: + await self.execute_workflow(self.wf_input) + self.assertIsInstance(err.exception.cause, ActivityError) + self.assertEqual( + str(err.exception.cause.cause), + f"TestException: {GetNsRecord.__name__} failed.", + ) + self.mock_delete_ns_record_tracker.assert_not_called() + + async def test_delete_ns_records_workflow__delete_ns_record_fails__workflow_fails( + self, + ): + workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflow] + activities = [ + MockGetVnfDetails(), + mock_get_ns_record_not_instantiated_state, + self.mock_delete_ns_record_raises_exception, + ] + async with self.env, self.get_worker(self.task_queue, workflows, activities): + with self.assertRaises(WorkflowFailureError) as err: + await self.execute_workflow(self.wf_input) + self.assertIsInstance(err.exception.cause, ActivityError) + self.assertEqual( + str(err.exception.cause.cause), + f"TestException: {DeleteNsRecord.__name__} failed.", + ) + self.mock_delete_ns_record_tracker.assert_called() + + async def test_delete_ns_records_workflow__vnf_delete_workflow_failed__workflow_fails( + self, + ): + workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflowFailed] + activities = [ + MockGetVnfDetails(), + mock_get_ns_record_not_instantiated_state, + self.mock_delete_ns_record, + ] + async with self.env, self.get_worker(self.task_queue, workflows, activities): + with self.assertRaises(WorkflowFailureError) as err: + await self.execute_workflow(self.wf_input) + self.assertIsInstance(err.exception.cause, ChildWorkflowError) + self.assertEqual( + str(err.exception.cause.cause), + "VnfDeleteWorkflow child workflow failed.", + ) + self.mock_delete_ns_record_tracker.assert_not_called() -- 2.25.1