OSMENG-1088: NS Terminate Workflow 89/13789/6 paas
authorDario Faccin <dario.faccin@canonical.com>
Mon, 31 Jul 2023 07:23:49 +0000 (09:23 +0200)
committerDario Faccin <dario.faccin@canonical.com>
Thu, 3 Aug 2023 08:05:06 +0000 (10:05 +0200)
Add workflow implementation

Change-Id: I5ee73a4bdbc4dd31b1fd6f6155b49ff4af6d3fb1
Signed-off-by: Dario Faccin <dario.faccin@canonical.com>
osm_lcm/temporal/ns_workflows.py
osm_lcm/tests/test_ns_workflows.py

index 570e6bb..b11fc02 100644 (file)
@@ -18,19 +18,31 @@ import asyncio
 from datetime import timedelta
 import traceback
 
-from osm_common.temporal.activities.paas import CreateModel
+from osm_common.temporal.activities.paas import (
+    CreateModel,
+    RemoveModel,
+    CheckModelIsRemoved,
+)
 from osm_common.temporal.activities.ns import (
     DeleteNsRecord,
     GetVnfDetails,
     GetNsRecord,
     UpdateNsState,
 )
+from osm_common.temporal.activities.vnf import (
+    GetModelNames,
+)
 from osm_common.temporal.workflows.lcm import LcmOperationWorkflow
 from osm_common.temporal.workflows.ns import (
     NsInstantiateWorkflow,
+    NsTerminateWorkflow,
     NsDeleteRecordsWorkflow,
 )
-from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow, VnfDeleteWorkflow
+from osm_common.temporal.workflows.vnf import (
+    VnfInstantiateWorkflow,
+    VnfTerminateWorkflow,
+    VnfDeleteWorkflow,
+)
 from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
 from osm_common.temporal.states import NsState
 from temporalio import workflow
@@ -171,6 +183,132 @@ class NsInstantiateWorkflowImpl(LcmOperationWorkflow):
         return {}
 
 
+@workflow.defn(name=NsTerminateWorkflow.__name__, sandboxed=False)
+class NsTerminateWorkflowImpl(NsTerminateWorkflow):
+    @workflow.run
+    async def run(self, workflow_input: NsTerminateWorkflow.Input) -> None:
+        try:
+            vnfs_details = value_to_type(
+                GetVnfDetails.Output,
+                await workflow.execute_activity(
+                    activity=GetVnfDetails.__name__,
+                    arg=GetVnfDetails.Input(ns_uuid=workflow_input.ns_uuid),
+                    activity_id=f"{GetVnfDetails.__name__}-{workflow_input.ns_uuid}",
+                    schedule_to_close_timeout=default_schedule_to_close_timeout,
+                    retry_policy=retry_policy,
+                ),
+            )
+            await asyncio.gather(
+                *(
+                    workflow.execute_child_workflow(
+                        workflow=VnfTerminateWorkflow.__name__,
+                        arg=VnfTerminateWorkflow.Input(
+                            vnfr_uuid=vnfr_uuid,
+                        ),
+                        id=f"{VnfTerminateWorkflow.__name__}-{vnfr_uuid}",
+                    )
+                    for vnfr_uuid, _ in vnfs_details.vnf_details
+                )
+            )
+            models_names = value_to_type(
+                GetModelNames.Output,
+                await workflow.execute_activity(
+                    activity=GetModelNames.__name__,
+                    arg=GetModelNames.Input(ns_uuid=workflow_input.ns_uuid),
+                    activity_id=f"{GetModelNames.__name__}-{workflow_input.ns_uuid}",
+                    schedule_to_close_timeout=default_schedule_to_close_timeout,
+                    retry_policy=retry_policy,
+                ),
+            )
+            await asyncio.gather(
+                *(
+                    NsTerminateWorkflowImpl.remove_model(
+                        vim_uuid=workflow_input.vim_uuid,
+                        model_name=model_name,
+                        force_remove=False,
+                    )
+                    for model_name in models_names.model_names
+                )
+            )
+
+            try:
+                await asyncio.gather(
+                    *(
+                        NsTerminateWorkflowImpl.check_model_is_removed(
+                            vim_uuid=workflow_input.vim_uuid,
+                            model_name=model_name,
+                        )
+                        for model_name in models_names.model_names
+                    )
+                )
+
+            except ActivityError:
+                self.logger.info("Removing models forcefully.")
+                await asyncio.gather(
+                    *(
+                        NsTerminateWorkflowImpl.remove_model(
+                            vim_uuid=workflow_input.vim_uuid,
+                            model_name=model_name,
+                            force_remove=True,
+                        )
+                        for model_name in models_names.model_names
+                    )
+                )
+
+                try:
+                    await asyncio.gather(
+                        *(
+                            NsTerminateWorkflowImpl.check_model_is_removed(
+                                vim_uuid=workflow_input.vim_uuid,
+                                model_name=model_name,
+                            )
+                            for model_name in models_names.model_names
+                        )
+                    )
+                except ActivityError as check_model_removal_error:
+                    self.logger.error("Some models could not be removed.")
+                    raise check_model_removal_error
+
+        except FailureError as e:
+            if not hasattr(e, "cause") or e.cause is None:
+                raise e
+            err_details = str(e.cause.with_traceback(e.__traceback__))
+            self.logger.error(
+                f"{NsTerminateWorkflow.__name__} failed with {err_details}"
+            )
+            raise e
+
+    @staticmethod
+    async def remove_model(vim_uuid, model_name, force_remove):
+        await workflow.execute_activity(
+            activity=RemoveModel.__name__,
+            arg=RemoveModel.Input(
+                vim_uuid=vim_uuid,
+                model_name=model_name,
+                force_remove=force_remove,
+            ),
+            activity_id=f"{RemoveModel.__name__}-{vim_uuid}-{model_name}",
+            task_queue=LCM_TASK_QUEUE,
+            schedule_to_close_timeout=default_schedule_to_close_timeout,
+            retry_policy=retry_policy,
+        )
+
+    @staticmethod
+    async def check_model_is_removed(vim_uuid, model_name):
+        await workflow.execute_activity(
+            activity=CheckModelIsRemoved.__name__,
+            arg=CheckModelIsRemoved.Input(
+                vim_uuid=vim_uuid,
+                model_name=model_name,
+            ),
+            activity_id=f"{CheckModelIsRemoved.__name__}-{vim_uuid}-{model_name}",
+            task_queue=LCM_TASK_QUEUE,
+            start_to_close_timeout=timedelta(minutes=5),
+            heartbeat_timeout=timedelta(seconds=30),
+            retry_policy=retry_policy,
+        )
+
+
 @workflow.defn(name=NsDeleteRecordsWorkflow.__name__, sandboxed=_SANDBOXED)
 class NsDeleteRecordsWorkflowImpl(NsDeleteRecordsWorkflow):
     @workflow.run
index 77a05bf..de0c147 100644 (file)
 # implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import asynctest
+import asyncio
 import copy
 from datetime import timedelta
 from typing import Any
 from unittest.mock import Mock, patch
 import uuid
 
-from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
+import asynctest
+from temporalio import activity, workflow
+from temporalio.client import WorkflowFailureError
+from temporalio.common import RetryPolicy
+from temporalio.exceptions import (
+    ActivityError,
+    ApplicationError,
+    ChildWorkflowError,
+    RetryState,
+    TimeoutError,
+)
+from temporalio.testing import WorkflowEnvironment
+from temporalio.worker import Worker
+
 from osm_common.temporal.activities.lcm import (
     UpdateNsLcmOperationState,
 )
@@ -32,29 +45,26 @@ from osm_common.temporal.activities.ns import (
 )
 from osm_common.temporal.activities.paas import (
     CreateModel,
+    RemoveModel,
+    CheckModelIsRemoved,
 )
-from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow, VnfDeleteWorkflow
-from osm_common.temporal.workflows.lcm import LcmOperationWorkflow
+from osm_common.temporal.activities.vnf import GetModelNames
 from osm_common.temporal.states import LcmOperationState, NsState
+from osm_common.temporal.workflows.lcm import LcmOperationWorkflow
+from osm_common.temporal.workflows.vnf import (
+    VnfInstantiateWorkflow,
+    VnfTerminateWorkflow,
+    VnfDeleteWorkflow,
+)
+from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
 from osm_lcm.temporal.ns_workflows import (
     NsDeleteRecordsWorkflow,
     NsDeleteRecordsWorkflowImpl,
     NsInstantiateWorkflow,
     NsInstantiateWorkflowImpl,
+    NsTerminateWorkflow,
+    NsTerminateWorkflowImpl,
 )
-from temporalio import activity, workflow
-from temporalio.client import WorkflowFailureError
-from temporalio.common import RetryPolicy
-from temporalio.exceptions import (
-    ActivityError,
-    ApplicationError,
-    ChildWorkflowError,
-    RetryState,
-    TimeoutError,
-)
-from temporalio.testing import WorkflowEnvironment
-from temporalio.worker import Worker
-
 
 vnfr_uuid_1 = "828d91ee-fa04-43bb-8471-f66ea74597e7"
 vnfr_uuid_2 = "c4bbeb41-df7e-4daa-863d-c8fd29fac96d"
@@ -104,8 +114,8 @@ retry_policy = RetryPolicy(
 )
 SANDBOXED = False
 DEBUG_MODE = True
-TASK_TIMEOUT = timedelta(seconds=0.5)
-EXECUTION_TIMEOUT = timedelta(seconds=1)
+TASK_TIMEOUT = timedelta(seconds=5)
+EXECUTION_TIMEOUT = timedelta(seconds=10)
 
 
 class TestException(Exception):
@@ -136,6 +146,53 @@ async def mock_create_model_raises(create_model_input: CreateModel.Input) -> Non
     raise TestException(f"{CreateModel.__name__} failed.")
 
 
+@activity.defn(name=RemoveModel.__name__)
+async def mock_remove_model(remove_model_input: RemoveModel.Input) -> None:
+    pass
+
+
+@activity.defn(name=RemoveModel.__name__)
+async def mock_remove_model_raises(remove_model_input: RemoveModel.Input) -> None:
+    raise TestException(f"{RemoveModel.__name__} failed.")
+
+
+@activity.defn(name=CheckModelIsRemoved.__name__)
+async def mock_check_model_is_removed(
+    check_model_is_removed_input: CheckModelIsRemoved.Input,
+) -> None:
+    pass
+
+
+@activity.defn(name=CheckModelIsRemoved.__name__)
+async def mock_check_model_is_removed_raises(
+    check_model_is_removed_input: CheckModelIsRemoved.Input,
+) -> None:
+    raise TestException(f"{CheckModelIsRemoved.__name__} failed.")
+
+
+@activity.defn(name=CheckModelIsRemoved.__name__)
+async def mock_check_model_is_removed_cancelled(
+    check_model_is_removed_input: CheckModelIsRemoved.Input,
+) -> None:
+    raise asyncio.exceptions.CancelledError(
+        f"{CheckModelIsRemoved.__name__} cancelled."
+    )
+
+
+@activity.defn(name=GetModelNames.__name__)
+async def mock_get_model_names(
+    get_model_names_input: GetModelNames.Input,
+) -> GetModelNames.Output:
+    return GetModelNames.Output({"namespace-1", "namespace-2"})
+
+
+@activity.defn(name=GetModelNames.__name__)
+async def mock_get_model_names_raises(
+    get_model_names_input: GetModelNames.Input,
+) -> GetModelNames.Output:
+    raise TestException(f"{GetModelNames.__name__} failed.")
+
+
 @activity.defn(name=GetVnfDetails.__name__)
 class MockGetVnfDetails(MockActivity):
     def __init__(self, *args: Any, **kwargs: Any):
@@ -205,6 +262,29 @@ class MockVnfInstantiateWorkflowFailed:
         )
 
 
+@workflow.defn(name=VnfTerminateWorkflow.__name__, sandboxed=SANDBOXED)
+class MockVnfTerminateWorkflow:
+    @workflow.run
+    async def run(self, workflow_input: VnfTerminateWorkflow.Input) -> None:
+        pass
+
+
+@workflow.defn(name=VnfTerminateWorkflow.__name__, sandboxed=SANDBOXED)
+class MockVnfTerminateWorkflowFailed:
+    @workflow.run
+    async def run(self, workflow_input: VnfTerminateWorkflow.Input) -> None:
+        raise ChildWorkflowError(
+            message=f"{VnfTerminateWorkflow.__name__} child workflow failed.",
+            namespace="default",
+            workflow_id="123",
+            run_id="1",
+            workflow_type=VnfTerminateWorkflow.__name__,
+            initiated_event_id=0,
+            started_event_id=0,
+            retry_state=RetryState.NON_RETRYABLE_FAILURE,
+        )
+
+
 @workflow.defn(name=VnfDeleteWorkflow.__name__, sandboxed=SANDBOXED)
 class MockVnfDeleteWorkflow:
     @workflow.run
@@ -667,6 +747,214 @@ class TestGetVnfConfig(asynctest.TestCase):
         self.assertEqual(result, {})
 
 
+class TestNsTerminateWorkflow(asynctest.TestCase):
+    async def setUp(self):
+        self.wf_input = NsTerminateWorkflow.Input(
+            ns_uuid="ns-uuid",
+            vim_uuid="vim-uuid",
+        )
+        self.env = await WorkflowEnvironment.start_time_skipping()
+        self.client = self.env.client
+
+        # Maps workflow base classes to implementation classes.
+        # Can be overridden by individual tests to swap out implementations by
+        # setting self.workflows[<workflow_base_class>] = <implementation_class>
+        self.workflows = [MockVnfTerminateWorkflow, NsTerminateWorkflowImpl]
+        self.mock_get_vnf_details = MockGetVnfDetails()
+        self.mock_get_models_names = mock_get_model_names
+        self.task_queue = LCM_TASK_QUEUE
+        self.workflow_input = NsTerminateWorkflow.Input(
+            ns_uuid="my-ns-uuid",
+            vim_uuid="my-vim-uuid",
+        )
+
+    def get_worker(self, task_queue: str, workflows: list, activities: list) -> Worker:
+        return Worker(
+            self.client,
+            task_queue=task_queue,
+            workflows=workflows,
+            activities=activities,
+            debug_mode=DEBUG_MODE,
+        )
+
+    async def execute_workflow(self, wf_input):
+        return await self.client.execute_workflow(
+            NsTerminateWorkflowImpl.run,
+            arg=wf_input,
+            id=uuid.uuid4().hex,
+            task_queue=self.task_queue,
+            task_timeout=TASK_TIMEOUT,
+            execution_timeout=EXECUTION_TIMEOUT,
+        )
+
+    @patch(
+        "temporalio.workflow.execute_child_workflow",
+        wraps=workflow.execute_child_workflow,
+    )
+    async def test_ns_terminate_workflow__successful__executes_child_workflow(
+        self,
+        mock_execute_child_workflow: asynctest.CoroutineMock,
+    ):
+        workflows = [
+            MockVnfTerminateWorkflow,
+            NsTerminateWorkflowImpl,
+        ]
+        activities = [
+            MockGetVnfDetails(),
+            mock_get_model_names,
+            mock_remove_model,
+            mock_check_model_is_removed,
+        ]
+        async with self.env, self.get_worker(self.task_queue, workflows, activities):
+            await self.execute_workflow(self.wf_input)
+        self.assertTrue(mock_execute_child_workflow.called)
+        self.assertEqual(mock_execute_child_workflow.call_count, 2)
+        call_args = mock_execute_child_workflow.call_args_list[0].kwargs["workflow"]
+        self.assertEqual(call_args, VnfTerminateWorkflow.__name__)
+
+    @patch(
+        "temporalio.workflow.execute_child_workflow",
+        wraps=workflow.execute_child_workflow,
+    )
+    async def test_ns_terminate_workflow__successful__executes_child_workflows_concurrently(
+        self,
+        mock_execute_child_workflow: asynctest.CoroutineMock,
+    ):
+        # Choose a number of child workflows that would take longer to execute than the execution timeout
+        num_child_workflows = EXECUTION_TIMEOUT.seconds * 2 + 1
+        workflows = [
+            MockVnfTerminateWorkflow,
+            NsTerminateWorkflowImpl,
+        ]
+        mock_get_vnf_details = MockGetVnfDetails()
+        mock_get_vnf_details.return_value = GetVnfDetails.Output(
+            vnf_details=[
+                (f"vnfr-{uuid.uuid4().hex}", vnf_member_index_ref_1)
+                for _ in range(num_child_workflows)
+            ]
+        )
+        activities = [
+            mock_get_vnf_details,
+            mock_get_model_names,
+            mock_remove_model,
+            mock_check_model_is_removed,
+        ]
+
+        # Set up the return values with a unique workflow ID for each child workflow
+        mock_get_model_names.side_effect = [
+            f"namespace-{uuid.uuid4().hex}" for _ in range(num_child_workflows)
+        ]
+        async with self.env, self.get_worker(self.task_queue, workflows, activities):
+            await self.execute_workflow(self.wf_input)
+
+        self.assertEqual(mock_execute_child_workflow.call_count, num_child_workflows)
+        # Check that VnfTerminateWorkflow is executed
+        for i in range(0, num_child_workflows):
+            call_args = mock_execute_child_workflow.call_args_list[i].kwargs["workflow"]
+            self.assertEqual(call_args, VnfTerminateWorkflow.__name__)
+
+    async def test_ns_terminate_workflow__get_vnf_details_failed__raises_activity_error(
+        self,
+    ):
+        activities = [
+            MockGetVnfDetailsRaises(),
+            mock_get_model_names,
+            mock_remove_model,
+            mock_check_model_is_removed,
+        ]
+        async with self.env, self.get_worker(
+            self.task_queue, self.workflows, activities
+        ):
+            with self.assertRaises(WorkflowFailureError) as err:
+                await self.execute_workflow(self.wf_input)
+            self.assertTrue(isinstance(err.exception.cause, ActivityError))
+
+    async def test_ns_terminate_workflow__get_model_names_failed__raises_activity_error(
+        self,
+    ):
+        activities = [
+            MockGetVnfDetails(),
+            mock_get_model_names_raises,
+            mock_remove_model,
+            mock_check_model_is_removed,
+        ]
+        async with self.env, self.get_worker(
+            self.task_queue, self.workflows, activities
+        ):
+            with self.assertRaises(WorkflowFailureError) as err:
+                await self.execute_workflow(self.wf_input)
+            self.assertTrue(isinstance(err.exception.cause, ActivityError))
+
+    async def test_ns_terminate_workflow__remove_model_failed__raises_activity_error(
+        self,
+    ):
+        activities = [
+            MockGetVnfDetails(),
+            mock_get_model_names,
+            mock_remove_model_raises,
+            mock_check_model_is_removed,
+        ]
+        async with self.env, self.get_worker(
+            self.task_queue, self.workflows, activities
+        ):
+            with self.assertRaises(WorkflowFailureError) as err:
+                await self.execute_workflow(self.wf_input)
+            self.assertTrue(isinstance(err.exception.cause, ActivityError))
+
+    async def test_ns_terminate_workflow__check_model_removed_failed__raises_activity_error(
+        self,
+    ):
+        activities = [
+            MockGetVnfDetails(),
+            mock_get_model_names,
+            mock_remove_model,
+            mock_check_model_is_removed_raises,
+        ]
+        async with self.env, self.get_worker(
+            self.task_queue, self.workflows, activities
+        ):
+            with self.assertRaises(WorkflowFailureError) as err:
+                await self.execute_workflow(self.wf_input)
+            self.assertTrue(isinstance(err.exception.cause, ActivityError))
+
+    async def test_ns_terminate_workflow__wf_vnf_terminate_failed__raises_child_wf_error(
+        self,
+    ):
+        activities = [
+            MockGetVnfDetails(),
+            mock_get_model_names,
+            mock_remove_model,
+            mock_check_model_is_removed,
+        ]
+        async with self.env, self.get_worker(
+            self.task_queue,
+            [
+                NsTerminateWorkflowImpl,
+                MockVnfTerminateWorkflowFailed,
+            ],
+            activities,
+        ):
+            with self.assertRaises(WorkflowFailureError) as err:
+                await self.execute_workflow(self.wf_input)
+            self.assertTrue(isinstance(err.exception.cause.cause, ChildWorkflowError))
+
+    async def test_ns_terminate__check_model_is_removed_cancelled__activity_error_is_raised(
+        self,
+    ):
+        activities = [
+            MockGetVnfDetails(),
+            mock_get_model_names,
+            mock_remove_model,
+            mock_check_model_is_removed_cancelled,
+        ]
+        async with self.env, self.get_worker(
+            self.task_queue, self.workflows, activities
+        ):
+            with self.assertRaises(WorkflowFailureError) as err:
+                await self.execute_workflow(self.wf_input)
+            self.assertTrue(isinstance(err.exception.cause, ActivityError))
+
+
 class TestNsDeleteRecordsWorkflow(asynctest.TestCase):
     async def setUp(self):
         self.env = await WorkflowEnvironment.start_time_skipping()