import traceback
from osm_common.temporal.activities.paas import (
- DeployCharm,
CheckCharmStatus,
+ CheckCharmIsRemoved,
+ DeployCharm,
+ RemoveCharm,
+ ResolveCharmErrors,
+)
+from osm_common.temporal.workflows.vdu import (
+ VduInstantiateWorkflow,
+ VduTerminateWorkflow,
)
-from osm_common.temporal.workflows.vdu import VduInstantiateWorkflow
from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
from temporalio import workflow
default_schedule_to_close_timeout = timedelta(minutes=10)
-@workflow.defn(name=VduInstantiateWorkflow.__name__, sandboxed=False)
+@workflow.defn(name=VduInstantiateWorkflow.__name__, sandboxed=_SANDBOXED)
class VduInstantiateWorkflowImpl(VduInstantiateWorkflow):
@workflow.run
async def run(self, workflow_input: VduInstantiateWorkflow.Input) -> None:
f"{VduInstantiateWorkflow.__name__} failed with {err_details}"
)
raise e
+
+
+@workflow.defn(name=VduTerminateWorkflow.__name__, sandboxed=_SANDBOXED)
+class VduTerminateWorkflowImpl(VduTerminateWorkflow):
+ @workflow.run
+ async def run(self, workflow_input: VduTerminateWorkflow.Input) -> None:
+ try:
+ vim_uuid = workflow_input.vim_uuid
+ model_name = workflow_input.model_name
+ application_name = workflow_input.application_name
+
+ self.logger.info(f"Terminating VDU `{application_name}`.")
+ try:
+ await VduTerminateWorkflowImpl.remove_application_and_storage(
+ vim_uuid, model_name, application_name, False
+ )
+ self.logger.info(
+ f"Waiting for VDU `{workflow_input.application_name}` to be removed."
+ )
+ except ActivityError as remove_application_error:
+ raise remove_application_error
+
+ try:
+ await VduTerminateWorkflowImpl.check_charm_is_removed(
+ vim_uuid, model_name, application_name
+ )
+ self.logger.info(f"VDU `{application_name}` is removed.")
+ return
+
+ except ActivityError:
+ try:
+ self.logger.info(f"Resolving VDU `{application_name}` errors.")
+ await VduTerminateWorkflowImpl.resolve_charm_errors(
+ vim_uuid, model_name, application_name
+ )
+
+ except ActivityError:
+ try:
+ self.logger.info(
+ f"Removing VDU `{application_name}` forcefully."
+ )
+ await VduTerminateWorkflowImpl.remove_application_and_storage(
+ vim_uuid, model_name, application_name, True
+ )
+
+ except ActivityError as remove_application_error:
+ raise remove_application_error
+
+ try:
+ await VduTerminateWorkflowImpl.check_charm_is_removed(
+ vim_uuid, model_name, application_name
+ )
+ self.logger.info(f"VDU `{application_name}` is removed.")
+ return
+
+ except ActivityError as check_charm_removal_error:
+ self.logger.error(
+ f"VDU {application_name} could not be removed."
+ )
+ raise check_charm_removal_error
+
+ try:
+ self.logger.info(
+ f"Removing VDU `{application_name}` after error resolution."
+ )
+ await VduTerminateWorkflowImpl.remove_application_and_storage(
+ vim_uuid, model_name, application_name, False
+ )
+ except ActivityError as remove_application_error:
+ raise remove_application_error
+
+ try:
+ await VduTerminateWorkflowImpl.check_charm_is_removed(
+ vim_uuid, model_name, application_name
+ )
+ self.logger.info(f"VDU `{application_name}` is removed.")
+ return
+
+ except ActivityError:
+ self.logger.info(f"Removing VDU `{application_name}` forcefully.")
+ await VduTerminateWorkflowImpl.remove_application_and_storage(
+ vim_uuid, model_name, application_name, True
+ )
+
+ try:
+ await VduTerminateWorkflowImpl.check_charm_is_removed(
+ vim_uuid, model_name, application_name
+ )
+ self.logger.info(f"VDU `{application_name}` is removed.")
+ return
+
+ except ActivityError as check_charm_removal_error:
+ self.logger.error(
+ f"VDU {application_name} could not be removed."
+ )
+ raise check_charm_removal_error
+
+ except ActivityError as e:
+ err_details = str(e.cause.with_traceback(e.__traceback__))
+ self.logger.error(
+ f"{VduTerminateWorkflow.__name__} failed with {err_details}"
+ )
+ raise e
+
+ except Exception as e:
+ err_details = str(traceback.format_exc())
+ self.logger.error(
+ f"{VduTerminateWorkflow.__name__} failed with {err_details}"
+ )
+ raise e
+
+ @staticmethod
+ async def remove_application_and_storage(
+ vim_uuid, model_name, application_name, force_remove
+ ):
+ await workflow.execute_activity(
+ activity=RemoveCharm.__name__,
+ arg=RemoveCharm.Input(
+ vim_uuid=vim_uuid,
+ model_name=model_name,
+ application_name=application_name,
+ force_remove=force_remove,
+ ),
+ activity_id=f"{RemoveCharm.__name__}-{vim_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ )
+
+ @staticmethod
+ async def check_charm_is_removed(vim_uuid, model_name, application_name):
+ await workflow.execute_activity(
+ activity=CheckCharmIsRemoved.__name__,
+ arg=CheckCharmIsRemoved.Input(
+ vim_uuid=vim_uuid,
+ model_name=model_name,
+ application_name=application_name,
+ ),
+ activity_id=f"{CheckCharmIsRemoved.__name__}-{vim_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ start_to_close_timeout=timedelta(minutes=5),
+ heartbeat_timeout=timedelta(seconds=30),
+ retry_policy=retry_policy,
+ )
+
+ @staticmethod
+ async def resolve_charm_errors(vim_uuid, model_name, application_name):
+ await workflow.execute_activity(
+ activity=ResolveCharmErrors.__name__,
+ arg=ResolveCharmErrors.Input(
+ vim_uuid=vim_uuid,
+ model_name=model_name,
+ application_name=application_name,
+ ),
+ activity_id=f"{ResolveCharmErrors.__name__}-{vim_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ start_to_close_timeout=timedelta(minutes=3),
+ heartbeat_timeout=timedelta(seconds=30),
+ retry_policy=retry_policy,
+ )
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import asyncio
+from datetime import timedelta
import asynctest
-from datetime import timedelta
from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
from osm_common.temporal.activities.paas import (
DeployCharm,
CheckCharmStatus,
+ CheckCharmIsRemoved,
+ RemoveCharm,
+ ResolveCharmErrors,
)
from osm_common.temporal.dataclasses_common import CharmInfo, VduComputeConstraints
-from osm_lcm.temporal.vdu_workflows import VduInstantiateWorkflowImpl
+from osm_lcm.temporal.vdu_workflows import (
+ VduInstantiateWorkflowImpl,
+ VduTerminateWorkflowImpl,
+)
from temporalio import activity
from temporalio.client import WorkflowFailureError
from temporalio.exceptions import (
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
-TASK_TIMEOUT = timedelta(seconds=0.5)
-EXECUTION_TIMEOUT = timedelta(seconds=1)
+TASK_TIMEOUT = timedelta(seconds=1)
+EXECUTION_TIMEOUT = timedelta(seconds=10)
class TestException(Exception):
raise TestException(f"{CheckCharmStatus.__name__} failed.")
-class TestVduWorkflows(asynctest.TestCase):
+@activity.defn(name=CheckCharmIsRemoved.__name__)
+async def check_charm_is_removed_mocked(
+ check_charm_is_removed: CheckCharmIsRemoved.Input,
+) -> None:
+ pass
+
+
+@activity.defn(name=CheckCharmIsRemoved.__name__)
+async def check_charm_is_removed_mocked_raises(
+ check_charm_is_removed: CheckCharmIsRemoved.Input,
+) -> None:
+ raise TestException(f"{CheckCharmIsRemoved.__name__} failed.")
+
+
+@activity.defn(name=CheckCharmIsRemoved.__name__)
+async def check_charm_is_removed_mocked_cancelled(
+ check_charm_is_removed: CheckCharmIsRemoved.Input,
+) -> None:
+ raise asyncio.exceptions.CancelledError(
+ f"{CheckCharmIsRemoved.__name__} cancelled."
+ )
+
+
+@activity.defn(name=RemoveCharm.__name__)
+async def remove_charm_mocked(remove_charm: RemoveCharm.Input) -> None:
+ pass
+
+
+@activity.defn(name=RemoveCharm.__name__)
+async def remove_charm_mocked_raises(remove_charm: RemoveCharm.Input) -> None:
+ raise TestException(f"{RemoveCharm.__name__} failed.")
+
+
+@activity.defn(name=ResolveCharmErrors.__name__)
+async def resolve_charm_errors_mocked(
+ resolve_charm_errors: ResolveCharmErrors.Input,
+) -> None:
+ pass
+
+
+@activity.defn(name=ResolveCharmErrors.__name__)
+async def resolve_charm_errors_mocked_raises(
+ resolve_charm_errors: ResolveCharmErrors.Input,
+) -> None:
+ raise TestException(f"{ResolveCharmErrors.__name__} failed.")
+
+
+@activity.defn(name=ResolveCharmErrors.__name__)
+async def resolve_charm_errors_mocked_cancelled(
+ resolve_charm_errors: ResolveCharmErrors.Input,
+) -> None:
+ raise asyncio.exceptions.CancelledError(f"{ResolveCharmErrors.__name__} cancelled.")
+
+
+class TestVduInstantiateWorkflow(asynctest.TestCase):
task_queue_name = LCM_TASK_QUEUE
vim_id = "some-vim-uuid"
namespace = "some-namespace"
str(err.exception.cause.cause),
f"TestException: {CheckCharmStatus.__name__} failed.",
)
+
+
+class TestVduTerminateWorkflow(asynctest.TestCase):
+ task_queue_name = LCM_TASK_QUEUE
+ vim_id = "some-vim-uuid"
+ namespace = "some-namespace"
+ app_name = "my_app_name"
+ workflow_id = namespace + "-" + app_name
+ vdu_terminate_wf_input = VduTerminateWorkflowImpl.Input(vim_id, namespace, app_name)
+
+ async def setUp(self):
+ self.env = await WorkflowEnvironment.start_time_skipping()
+ self.client = self.env.client
+
+ def get_worker(self, activities: list) -> Worker:
+ return Worker(
+ self.client,
+ task_queue=self.task_queue_name,
+ workflows=[VduTerminateWorkflowImpl],
+ activities=activities,
+ debug_mode=True,
+ )
+
+ async def test_vdu_terminate__all_activities_are_successful__workflow_succeded(
+ self,
+ ):
+ activities = [
+ check_charm_is_removed_mocked,
+ remove_charm_mocked,
+ resolve_charm_errors_mocked,
+ ]
+ async with self.env, self.get_worker(activities):
+ await self.client.execute_workflow(
+ VduTerminateWorkflowImpl.run,
+ arg=self.vdu_terminate_wf_input,
+ id=self.workflow_id,
+ task_queue=self.task_queue_name,
+ task_timeout=TASK_TIMEOUT,
+ execution_timeout=EXECUTION_TIMEOUT,
+ )
+
+ async def test_vdu_terminate__check_charm_is_removed_raises__workflow_fails(self):
+ activities = [
+ check_charm_is_removed_mocked_raises,
+ remove_charm_mocked,
+ resolve_charm_errors_mocked,
+ ]
+ async with self.env, self.get_worker(activities):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.client.execute_workflow(
+ VduTerminateWorkflowImpl.run,
+ arg=self.vdu_terminate_wf_input,
+ id=self.workflow_id,
+ task_queue=self.task_queue_name,
+ task_timeout=TASK_TIMEOUT,
+ execution_timeout=EXECUTION_TIMEOUT,
+ )
+ self.assertEqual(
+ str(err.exception.cause.cause),
+ f"TestException: {CheckCharmIsRemoved.__name__} failed.",
+ )
+
+ async def test_vdu_terminate__check_charm_is_removed_raises__activity_error_is_raised(
+ self,
+ ):
+ activities = [
+ check_charm_is_removed_mocked_raises,
+ remove_charm_mocked,
+ resolve_charm_errors_mocked,
+ ]
+ async with self.env, self.get_worker(activities):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.client.execute_workflow(
+ VduTerminateWorkflowImpl.run,
+ arg=self.vdu_terminate_wf_input,
+ id=self.workflow_id,
+ task_queue=self.task_queue_name,
+ task_timeout=TASK_TIMEOUT,
+ execution_timeout=EXECUTION_TIMEOUT,
+ )
+ self.assertTrue(err.exception.cause, ActivityError)
+
+ async def test_vdu_terminate__check_charm_is_removed_cancelled__activity_error_is_raised(
+ self,
+ ):
+ activities = [
+ check_charm_is_removed_mocked_cancelled,
+ remove_charm_mocked,
+ resolve_charm_errors_mocked,
+ ]
+ async with self.env, self.get_worker(activities):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.client.execute_workflow(
+ VduTerminateWorkflowImpl.run,
+ arg=self.vdu_terminate_wf_input,
+ id=self.workflow_id,
+ task_queue=self.task_queue_name,
+ task_timeout=TASK_TIMEOUT,
+ execution_timeout=EXECUTION_TIMEOUT,
+ )
+ self.assertTrue(err.exception.cause, ActivityError)
+
+ async def test_vdu_terminate__remove_charm_raises__workflow_failed(self):
+ activities = [
+ check_charm_is_removed_mocked,
+ remove_charm_mocked_raises,
+ resolve_charm_errors_mocked,
+ ]
+ async with self.env, self.get_worker(activities):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.client.execute_workflow(
+ VduTerminateWorkflowImpl.run,
+ arg=self.vdu_terminate_wf_input,
+ id=self.workflow_id,
+ task_queue=self.task_queue_name,
+ task_timeout=TASK_TIMEOUT,
+ execution_timeout=EXECUTION_TIMEOUT,
+ )
+ self.assertEqual(
+ str(err.exception.cause.cause),
+ f"TestException: {RemoveCharm.__name__} failed.",
+ )
+
+ async def test_vdu_terminate__remove_charm_raises__activity_error_is_raised(self):
+ activities = [
+ check_charm_is_removed_mocked,
+ remove_charm_mocked_raises,
+ resolve_charm_errors_mocked,
+ ]
+ async with self.env, self.get_worker(activities):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.client.execute_workflow(
+ VduTerminateWorkflowImpl.run,
+ arg=self.vdu_terminate_wf_input,
+ id=self.workflow_id,
+ task_queue=self.task_queue_name,
+ task_timeout=TASK_TIMEOUT,
+ execution_timeout=EXECUTION_TIMEOUT,
+ )
+ self.assertTrue(err.exception.cause, ActivityError)
+
+ async def test_vdu_terminate__resolve_charm_errors_raises__workflow_succeded(self):
+ activities = [
+ check_charm_is_removed_mocked,
+ remove_charm_mocked,
+ resolve_charm_errors_mocked_raises,
+ ]
+ async with self.env, self.get_worker(activities):
+ await self.client.execute_workflow(
+ VduTerminateWorkflowImpl.run,
+ arg=self.vdu_terminate_wf_input,
+ id=self.workflow_id,
+ task_queue=self.task_queue_name,
+ task_timeout=TASK_TIMEOUT,
+ execution_timeout=EXECUTION_TIMEOUT,
+ )
+
+ async def test_vdu_terminate__resolve_charm_errors_cancelled__workflow_succeded(
+ self,
+ ):
+ activities = [
+ check_charm_is_removed_mocked,
+ remove_charm_mocked,
+ resolve_charm_errors_mocked_cancelled,
+ ]
+ async with self.env, self.get_worker(activities):
+ await self.client.execute_workflow(
+ VduTerminateWorkflowImpl.run,
+ arg=self.vdu_terminate_wf_input,
+ id=self.workflow_id,
+ task_queue=self.task_queue_name,
+ task_timeout=TASK_TIMEOUT,
+ execution_timeout=EXECUTION_TIMEOUT,
+ )