From: gatici Date: Mon, 10 Jul 2023 22:06:44 +0000 (+0300) Subject: OSMENG-1095 VDU Terminate Workflow X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=ac5b194a36f902dda1d3e912120eea1189f49859;p=osm%2FLCM.git OSMENG-1095 VDU Terminate Workflow Change-Id: Id8c0476c3de0619d05e860dea2f335e3ac1406d7 Signed-off-by: gatici --- diff --git a/osm_lcm/temporal/vdu_workflows.py b/osm_lcm/temporal/vdu_workflows.py index 8dc5ba9..f449b54 100644 --- a/osm_lcm/temporal/vdu_workflows.py +++ b/osm_lcm/temporal/vdu_workflows.py @@ -18,10 +18,16 @@ from datetime import timedelta import traceback from osm_common.temporal.activities.paas import ( - DeployCharm, CheckCharmStatus, + CheckCharmIsRemoved, + DeployCharm, + RemoveCharm, + ResolveCharmErrors, +) +from osm_common.temporal.workflows.vdu import ( + VduInstantiateWorkflow, + VduTerminateWorkflow, ) -from osm_common.temporal.workflows.vdu import VduInstantiateWorkflow from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE from temporalio import workflow @@ -33,7 +39,7 @@ retry_policy = RetryPolicy(maximum_attempts=3) default_schedule_to_close_timeout = timedelta(minutes=10) -@workflow.defn(name=VduInstantiateWorkflow.__name__, sandboxed=False) +@workflow.defn(name=VduInstantiateWorkflow.__name__, sandboxed=_SANDBOXED) class VduInstantiateWorkflowImpl(VduInstantiateWorkflow): @workflow.run async def run(self, workflow_input: VduInstantiateWorkflow.Input) -> None: @@ -88,3 +94,163 @@ class VduInstantiateWorkflowImpl(VduInstantiateWorkflow): f"{VduInstantiateWorkflow.__name__} failed with {err_details}" ) raise e + + +@workflow.defn(name=VduTerminateWorkflow.__name__, sandboxed=_SANDBOXED) +class VduTerminateWorkflowImpl(VduTerminateWorkflow): + @workflow.run + async def run(self, workflow_input: VduTerminateWorkflow.Input) -> None: + try: + vim_uuid = workflow_input.vim_uuid + model_name = workflow_input.model_name + application_name = workflow_input.application_name + + self.logger.info(f"Terminating VDU `{application_name}`.") + try: + await VduTerminateWorkflowImpl.remove_application_and_storage( + vim_uuid, model_name, application_name, False + ) + self.logger.info( + f"Waiting for VDU `{workflow_input.application_name}` to be removed." + ) + except ActivityError as remove_application_error: + raise remove_application_error + + try: + await VduTerminateWorkflowImpl.check_charm_is_removed( + vim_uuid, model_name, application_name + ) + self.logger.info(f"VDU `{application_name}` is removed.") + return + + except ActivityError: + try: + self.logger.info(f"Resolving VDU `{application_name}` errors.") + await VduTerminateWorkflowImpl.resolve_charm_errors( + vim_uuid, model_name, application_name + ) + + except ActivityError: + try: + self.logger.info( + f"Removing VDU `{application_name}` forcefully." + ) + await VduTerminateWorkflowImpl.remove_application_and_storage( + vim_uuid, model_name, application_name, True + ) + + except ActivityError as remove_application_error: + raise remove_application_error + + try: + await VduTerminateWorkflowImpl.check_charm_is_removed( + vim_uuid, model_name, application_name + ) + self.logger.info(f"VDU `{application_name}` is removed.") + return + + except ActivityError as check_charm_removal_error: + self.logger.error( + f"VDU {application_name} could not be removed." + ) + raise check_charm_removal_error + + try: + self.logger.info( + f"Removing VDU `{application_name}` after error resolution." + ) + await VduTerminateWorkflowImpl.remove_application_and_storage( + vim_uuid, model_name, application_name, False + ) + except ActivityError as remove_application_error: + raise remove_application_error + + try: + await VduTerminateWorkflowImpl.check_charm_is_removed( + vim_uuid, model_name, application_name + ) + self.logger.info(f"VDU `{application_name}` is removed.") + return + + except ActivityError: + self.logger.info(f"Removing VDU `{application_name}` forcefully.") + await VduTerminateWorkflowImpl.remove_application_and_storage( + vim_uuid, model_name, application_name, True + ) + + try: + await VduTerminateWorkflowImpl.check_charm_is_removed( + vim_uuid, model_name, application_name + ) + self.logger.info(f"VDU `{application_name}` is removed.") + return + + except ActivityError as check_charm_removal_error: + self.logger.error( + f"VDU {application_name} could not be removed." + ) + raise check_charm_removal_error + + except ActivityError as e: + err_details = str(e.cause.with_traceback(e.__traceback__)) + self.logger.error( + f"{VduTerminateWorkflow.__name__} failed with {err_details}" + ) + raise e + + except Exception as e: + err_details = str(traceback.format_exc()) + self.logger.error( + f"{VduTerminateWorkflow.__name__} failed with {err_details}" + ) + raise e + + @staticmethod + async def remove_application_and_storage( + vim_uuid, model_name, application_name, force_remove + ): + await workflow.execute_activity( + activity=RemoveCharm.__name__, + arg=RemoveCharm.Input( + vim_uuid=vim_uuid, + model_name=model_name, + application_name=application_name, + force_remove=force_remove, + ), + activity_id=f"{RemoveCharm.__name__}-{vim_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ) + + @staticmethod + async def check_charm_is_removed(vim_uuid, model_name, application_name): + await workflow.execute_activity( + activity=CheckCharmIsRemoved.__name__, + arg=CheckCharmIsRemoved.Input( + vim_uuid=vim_uuid, + model_name=model_name, + application_name=application_name, + ), + activity_id=f"{CheckCharmIsRemoved.__name__}-{vim_uuid}", + task_queue=LCM_TASK_QUEUE, + start_to_close_timeout=timedelta(minutes=5), + heartbeat_timeout=timedelta(seconds=30), + retry_policy=retry_policy, + ) + + @staticmethod + async def resolve_charm_errors(vim_uuid, model_name, application_name): + await workflow.execute_activity( + activity=ResolveCharmErrors.__name__, + arg=ResolveCharmErrors.Input( + vim_uuid=vim_uuid, + model_name=model_name, + application_name=application_name, + ), + activity_id=f"{ResolveCharmErrors.__name__}-{vim_uuid}", + task_queue=LCM_TASK_QUEUE, + start_to_close_timeout=timedelta(minutes=3), + heartbeat_timeout=timedelta(seconds=30), + retry_policy=retry_policy, + ) diff --git a/osm_lcm/tests/test_vdu_workflow.py b/osm_lcm/tests/test_vdu_workflow.py index 988618f..a5cc97f 100644 --- a/osm_lcm/tests/test_vdu_workflow.py +++ b/osm_lcm/tests/test_vdu_workflow.py @@ -13,17 +13,24 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. +import asyncio +from datetime import timedelta import asynctest -from datetime import timedelta from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE from osm_common.temporal.activities.paas import ( DeployCharm, CheckCharmStatus, + CheckCharmIsRemoved, + RemoveCharm, + ResolveCharmErrors, ) from osm_common.temporal.dataclasses_common import CharmInfo, VduComputeConstraints -from osm_lcm.temporal.vdu_workflows import VduInstantiateWorkflowImpl +from osm_lcm.temporal.vdu_workflows import ( + VduInstantiateWorkflowImpl, + VduTerminateWorkflowImpl, +) from temporalio import activity from temporalio.client import WorkflowFailureError from temporalio.exceptions import ( @@ -32,8 +39,8 @@ from temporalio.exceptions import ( from temporalio.testing import WorkflowEnvironment from temporalio.worker import Worker -TASK_TIMEOUT = timedelta(seconds=0.5) -EXECUTION_TIMEOUT = timedelta(seconds=1) +TASK_TIMEOUT = timedelta(seconds=1) +EXECUTION_TIMEOUT = timedelta(seconds=10) class TestException(Exception): @@ -62,7 +69,61 @@ async def check_charm_status_mocked_raises( raise TestException(f"{CheckCharmStatus.__name__} failed.") -class TestVduWorkflows(asynctest.TestCase): +@activity.defn(name=CheckCharmIsRemoved.__name__) +async def check_charm_is_removed_mocked( + check_charm_is_removed: CheckCharmIsRemoved.Input, +) -> None: + pass + + +@activity.defn(name=CheckCharmIsRemoved.__name__) +async def check_charm_is_removed_mocked_raises( + check_charm_is_removed: CheckCharmIsRemoved.Input, +) -> None: + raise TestException(f"{CheckCharmIsRemoved.__name__} failed.") + + +@activity.defn(name=CheckCharmIsRemoved.__name__) +async def check_charm_is_removed_mocked_cancelled( + check_charm_is_removed: CheckCharmIsRemoved.Input, +) -> None: + raise asyncio.exceptions.CancelledError( + f"{CheckCharmIsRemoved.__name__} cancelled." + ) + + +@activity.defn(name=RemoveCharm.__name__) +async def remove_charm_mocked(remove_charm: RemoveCharm.Input) -> None: + pass + + +@activity.defn(name=RemoveCharm.__name__) +async def remove_charm_mocked_raises(remove_charm: RemoveCharm.Input) -> None: + raise TestException(f"{RemoveCharm.__name__} failed.") + + +@activity.defn(name=ResolveCharmErrors.__name__) +async def resolve_charm_errors_mocked( + resolve_charm_errors: ResolveCharmErrors.Input, +) -> None: + pass + + +@activity.defn(name=ResolveCharmErrors.__name__) +async def resolve_charm_errors_mocked_raises( + resolve_charm_errors: ResolveCharmErrors.Input, +) -> None: + raise TestException(f"{ResolveCharmErrors.__name__} failed.") + + +@activity.defn(name=ResolveCharmErrors.__name__) +async def resolve_charm_errors_mocked_cancelled( + resolve_charm_errors: ResolveCharmErrors.Input, +) -> None: + raise asyncio.exceptions.CancelledError(f"{ResolveCharmErrors.__name__} cancelled.") + + +class TestVduInstantiateWorkflow(asynctest.TestCase): task_queue_name = LCM_TASK_QUEUE vim_id = "some-vim-uuid" namespace = "some-namespace" @@ -179,3 +240,177 @@ class TestVduWorkflows(asynctest.TestCase): str(err.exception.cause.cause), f"TestException: {CheckCharmStatus.__name__} failed.", ) + + +class TestVduTerminateWorkflow(asynctest.TestCase): + task_queue_name = LCM_TASK_QUEUE + vim_id = "some-vim-uuid" + namespace = "some-namespace" + app_name = "my_app_name" + workflow_id = namespace + "-" + app_name + vdu_terminate_wf_input = VduTerminateWorkflowImpl.Input(vim_id, namespace, app_name) + + async def setUp(self): + self.env = await WorkflowEnvironment.start_time_skipping() + self.client = self.env.client + + def get_worker(self, activities: list) -> Worker: + return Worker( + self.client, + task_queue=self.task_queue_name, + workflows=[VduTerminateWorkflowImpl], + activities=activities, + debug_mode=True, + ) + + async def test_vdu_terminate__all_activities_are_successful__workflow_succeded( + self, + ): + activities = [ + check_charm_is_removed_mocked, + remove_charm_mocked, + resolve_charm_errors_mocked, + ] + async with self.env, self.get_worker(activities): + await self.client.execute_workflow( + VduTerminateWorkflowImpl.run, + arg=self.vdu_terminate_wf_input, + id=self.workflow_id, + task_queue=self.task_queue_name, + task_timeout=TASK_TIMEOUT, + execution_timeout=EXECUTION_TIMEOUT, + ) + + async def test_vdu_terminate__check_charm_is_removed_raises__workflow_fails(self): + activities = [ + check_charm_is_removed_mocked_raises, + remove_charm_mocked, + resolve_charm_errors_mocked, + ] + async with self.env, self.get_worker(activities): + with self.assertRaises(WorkflowFailureError) as err: + await self.client.execute_workflow( + VduTerminateWorkflowImpl.run, + arg=self.vdu_terminate_wf_input, + id=self.workflow_id, + task_queue=self.task_queue_name, + task_timeout=TASK_TIMEOUT, + execution_timeout=EXECUTION_TIMEOUT, + ) + self.assertEqual( + str(err.exception.cause.cause), + f"TestException: {CheckCharmIsRemoved.__name__} failed.", + ) + + async def test_vdu_terminate__check_charm_is_removed_raises__activity_error_is_raised( + self, + ): + activities = [ + check_charm_is_removed_mocked_raises, + remove_charm_mocked, + resolve_charm_errors_mocked, + ] + async with self.env, self.get_worker(activities): + with self.assertRaises(WorkflowFailureError) as err: + await self.client.execute_workflow( + VduTerminateWorkflowImpl.run, + arg=self.vdu_terminate_wf_input, + id=self.workflow_id, + task_queue=self.task_queue_name, + task_timeout=TASK_TIMEOUT, + execution_timeout=EXECUTION_TIMEOUT, + ) + self.assertTrue(err.exception.cause, ActivityError) + + async def test_vdu_terminate__check_charm_is_removed_cancelled__activity_error_is_raised( + self, + ): + activities = [ + check_charm_is_removed_mocked_cancelled, + remove_charm_mocked, + resolve_charm_errors_mocked, + ] + async with self.env, self.get_worker(activities): + with self.assertRaises(WorkflowFailureError) as err: + await self.client.execute_workflow( + VduTerminateWorkflowImpl.run, + arg=self.vdu_terminate_wf_input, + id=self.workflow_id, + task_queue=self.task_queue_name, + task_timeout=TASK_TIMEOUT, + execution_timeout=EXECUTION_TIMEOUT, + ) + self.assertTrue(err.exception.cause, ActivityError) + + async def test_vdu_terminate__remove_charm_raises__workflow_failed(self): + activities = [ + check_charm_is_removed_mocked, + remove_charm_mocked_raises, + resolve_charm_errors_mocked, + ] + async with self.env, self.get_worker(activities): + with self.assertRaises(WorkflowFailureError) as err: + await self.client.execute_workflow( + VduTerminateWorkflowImpl.run, + arg=self.vdu_terminate_wf_input, + id=self.workflow_id, + task_queue=self.task_queue_name, + task_timeout=TASK_TIMEOUT, + execution_timeout=EXECUTION_TIMEOUT, + ) + self.assertEqual( + str(err.exception.cause.cause), + f"TestException: {RemoveCharm.__name__} failed.", + ) + + async def test_vdu_terminate__remove_charm_raises__activity_error_is_raised(self): + activities = [ + check_charm_is_removed_mocked, + remove_charm_mocked_raises, + resolve_charm_errors_mocked, + ] + async with self.env, self.get_worker(activities): + with self.assertRaises(WorkflowFailureError) as err: + await self.client.execute_workflow( + VduTerminateWorkflowImpl.run, + arg=self.vdu_terminate_wf_input, + id=self.workflow_id, + task_queue=self.task_queue_name, + task_timeout=TASK_TIMEOUT, + execution_timeout=EXECUTION_TIMEOUT, + ) + self.assertTrue(err.exception.cause, ActivityError) + + async def test_vdu_terminate__resolve_charm_errors_raises__workflow_succeded(self): + activities = [ + check_charm_is_removed_mocked, + remove_charm_mocked, + resolve_charm_errors_mocked_raises, + ] + async with self.env, self.get_worker(activities): + await self.client.execute_workflow( + VduTerminateWorkflowImpl.run, + arg=self.vdu_terminate_wf_input, + id=self.workflow_id, + task_queue=self.task_queue_name, + task_timeout=TASK_TIMEOUT, + execution_timeout=EXECUTION_TIMEOUT, + ) + + async def test_vdu_terminate__resolve_charm_errors_cancelled__workflow_succeded( + self, + ): + activities = [ + check_charm_is_removed_mocked, + remove_charm_mocked, + resolve_charm_errors_mocked_cancelled, + ] + async with self.env, self.get_worker(activities): + await self.client.execute_workflow( + VduTerminateWorkflowImpl.run, + arg=self.vdu_terminate_wf_input, + id=self.workflow_id, + task_queue=self.task_queue_name, + task_timeout=TASK_TIMEOUT, + execution_timeout=EXECUTION_TIMEOUT, + )