From: Dario Faccin Date: Mon, 31 Jul 2023 07:23:49 +0000 (+0200) Subject: OSMENG-1088: NS Terminate Workflow X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=9fc3060ad06d9171f4f9bb31b1bbd0d5ecbd7dae;p=osm%2FLCM.git OSMENG-1088: NS Terminate Workflow Add workflow implementation Change-Id: I5ee73a4bdbc4dd31b1fd6f6155b49ff4af6d3fb1 Signed-off-by: Dario Faccin --- diff --git a/osm_lcm/temporal/ns_workflows.py b/osm_lcm/temporal/ns_workflows.py index 570e6bb..b11fc02 100644 --- a/osm_lcm/temporal/ns_workflows.py +++ b/osm_lcm/temporal/ns_workflows.py @@ -18,19 +18,31 @@ import asyncio from datetime import timedelta import traceback -from osm_common.temporal.activities.paas import CreateModel +from osm_common.temporal.activities.paas import ( + CreateModel, + RemoveModel, + CheckModelIsRemoved, +) from osm_common.temporal.activities.ns import ( DeleteNsRecord, GetVnfDetails, GetNsRecord, UpdateNsState, ) +from osm_common.temporal.activities.vnf import ( + GetModelNames, +) from osm_common.temporal.workflows.lcm import LcmOperationWorkflow from osm_common.temporal.workflows.ns import ( NsInstantiateWorkflow, + NsTerminateWorkflow, NsDeleteRecordsWorkflow, ) -from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow, VnfDeleteWorkflow +from osm_common.temporal.workflows.vnf import ( + VnfInstantiateWorkflow, + VnfTerminateWorkflow, + VnfDeleteWorkflow, +) from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE from osm_common.temporal.states import NsState from temporalio import workflow @@ -171,6 +183,132 @@ class NsInstantiateWorkflowImpl(LcmOperationWorkflow): return {} +@workflow.defn(name=NsTerminateWorkflow.__name__, sandboxed=False) +class NsTerminateWorkflowImpl(NsTerminateWorkflow): + @workflow.run + async def run(self, workflow_input: NsTerminateWorkflow.Input) -> None: + try: + vnfs_details = value_to_type( + GetVnfDetails.Output, + await workflow.execute_activity( + activity=GetVnfDetails.__name__, + arg=GetVnfDetails.Input(ns_uuid=workflow_input.ns_uuid), + activity_id=f"{GetVnfDetails.__name__}-{workflow_input.ns_uuid}", + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ), + ) + await asyncio.gather( + *( + workflow.execute_child_workflow( + workflow=VnfTerminateWorkflow.__name__, + arg=VnfTerminateWorkflow.Input( + vnfr_uuid=vnfr_uuid, + ), + id=f"{VnfTerminateWorkflow.__name__}-{vnfr_uuid}", + ) + for vnfr_uuid, _ in vnfs_details.vnf_details + ) + ) + models_names = value_to_type( + GetModelNames.Output, + await workflow.execute_activity( + activity=GetModelNames.__name__, + arg=GetModelNames.Input(ns_uuid=workflow_input.ns_uuid), + activity_id=f"{GetModelNames.__name__}-{workflow_input.ns_uuid}", + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ), + ) + await asyncio.gather( + *( + NsTerminateWorkflowImpl.remove_model( + vim_uuid=workflow_input.vim_uuid, + model_name=model_name, + force_remove=False, + ) + for model_name in models_names.model_names + ) + ) + + try: + await asyncio.gather( + *( + NsTerminateWorkflowImpl.check_model_is_removed( + vim_uuid=workflow_input.vim_uuid, + model_name=model_name, + ) + for model_name in models_names.model_names + ) + ) + + except ActivityError: + self.logger.info("Removing models forcefully.") + await asyncio.gather( + *( + NsTerminateWorkflowImpl.remove_model( + vim_uuid=workflow_input.vim_uuid, + model_name=model_name, + force_remove=True, + ) + for model_name in models_names.model_names + ) + ) + + try: + await asyncio.gather( + *( + NsTerminateWorkflowImpl.check_model_is_removed( + vim_uuid=workflow_input.vim_uuid, + model_name=model_name, + ) + for model_name in models_names.model_names + ) + ) + except ActivityError as check_model_removal_error: + self.logger.error("Some models could not be removed.") + raise check_model_removal_error + + except FailureError as e: + if not hasattr(e, "cause") or e.cause is None: + raise e + err_details = str(e.cause.with_traceback(e.__traceback__)) + self.logger.error( + f"{NsTerminateWorkflow.__name__} failed with {err_details}" + ) + raise e + + @staticmethod + async def remove_model(vim_uuid, model_name, force_remove): + await workflow.execute_activity( + activity=RemoveModel.__name__, + arg=RemoveModel.Input( + vim_uuid=vim_uuid, + model_name=model_name, + force_remove=force_remove, + ), + activity_id=f"{RemoveModel.__name__}-{vim_uuid}-{model_name}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ) + + @staticmethod + async def check_model_is_removed(vim_uuid, model_name): + await workflow.execute_activity( + activity=CheckModelIsRemoved.__name__, + arg=CheckModelIsRemoved.Input( + vim_uuid=vim_uuid, + model_name=model_name, + ), + activity_id=f"{CheckModelIsRemoved.__name__}-{vim_uuid}-{model_name}", + task_queue=LCM_TASK_QUEUE, + start_to_close_timeout=timedelta(minutes=5), + heartbeat_timeout=timedelta(seconds=30), + retry_policy=retry_policy, + ) + + @workflow.defn(name=NsDeleteRecordsWorkflow.__name__, sandboxed=_SANDBOXED) class NsDeleteRecordsWorkflowImpl(NsDeleteRecordsWorkflow): @workflow.run diff --git a/osm_lcm/tests/test_ns_workflows.py b/osm_lcm/tests/test_ns_workflows.py index 77a05bf..de0c147 100644 --- a/osm_lcm/tests/test_ns_workflows.py +++ b/osm_lcm/tests/test_ns_workflows.py @@ -13,14 +13,27 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -import asynctest +import asyncio import copy from datetime import timedelta from typing import Any from unittest.mock import Mock, patch import uuid -from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE +import asynctest +from temporalio import activity, workflow +from temporalio.client import WorkflowFailureError +from temporalio.common import RetryPolicy +from temporalio.exceptions import ( + ActivityError, + ApplicationError, + ChildWorkflowError, + RetryState, + TimeoutError, +) +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + from osm_common.temporal.activities.lcm import ( UpdateNsLcmOperationState, ) @@ -32,29 +45,26 @@ from osm_common.temporal.activities.ns import ( ) from osm_common.temporal.activities.paas import ( CreateModel, + RemoveModel, + CheckModelIsRemoved, ) -from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow, VnfDeleteWorkflow -from osm_common.temporal.workflows.lcm import LcmOperationWorkflow +from osm_common.temporal.activities.vnf import GetModelNames from osm_common.temporal.states import LcmOperationState, NsState +from osm_common.temporal.workflows.lcm import LcmOperationWorkflow +from osm_common.temporal.workflows.vnf import ( + VnfInstantiateWorkflow, + VnfTerminateWorkflow, + VnfDeleteWorkflow, +) +from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE from osm_lcm.temporal.ns_workflows import ( NsDeleteRecordsWorkflow, NsDeleteRecordsWorkflowImpl, NsInstantiateWorkflow, NsInstantiateWorkflowImpl, + NsTerminateWorkflow, + NsTerminateWorkflowImpl, ) -from temporalio import activity, workflow -from temporalio.client import WorkflowFailureError -from temporalio.common import RetryPolicy -from temporalio.exceptions import ( - ActivityError, - ApplicationError, - ChildWorkflowError, - RetryState, - TimeoutError, -) -from temporalio.testing import WorkflowEnvironment -from temporalio.worker import Worker - vnfr_uuid_1 = "828d91ee-fa04-43bb-8471-f66ea74597e7" vnfr_uuid_2 = "c4bbeb41-df7e-4daa-863d-c8fd29fac96d" @@ -104,8 +114,8 @@ retry_policy = RetryPolicy( ) SANDBOXED = False DEBUG_MODE = True -TASK_TIMEOUT = timedelta(seconds=0.5) -EXECUTION_TIMEOUT = timedelta(seconds=1) +TASK_TIMEOUT = timedelta(seconds=5) +EXECUTION_TIMEOUT = timedelta(seconds=10) class TestException(Exception): @@ -136,6 +146,53 @@ async def mock_create_model_raises(create_model_input: CreateModel.Input) -> Non raise TestException(f"{CreateModel.__name__} failed.") +@activity.defn(name=RemoveModel.__name__) +async def mock_remove_model(remove_model_input: RemoveModel.Input) -> None: + pass + + +@activity.defn(name=RemoveModel.__name__) +async def mock_remove_model_raises(remove_model_input: RemoveModel.Input) -> None: + raise TestException(f"{RemoveModel.__name__} failed.") + + +@activity.defn(name=CheckModelIsRemoved.__name__) +async def mock_check_model_is_removed( + check_model_is_removed_input: CheckModelIsRemoved.Input, +) -> None: + pass + + +@activity.defn(name=CheckModelIsRemoved.__name__) +async def mock_check_model_is_removed_raises( + check_model_is_removed_input: CheckModelIsRemoved.Input, +) -> None: + raise TestException(f"{CheckModelIsRemoved.__name__} failed.") + + +@activity.defn(name=CheckModelIsRemoved.__name__) +async def mock_check_model_is_removed_cancelled( + check_model_is_removed_input: CheckModelIsRemoved.Input, +) -> None: + raise asyncio.exceptions.CancelledError( + f"{CheckModelIsRemoved.__name__} cancelled." + ) + + +@activity.defn(name=GetModelNames.__name__) +async def mock_get_model_names( + get_model_names_input: GetModelNames.Input, +) -> GetModelNames.Output: + return GetModelNames.Output({"namespace-1", "namespace-2"}) + + +@activity.defn(name=GetModelNames.__name__) +async def mock_get_model_names_raises( + get_model_names_input: GetModelNames.Input, +) -> GetModelNames.Output: + raise TestException(f"{GetModelNames.__name__} failed.") + + @activity.defn(name=GetVnfDetails.__name__) class MockGetVnfDetails(MockActivity): def __init__(self, *args: Any, **kwargs: Any): @@ -205,6 +262,29 @@ class MockVnfInstantiateWorkflowFailed: ) +@workflow.defn(name=VnfTerminateWorkflow.__name__, sandboxed=SANDBOXED) +class MockVnfTerminateWorkflow: + @workflow.run + async def run(self, workflow_input: VnfTerminateWorkflow.Input) -> None: + pass + + +@workflow.defn(name=VnfTerminateWorkflow.__name__, sandboxed=SANDBOXED) +class MockVnfTerminateWorkflowFailed: + @workflow.run + async def run(self, workflow_input: VnfTerminateWorkflow.Input) -> None: + raise ChildWorkflowError( + message=f"{VnfTerminateWorkflow.__name__} child workflow failed.", + namespace="default", + workflow_id="123", + run_id="1", + workflow_type=VnfTerminateWorkflow.__name__, + initiated_event_id=0, + started_event_id=0, + retry_state=RetryState.NON_RETRYABLE_FAILURE, + ) + + @workflow.defn(name=VnfDeleteWorkflow.__name__, sandboxed=SANDBOXED) class MockVnfDeleteWorkflow: @workflow.run @@ -667,6 +747,214 @@ class TestGetVnfConfig(asynctest.TestCase): self.assertEqual(result, {}) +class TestNsTerminateWorkflow(asynctest.TestCase): + async def setUp(self): + self.wf_input = NsTerminateWorkflow.Input( + ns_uuid="ns-uuid", + vim_uuid="vim-uuid", + ) + self.env = await WorkflowEnvironment.start_time_skipping() + self.client = self.env.client + + # Maps workflow base classes to implementation classes. + # Can be overridden by individual tests to swap out implementations by + # setting self.workflows[] = + self.workflows = [MockVnfTerminateWorkflow, NsTerminateWorkflowImpl] + self.mock_get_vnf_details = MockGetVnfDetails() + self.mock_get_models_names = mock_get_model_names + self.task_queue = LCM_TASK_QUEUE + self.workflow_input = NsTerminateWorkflow.Input( + ns_uuid="my-ns-uuid", + vim_uuid="my-vim-uuid", + ) + + def get_worker(self, task_queue: str, workflows: list, activities: list) -> Worker: + return Worker( + self.client, + task_queue=task_queue, + workflows=workflows, + activities=activities, + debug_mode=DEBUG_MODE, + ) + + async def execute_workflow(self, wf_input): + return await self.client.execute_workflow( + NsTerminateWorkflowImpl.run, + arg=wf_input, + id=uuid.uuid4().hex, + task_queue=self.task_queue, + task_timeout=TASK_TIMEOUT, + execution_timeout=EXECUTION_TIMEOUT, + ) + + @patch( + "temporalio.workflow.execute_child_workflow", + wraps=workflow.execute_child_workflow, + ) + async def test_ns_terminate_workflow__successful__executes_child_workflow( + self, + mock_execute_child_workflow: asynctest.CoroutineMock, + ): + workflows = [ + MockVnfTerminateWorkflow, + NsTerminateWorkflowImpl, + ] + activities = [ + MockGetVnfDetails(), + mock_get_model_names, + mock_remove_model, + mock_check_model_is_removed, + ] + async with self.env, self.get_worker(self.task_queue, workflows, activities): + await self.execute_workflow(self.wf_input) + self.assertTrue(mock_execute_child_workflow.called) + self.assertEqual(mock_execute_child_workflow.call_count, 2) + call_args = mock_execute_child_workflow.call_args_list[0].kwargs["workflow"] + self.assertEqual(call_args, VnfTerminateWorkflow.__name__) + + @patch( + "temporalio.workflow.execute_child_workflow", + wraps=workflow.execute_child_workflow, + ) + async def test_ns_terminate_workflow__successful__executes_child_workflows_concurrently( + self, + mock_execute_child_workflow: asynctest.CoroutineMock, + ): + # Choose a number of child workflows that would take longer to execute than the execution timeout + num_child_workflows = EXECUTION_TIMEOUT.seconds * 2 + 1 + workflows = [ + MockVnfTerminateWorkflow, + NsTerminateWorkflowImpl, + ] + mock_get_vnf_details = MockGetVnfDetails() + mock_get_vnf_details.return_value = GetVnfDetails.Output( + vnf_details=[ + (f"vnfr-{uuid.uuid4().hex}", vnf_member_index_ref_1) + for _ in range(num_child_workflows) + ] + ) + activities = [ + mock_get_vnf_details, + mock_get_model_names, + mock_remove_model, + mock_check_model_is_removed, + ] + + # Set up the return values with a unique workflow ID for each child workflow + mock_get_model_names.side_effect = [ + f"namespace-{uuid.uuid4().hex}" for _ in range(num_child_workflows) + ] + async with self.env, self.get_worker(self.task_queue, workflows, activities): + await self.execute_workflow(self.wf_input) + + self.assertEqual(mock_execute_child_workflow.call_count, num_child_workflows) + # Check that VnfTerminateWorkflow is executed + for i in range(0, num_child_workflows): + call_args = mock_execute_child_workflow.call_args_list[i].kwargs["workflow"] + self.assertEqual(call_args, VnfTerminateWorkflow.__name__) + + async def test_ns_terminate_workflow__get_vnf_details_failed__raises_activity_error( + self, + ): + activities = [ + MockGetVnfDetailsRaises(), + mock_get_model_names, + mock_remove_model, + mock_check_model_is_removed, + ] + async with self.env, self.get_worker( + self.task_queue, self.workflows, activities + ): + with self.assertRaises(WorkflowFailureError) as err: + await self.execute_workflow(self.wf_input) + self.assertTrue(isinstance(err.exception.cause, ActivityError)) + + async def test_ns_terminate_workflow__get_model_names_failed__raises_activity_error( + self, + ): + activities = [ + MockGetVnfDetails(), + mock_get_model_names_raises, + mock_remove_model, + mock_check_model_is_removed, + ] + async with self.env, self.get_worker( + self.task_queue, self.workflows, activities + ): + with self.assertRaises(WorkflowFailureError) as err: + await self.execute_workflow(self.wf_input) + self.assertTrue(isinstance(err.exception.cause, ActivityError)) + + async def test_ns_terminate_workflow__remove_model_failed__raises_activity_error( + self, + ): + activities = [ + MockGetVnfDetails(), + mock_get_model_names, + mock_remove_model_raises, + mock_check_model_is_removed, + ] + async with self.env, self.get_worker( + self.task_queue, self.workflows, activities + ): + with self.assertRaises(WorkflowFailureError) as err: + await self.execute_workflow(self.wf_input) + self.assertTrue(isinstance(err.exception.cause, ActivityError)) + + async def test_ns_terminate_workflow__check_model_removed_failed__raises_activity_error( + self, + ): + activities = [ + MockGetVnfDetails(), + mock_get_model_names, + mock_remove_model, + mock_check_model_is_removed_raises, + ] + async with self.env, self.get_worker( + self.task_queue, self.workflows, activities + ): + with self.assertRaises(WorkflowFailureError) as err: + await self.execute_workflow(self.wf_input) + self.assertTrue(isinstance(err.exception.cause, ActivityError)) + + async def test_ns_terminate_workflow__wf_vnf_terminate_failed__raises_child_wf_error( + self, + ): + activities = [ + MockGetVnfDetails(), + mock_get_model_names, + mock_remove_model, + mock_check_model_is_removed, + ] + async with self.env, self.get_worker( + self.task_queue, + [ + NsTerminateWorkflowImpl, + MockVnfTerminateWorkflowFailed, + ], + activities, + ): + with self.assertRaises(WorkflowFailureError) as err: + await self.execute_workflow(self.wf_input) + self.assertTrue(isinstance(err.exception.cause.cause, ChildWorkflowError)) + + async def test_ns_terminate__check_model_is_removed_cancelled__activity_error_is_raised( + self, + ): + activities = [ + MockGetVnfDetails(), + mock_get_model_names, + mock_remove_model, + mock_check_model_is_removed_cancelled, + ] + async with self.env, self.get_worker( + self.task_queue, self.workflows, activities + ): + with self.assertRaises(WorkflowFailureError) as err: + await self.execute_workflow(self.wf_input) + self.assertTrue(isinstance(err.exception.cause, ActivityError)) + + class TestNsDeleteRecordsWorkflow(asynctest.TestCase): async def setUp(self): self.env = await WorkflowEnvironment.start_time_skipping()