OSMENG-1095 VDU Terminate Workflow 61/13661/3
authorgatici <gulsum.atici@canonical.com>
Mon, 10 Jul 2023 22:06:44 +0000 (01:06 +0300)
committerbeierlm <mark.beierl@canonical.com>
Mon, 17 Jul 2023 18:06:41 +0000 (20:06 +0200)
Change-Id: Id8c0476c3de0619d05e860dea2f335e3ac1406d7
Signed-off-by: gatici <gulsum.atici@canonical.com>
osm_lcm/temporal/vdu_workflows.py
osm_lcm/tests/test_vdu_workflow.py

index 8dc5ba9..f449b54 100644 (file)
@@ -18,10 +18,16 @@ from datetime import timedelta
 import traceback
 
 from osm_common.temporal.activities.paas import (
-    DeployCharm,
     CheckCharmStatus,
+    CheckCharmIsRemoved,
+    DeployCharm,
+    RemoveCharm,
+    ResolveCharmErrors,
+)
+from osm_common.temporal.workflows.vdu import (
+    VduInstantiateWorkflow,
+    VduTerminateWorkflow,
 )
-from osm_common.temporal.workflows.vdu import VduInstantiateWorkflow
 from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
 
 from temporalio import workflow
@@ -33,7 +39,7 @@ retry_policy = RetryPolicy(maximum_attempts=3)
 default_schedule_to_close_timeout = timedelta(minutes=10)
 
 
-@workflow.defn(name=VduInstantiateWorkflow.__name__, sandboxed=False)
+@workflow.defn(name=VduInstantiateWorkflow.__name__, sandboxed=_SANDBOXED)
 class VduInstantiateWorkflowImpl(VduInstantiateWorkflow):
     @workflow.run
     async def run(self, workflow_input: VduInstantiateWorkflow.Input) -> None:
@@ -88,3 +94,163 @@ class VduInstantiateWorkflowImpl(VduInstantiateWorkflow):
                 f"{VduInstantiateWorkflow.__name__} failed with {err_details}"
             )
             raise e
+
+
+@workflow.defn(name=VduTerminateWorkflow.__name__, sandboxed=_SANDBOXED)
+class VduTerminateWorkflowImpl(VduTerminateWorkflow):
+    @workflow.run
+    async def run(self, workflow_input: VduTerminateWorkflow.Input) -> None:
+        try:
+            vim_uuid = workflow_input.vim_uuid
+            model_name = workflow_input.model_name
+            application_name = workflow_input.application_name
+
+            self.logger.info(f"Terminating VDU `{application_name}`.")
+            try:
+                await VduTerminateWorkflowImpl.remove_application_and_storage(
+                    vim_uuid, model_name, application_name, False
+                )
+                self.logger.info(
+                    f"Waiting for VDU `{workflow_input.application_name}` to be removed."
+                )
+            except ActivityError as remove_application_error:
+                raise remove_application_error
+
+            try:
+                await VduTerminateWorkflowImpl.check_charm_is_removed(
+                    vim_uuid, model_name, application_name
+                )
+                self.logger.info(f"VDU `{application_name}` is removed.")
+                return
+
+            except ActivityError:
+                try:
+                    self.logger.info(f"Resolving VDU `{application_name}` errors.")
+                    await VduTerminateWorkflowImpl.resolve_charm_errors(
+                        vim_uuid, model_name, application_name
+                    )
+
+                except ActivityError:
+                    try:
+                        self.logger.info(
+                            f"Removing VDU `{application_name}` forcefully."
+                        )
+                        await VduTerminateWorkflowImpl.remove_application_and_storage(
+                            vim_uuid, model_name, application_name, True
+                        )
+
+                    except ActivityError as remove_application_error:
+                        raise remove_application_error
+
+                    try:
+                        await VduTerminateWorkflowImpl.check_charm_is_removed(
+                            vim_uuid, model_name, application_name
+                        )
+                        self.logger.info(f"VDU `{application_name}` is removed.")
+                        return
+
+                    except ActivityError as check_charm_removal_error:
+                        self.logger.error(
+                            f"VDU {application_name} could not be removed."
+                        )
+                        raise check_charm_removal_error
+
+                try:
+                    self.logger.info(
+                        f"Removing VDU `{application_name}` after error resolution."
+                    )
+                    await VduTerminateWorkflowImpl.remove_application_and_storage(
+                        vim_uuid, model_name, application_name, False
+                    )
+                except ActivityError as remove_application_error:
+                    raise remove_application_error
+
+                try:
+                    await VduTerminateWorkflowImpl.check_charm_is_removed(
+                        vim_uuid, model_name, application_name
+                    )
+                    self.logger.info(f"VDU `{application_name}` is removed.")
+                    return
+
+                except ActivityError:
+                    self.logger.info(f"Removing VDU `{application_name}` forcefully.")
+                    await VduTerminateWorkflowImpl.remove_application_and_storage(
+                        vim_uuid, model_name, application_name, True
+                    )
+
+                    try:
+                        await VduTerminateWorkflowImpl.check_charm_is_removed(
+                            vim_uuid, model_name, application_name
+                        )
+                        self.logger.info(f"VDU `{application_name}` is removed.")
+                        return
+
+                    except ActivityError as check_charm_removal_error:
+                        self.logger.error(
+                            f"VDU {application_name} could not be removed."
+                        )
+                        raise check_charm_removal_error
+
+        except ActivityError as e:
+            err_details = str(e.cause.with_traceback(e.__traceback__))
+            self.logger.error(
+                f"{VduTerminateWorkflow.__name__} failed with {err_details}"
+            )
+            raise e
+
+        except Exception as e:
+            err_details = str(traceback.format_exc())
+            self.logger.error(
+                f"{VduTerminateWorkflow.__name__} failed with {err_details}"
+            )
+            raise e
+
+    @staticmethod
+    async def remove_application_and_storage(
+        vim_uuid, model_name, application_name, force_remove
+    ):
+        await workflow.execute_activity(
+            activity=RemoveCharm.__name__,
+            arg=RemoveCharm.Input(
+                vim_uuid=vim_uuid,
+                model_name=model_name,
+                application_name=application_name,
+                force_remove=force_remove,
+            ),
+            activity_id=f"{RemoveCharm.__name__}-{vim_uuid}",
+            task_queue=LCM_TASK_QUEUE,
+            schedule_to_close_timeout=default_schedule_to_close_timeout,
+            retry_policy=retry_policy,
+        )
+
+    @staticmethod
+    async def check_charm_is_removed(vim_uuid, model_name, application_name):
+        await workflow.execute_activity(
+            activity=CheckCharmIsRemoved.__name__,
+            arg=CheckCharmIsRemoved.Input(
+                vim_uuid=vim_uuid,
+                model_name=model_name,
+                application_name=application_name,
+            ),
+            activity_id=f"{CheckCharmIsRemoved.__name__}-{vim_uuid}",
+            task_queue=LCM_TASK_QUEUE,
+            start_to_close_timeout=timedelta(minutes=5),
+            heartbeat_timeout=timedelta(seconds=30),
+            retry_policy=retry_policy,
+        )
+
+    @staticmethod
+    async def resolve_charm_errors(vim_uuid, model_name, application_name):
+        await workflow.execute_activity(
+            activity=ResolveCharmErrors.__name__,
+            arg=ResolveCharmErrors.Input(
+                vim_uuid=vim_uuid,
+                model_name=model_name,
+                application_name=application_name,
+            ),
+            activity_id=f"{ResolveCharmErrors.__name__}-{vim_uuid}",
+            task_queue=LCM_TASK_QUEUE,
+            start_to_close_timeout=timedelta(minutes=3),
+            heartbeat_timeout=timedelta(seconds=30),
+            retry_policy=retry_policy,
+        )
index 988618f..a5cc97f 100644 (file)
 # implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import asyncio
+from datetime import timedelta
 
 import asynctest
-from datetime import timedelta
 
 from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
 from osm_common.temporal.activities.paas import (
     DeployCharm,
     CheckCharmStatus,
+    CheckCharmIsRemoved,
+    RemoveCharm,
+    ResolveCharmErrors,
 )
 from osm_common.temporal.dataclasses_common import CharmInfo, VduComputeConstraints
-from osm_lcm.temporal.vdu_workflows import VduInstantiateWorkflowImpl
+from osm_lcm.temporal.vdu_workflows import (
+    VduInstantiateWorkflowImpl,
+    VduTerminateWorkflowImpl,
+)
 from temporalio import activity
 from temporalio.client import WorkflowFailureError
 from temporalio.exceptions import (
@@ -32,8 +39,8 @@ from temporalio.exceptions import (
 from temporalio.testing import WorkflowEnvironment
 from temporalio.worker import Worker
 
-TASK_TIMEOUT = timedelta(seconds=0.5)
-EXECUTION_TIMEOUT = timedelta(seconds=1)
+TASK_TIMEOUT = timedelta(seconds=1)
+EXECUTION_TIMEOUT = timedelta(seconds=10)
 
 
 class TestException(Exception):
@@ -62,7 +69,61 @@ async def check_charm_status_mocked_raises(
     raise TestException(f"{CheckCharmStatus.__name__} failed.")
 
 
-class TestVduWorkflows(asynctest.TestCase):
+@activity.defn(name=CheckCharmIsRemoved.__name__)
+async def check_charm_is_removed_mocked(
+    check_charm_is_removed: CheckCharmIsRemoved.Input,
+) -> None:
+    pass
+
+
+@activity.defn(name=CheckCharmIsRemoved.__name__)
+async def check_charm_is_removed_mocked_raises(
+    check_charm_is_removed: CheckCharmIsRemoved.Input,
+) -> None:
+    raise TestException(f"{CheckCharmIsRemoved.__name__} failed.")
+
+
+@activity.defn(name=CheckCharmIsRemoved.__name__)
+async def check_charm_is_removed_mocked_cancelled(
+    check_charm_is_removed: CheckCharmIsRemoved.Input,
+) -> None:
+    raise asyncio.exceptions.CancelledError(
+        f"{CheckCharmIsRemoved.__name__} cancelled."
+    )
+
+
+@activity.defn(name=RemoveCharm.__name__)
+async def remove_charm_mocked(remove_charm: RemoveCharm.Input) -> None:
+    pass
+
+
+@activity.defn(name=RemoveCharm.__name__)
+async def remove_charm_mocked_raises(remove_charm: RemoveCharm.Input) -> None:
+    raise TestException(f"{RemoveCharm.__name__} failed.")
+
+
+@activity.defn(name=ResolveCharmErrors.__name__)
+async def resolve_charm_errors_mocked(
+    resolve_charm_errors: ResolveCharmErrors.Input,
+) -> None:
+    pass
+
+
+@activity.defn(name=ResolveCharmErrors.__name__)
+async def resolve_charm_errors_mocked_raises(
+    resolve_charm_errors: ResolveCharmErrors.Input,
+) -> None:
+    raise TestException(f"{ResolveCharmErrors.__name__} failed.")
+
+
+@activity.defn(name=ResolveCharmErrors.__name__)
+async def resolve_charm_errors_mocked_cancelled(
+    resolve_charm_errors: ResolveCharmErrors.Input,
+) -> None:
+    raise asyncio.exceptions.CancelledError(f"{ResolveCharmErrors.__name__} cancelled.")
+
+
+class TestVduInstantiateWorkflow(asynctest.TestCase):
     task_queue_name = LCM_TASK_QUEUE
     vim_id = "some-vim-uuid"
     namespace = "some-namespace"
@@ -179,3 +240,177 @@ class TestVduWorkflows(asynctest.TestCase):
                 str(err.exception.cause.cause),
                 f"TestException: {CheckCharmStatus.__name__} failed.",
             )
+
+
+class TestVduTerminateWorkflow(asynctest.TestCase):
+    task_queue_name = LCM_TASK_QUEUE
+    vim_id = "some-vim-uuid"
+    namespace = "some-namespace"
+    app_name = "my_app_name"
+    workflow_id = namespace + "-" + app_name
+    vdu_terminate_wf_input = VduTerminateWorkflowImpl.Input(vim_id, namespace, app_name)
+
+    async def setUp(self):
+        self.env = await WorkflowEnvironment.start_time_skipping()
+        self.client = self.env.client
+
+    def get_worker(self, activities: list) -> Worker:
+        return Worker(
+            self.client,
+            task_queue=self.task_queue_name,
+            workflows=[VduTerminateWorkflowImpl],
+            activities=activities,
+            debug_mode=True,
+        )
+
+    async def test_vdu_terminate__all_activities_are_successful__workflow_succeded(
+        self,
+    ):
+        activities = [
+            check_charm_is_removed_mocked,
+            remove_charm_mocked,
+            resolve_charm_errors_mocked,
+        ]
+        async with self.env, self.get_worker(activities):
+            await self.client.execute_workflow(
+                VduTerminateWorkflowImpl.run,
+                arg=self.vdu_terminate_wf_input,
+                id=self.workflow_id,
+                task_queue=self.task_queue_name,
+                task_timeout=TASK_TIMEOUT,
+                execution_timeout=EXECUTION_TIMEOUT,
+            )
+
+    async def test_vdu_terminate__check_charm_is_removed_raises__workflow_fails(self):
+        activities = [
+            check_charm_is_removed_mocked_raises,
+            remove_charm_mocked,
+            resolve_charm_errors_mocked,
+        ]
+        async with self.env, self.get_worker(activities):
+            with self.assertRaises(WorkflowFailureError) as err:
+                await self.client.execute_workflow(
+                    VduTerminateWorkflowImpl.run,
+                    arg=self.vdu_terminate_wf_input,
+                    id=self.workflow_id,
+                    task_queue=self.task_queue_name,
+                    task_timeout=TASK_TIMEOUT,
+                    execution_timeout=EXECUTION_TIMEOUT,
+                )
+            self.assertEqual(
+                str(err.exception.cause.cause),
+                f"TestException: {CheckCharmIsRemoved.__name__} failed.",
+            )
+
+    async def test_vdu_terminate__check_charm_is_removed_raises__activity_error_is_raised(
+        self,
+    ):
+        activities = [
+            check_charm_is_removed_mocked_raises,
+            remove_charm_mocked,
+            resolve_charm_errors_mocked,
+        ]
+        async with self.env, self.get_worker(activities):
+            with self.assertRaises(WorkflowFailureError) as err:
+                await self.client.execute_workflow(
+                    VduTerminateWorkflowImpl.run,
+                    arg=self.vdu_terminate_wf_input,
+                    id=self.workflow_id,
+                    task_queue=self.task_queue_name,
+                    task_timeout=TASK_TIMEOUT,
+                    execution_timeout=EXECUTION_TIMEOUT,
+                )
+            self.assertTrue(err.exception.cause, ActivityError)
+
+    async def test_vdu_terminate__check_charm_is_removed_cancelled__activity_error_is_raised(
+        self,
+    ):
+        activities = [
+            check_charm_is_removed_mocked_cancelled,
+            remove_charm_mocked,
+            resolve_charm_errors_mocked,
+        ]
+        async with self.env, self.get_worker(activities):
+            with self.assertRaises(WorkflowFailureError) as err:
+                await self.client.execute_workflow(
+                    VduTerminateWorkflowImpl.run,
+                    arg=self.vdu_terminate_wf_input,
+                    id=self.workflow_id,
+                    task_queue=self.task_queue_name,
+                    task_timeout=TASK_TIMEOUT,
+                    execution_timeout=EXECUTION_TIMEOUT,
+                )
+            self.assertTrue(err.exception.cause, ActivityError)
+
+    async def test_vdu_terminate__remove_charm_raises__workflow_failed(self):
+        activities = [
+            check_charm_is_removed_mocked,
+            remove_charm_mocked_raises,
+            resolve_charm_errors_mocked,
+        ]
+        async with self.env, self.get_worker(activities):
+            with self.assertRaises(WorkflowFailureError) as err:
+                await self.client.execute_workflow(
+                    VduTerminateWorkflowImpl.run,
+                    arg=self.vdu_terminate_wf_input,
+                    id=self.workflow_id,
+                    task_queue=self.task_queue_name,
+                    task_timeout=TASK_TIMEOUT,
+                    execution_timeout=EXECUTION_TIMEOUT,
+                )
+            self.assertEqual(
+                str(err.exception.cause.cause),
+                f"TestException: {RemoveCharm.__name__} failed.",
+            )
+
+    async def test_vdu_terminate__remove_charm_raises__activity_error_is_raised(self):
+        activities = [
+            check_charm_is_removed_mocked,
+            remove_charm_mocked_raises,
+            resolve_charm_errors_mocked,
+        ]
+        async with self.env, self.get_worker(activities):
+            with self.assertRaises(WorkflowFailureError) as err:
+                await self.client.execute_workflow(
+                    VduTerminateWorkflowImpl.run,
+                    arg=self.vdu_terminate_wf_input,
+                    id=self.workflow_id,
+                    task_queue=self.task_queue_name,
+                    task_timeout=TASK_TIMEOUT,
+                    execution_timeout=EXECUTION_TIMEOUT,
+                )
+            self.assertTrue(err.exception.cause, ActivityError)
+
+    async def test_vdu_terminate__resolve_charm_errors_raises__workflow_succeded(self):
+        activities = [
+            check_charm_is_removed_mocked,
+            remove_charm_mocked,
+            resolve_charm_errors_mocked_raises,
+        ]
+        async with self.env, self.get_worker(activities):
+            await self.client.execute_workflow(
+                VduTerminateWorkflowImpl.run,
+                arg=self.vdu_terminate_wf_input,
+                id=self.workflow_id,
+                task_queue=self.task_queue_name,
+                task_timeout=TASK_TIMEOUT,
+                execution_timeout=EXECUTION_TIMEOUT,
+            )
+
+    async def test_vdu_terminate__resolve_charm_errors_cancelled__workflow_succeded(
+        self,
+    ):
+        activities = [
+            check_charm_is_removed_mocked,
+            remove_charm_mocked,
+            resolve_charm_errors_mocked_cancelled,
+        ]
+        async with self.env, self.get_worker(activities):
+            await self.client.execute_workflow(
+                VduTerminateWorkflowImpl.run,
+                arg=self.vdu_terminate_wf_input,
+                id=self.workflow_id,
+                task_queue=self.task_queue_name,
+                task_timeout=TASK_TIMEOUT,
+                execution_timeout=EXECUTION_TIMEOUT,
+            )