from datetime import timedelta
import traceback
-from osm_common.temporal.activities.paas import CreateModel
+from osm_common.temporal.activities.paas import (
+ CreateModel,
+ RemoveModel,
+ CheckModelIsRemoved,
+)
from osm_common.temporal.activities.ns import (
DeleteNsRecord,
GetVnfDetails,
GetNsRecord,
UpdateNsState,
)
+from osm_common.temporal.activities.vnf import (
+ GetModelNames,
+)
from osm_common.temporal.workflows.lcm import LcmOperationWorkflow
from osm_common.temporal.workflows.ns import (
NsInstantiateWorkflow,
+ NsTerminateWorkflow,
NsDeleteRecordsWorkflow,
)
-from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow, VnfDeleteWorkflow
+from osm_common.temporal.workflows.vnf import (
+ VnfInstantiateWorkflow,
+ VnfTerminateWorkflow,
+ VnfDeleteWorkflow,
+)
from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
from osm_common.temporal.states import NsState
from temporalio import workflow
return {}
+@workflow.defn(name=NsTerminateWorkflow.__name__, sandboxed=False)
+class NsTerminateWorkflowImpl(NsTerminateWorkflow):
+ @workflow.run
+ async def run(self, workflow_input: NsTerminateWorkflow.Input) -> None:
+ try:
+ vnfs_details = value_to_type(
+ GetVnfDetails.Output,
+ await workflow.execute_activity(
+ activity=GetVnfDetails.__name__,
+ arg=GetVnfDetails.Input(ns_uuid=workflow_input.ns_uuid),
+ activity_id=f"{GetVnfDetails.__name__}-{workflow_input.ns_uuid}",
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ ),
+ )
+ await asyncio.gather(
+ *(
+ workflow.execute_child_workflow(
+ workflow=VnfTerminateWorkflow.__name__,
+ arg=VnfTerminateWorkflow.Input(
+ vnfr_uuid=vnfr_uuid,
+ ),
+ id=f"{VnfTerminateWorkflow.__name__}-{vnfr_uuid}",
+ )
+ for vnfr_uuid, _ in vnfs_details.vnf_details
+ )
+ )
+ models_names = value_to_type(
+ GetModelNames.Output,
+ await workflow.execute_activity(
+ activity=GetModelNames.__name__,
+ arg=GetModelNames.Input(ns_uuid=workflow_input.ns_uuid),
+ activity_id=f"{GetModelNames.__name__}-{workflow_input.ns_uuid}",
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ ),
+ )
+ await asyncio.gather(
+ *(
+ NsTerminateWorkflowImpl.remove_model(
+ vim_uuid=workflow_input.vim_uuid,
+ model_name=model_name,
+ force_remove=False,
+ )
+ for model_name in models_names.model_names
+ )
+ )
+
+ try:
+ await asyncio.gather(
+ *(
+ NsTerminateWorkflowImpl.check_model_is_removed(
+ vim_uuid=workflow_input.vim_uuid,
+ model_name=model_name,
+ )
+ for model_name in models_names.model_names
+ )
+ )
+
+ except ActivityError:
+ self.logger.info("Removing models forcefully.")
+ await asyncio.gather(
+ *(
+ NsTerminateWorkflowImpl.remove_model(
+ vim_uuid=workflow_input.vim_uuid,
+ model_name=model_name,
+ force_remove=True,
+ )
+ for model_name in models_names.model_names
+ )
+ )
+
+ try:
+ await asyncio.gather(
+ *(
+ NsTerminateWorkflowImpl.check_model_is_removed(
+ vim_uuid=workflow_input.vim_uuid,
+ model_name=model_name,
+ )
+ for model_name in models_names.model_names
+ )
+ )
+ except ActivityError as check_model_removal_error:
+ self.logger.error("Some models could not be removed.")
+ raise check_model_removal_error
+
+ except FailureError as e:
+ if not hasattr(e, "cause") or e.cause is None:
+ raise e
+ err_details = str(e.cause.with_traceback(e.__traceback__))
+ self.logger.error(
+ f"{NsTerminateWorkflow.__name__} failed with {err_details}"
+ )
+ raise e
+
+ @staticmethod
+ async def remove_model(vim_uuid, model_name, force_remove):
+ await workflow.execute_activity(
+ activity=RemoveModel.__name__,
+ arg=RemoveModel.Input(
+ vim_uuid=vim_uuid,
+ model_name=model_name,
+ force_remove=force_remove,
+ ),
+ activity_id=f"{RemoveModel.__name__}-{vim_uuid}-{model_name}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ )
+
+ @staticmethod
+ async def check_model_is_removed(vim_uuid, model_name):
+ await workflow.execute_activity(
+ activity=CheckModelIsRemoved.__name__,
+ arg=CheckModelIsRemoved.Input(
+ vim_uuid=vim_uuid,
+ model_name=model_name,
+ ),
+ activity_id=f"{CheckModelIsRemoved.__name__}-{vim_uuid}-{model_name}",
+ task_queue=LCM_TASK_QUEUE,
+ start_to_close_timeout=timedelta(minutes=5),
+ heartbeat_timeout=timedelta(seconds=30),
+ retry_policy=retry_policy,
+ )
+
+
@workflow.defn(name=NsDeleteRecordsWorkflow.__name__, sandboxed=_SANDBOXED)
class NsDeleteRecordsWorkflowImpl(NsDeleteRecordsWorkflow):
@workflow.run
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import asynctest
+import asyncio
import copy
from datetime import timedelta
from typing import Any
from unittest.mock import Mock, patch
import uuid
-from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
+import asynctest
+from temporalio import activity, workflow
+from temporalio.client import WorkflowFailureError
+from temporalio.common import RetryPolicy
+from temporalio.exceptions import (
+ ActivityError,
+ ApplicationError,
+ ChildWorkflowError,
+ RetryState,
+ TimeoutError,
+)
+from temporalio.testing import WorkflowEnvironment
+from temporalio.worker import Worker
+
from osm_common.temporal.activities.lcm import (
UpdateNsLcmOperationState,
)
)
from osm_common.temporal.activities.paas import (
CreateModel,
+ RemoveModel,
+ CheckModelIsRemoved,
)
-from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow, VnfDeleteWorkflow
-from osm_common.temporal.workflows.lcm import LcmOperationWorkflow
+from osm_common.temporal.activities.vnf import GetModelNames
from osm_common.temporal.states import LcmOperationState, NsState
+from osm_common.temporal.workflows.lcm import LcmOperationWorkflow
+from osm_common.temporal.workflows.vnf import (
+ VnfInstantiateWorkflow,
+ VnfTerminateWorkflow,
+ VnfDeleteWorkflow,
+)
+from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
from osm_lcm.temporal.ns_workflows import (
NsDeleteRecordsWorkflow,
NsDeleteRecordsWorkflowImpl,
NsInstantiateWorkflow,
NsInstantiateWorkflowImpl,
+ NsTerminateWorkflow,
+ NsTerminateWorkflowImpl,
)
-from temporalio import activity, workflow
-from temporalio.client import WorkflowFailureError
-from temporalio.common import RetryPolicy
-from temporalio.exceptions import (
- ActivityError,
- ApplicationError,
- ChildWorkflowError,
- RetryState,
- TimeoutError,
-)
-from temporalio.testing import WorkflowEnvironment
-from temporalio.worker import Worker
-
vnfr_uuid_1 = "828d91ee-fa04-43bb-8471-f66ea74597e7"
vnfr_uuid_2 = "c4bbeb41-df7e-4daa-863d-c8fd29fac96d"
)
SANDBOXED = False
DEBUG_MODE = True
-TASK_TIMEOUT = timedelta(seconds=0.5)
-EXECUTION_TIMEOUT = timedelta(seconds=1)
+TASK_TIMEOUT = timedelta(seconds=5)
+EXECUTION_TIMEOUT = timedelta(seconds=10)
class TestException(Exception):
raise TestException(f"{CreateModel.__name__} failed.")
+@activity.defn(name=RemoveModel.__name__)
+async def mock_remove_model(remove_model_input: RemoveModel.Input) -> None:
+ pass
+
+
+@activity.defn(name=RemoveModel.__name__)
+async def mock_remove_model_raises(remove_model_input: RemoveModel.Input) -> None:
+ raise TestException(f"{RemoveModel.__name__} failed.")
+
+
+@activity.defn(name=CheckModelIsRemoved.__name__)
+async def mock_check_model_is_removed(
+ check_model_is_removed_input: CheckModelIsRemoved.Input,
+) -> None:
+ pass
+
+
+@activity.defn(name=CheckModelIsRemoved.__name__)
+async def mock_check_model_is_removed_raises(
+ check_model_is_removed_input: CheckModelIsRemoved.Input,
+) -> None:
+ raise TestException(f"{CheckModelIsRemoved.__name__} failed.")
+
+
+@activity.defn(name=CheckModelIsRemoved.__name__)
+async def mock_check_model_is_removed_cancelled(
+ check_model_is_removed_input: CheckModelIsRemoved.Input,
+) -> None:
+ raise asyncio.exceptions.CancelledError(
+ f"{CheckModelIsRemoved.__name__} cancelled."
+ )
+
+
+@activity.defn(name=GetModelNames.__name__)
+async def mock_get_model_names(
+ get_model_names_input: GetModelNames.Input,
+) -> GetModelNames.Output:
+ return GetModelNames.Output({"namespace-1", "namespace-2"})
+
+
+@activity.defn(name=GetModelNames.__name__)
+async def mock_get_model_names_raises(
+ get_model_names_input: GetModelNames.Input,
+) -> GetModelNames.Output:
+ raise TestException(f"{GetModelNames.__name__} failed.")
+
+
@activity.defn(name=GetVnfDetails.__name__)
class MockGetVnfDetails(MockActivity):
def __init__(self, *args: Any, **kwargs: Any):
)
+@workflow.defn(name=VnfTerminateWorkflow.__name__, sandboxed=SANDBOXED)
+class MockVnfTerminateWorkflow:
+ @workflow.run
+ async def run(self, workflow_input: VnfTerminateWorkflow.Input) -> None:
+ pass
+
+
+@workflow.defn(name=VnfTerminateWorkflow.__name__, sandboxed=SANDBOXED)
+class MockVnfTerminateWorkflowFailed:
+ @workflow.run
+ async def run(self, workflow_input: VnfTerminateWorkflow.Input) -> None:
+ raise ChildWorkflowError(
+ message=f"{VnfTerminateWorkflow.__name__} child workflow failed.",
+ namespace="default",
+ workflow_id="123",
+ run_id="1",
+ workflow_type=VnfTerminateWorkflow.__name__,
+ initiated_event_id=0,
+ started_event_id=0,
+ retry_state=RetryState.NON_RETRYABLE_FAILURE,
+ )
+
+
@workflow.defn(name=VnfDeleteWorkflow.__name__, sandboxed=SANDBOXED)
class MockVnfDeleteWorkflow:
@workflow.run
self.assertEqual(result, {})
+class TestNsTerminateWorkflow(asynctest.TestCase):
+ async def setUp(self):
+ self.wf_input = NsTerminateWorkflow.Input(
+ ns_uuid="ns-uuid",
+ vim_uuid="vim-uuid",
+ )
+ self.env = await WorkflowEnvironment.start_time_skipping()
+ self.client = self.env.client
+
+ # Maps workflow base classes to implementation classes.
+ # Can be overridden by individual tests to swap out implementations by
+ # setting self.workflows[<workflow_base_class>] = <implementation_class>
+ self.workflows = [MockVnfTerminateWorkflow, NsTerminateWorkflowImpl]
+ self.mock_get_vnf_details = MockGetVnfDetails()
+ self.mock_get_models_names = mock_get_model_names
+ self.task_queue = LCM_TASK_QUEUE
+ self.workflow_input = NsTerminateWorkflow.Input(
+ ns_uuid="my-ns-uuid",
+ vim_uuid="my-vim-uuid",
+ )
+
+ def get_worker(self, task_queue: str, workflows: list, activities: list) -> Worker:
+ return Worker(
+ self.client,
+ task_queue=task_queue,
+ workflows=workflows,
+ activities=activities,
+ debug_mode=DEBUG_MODE,
+ )
+
+ async def execute_workflow(self, wf_input):
+ return await self.client.execute_workflow(
+ NsTerminateWorkflowImpl.run,
+ arg=wf_input,
+ id=uuid.uuid4().hex,
+ task_queue=self.task_queue,
+ task_timeout=TASK_TIMEOUT,
+ execution_timeout=EXECUTION_TIMEOUT,
+ )
+
+ @patch(
+ "temporalio.workflow.execute_child_workflow",
+ wraps=workflow.execute_child_workflow,
+ )
+ async def test_ns_terminate_workflow__successful__executes_child_workflow(
+ self,
+ mock_execute_child_workflow: asynctest.CoroutineMock,
+ ):
+ workflows = [
+ MockVnfTerminateWorkflow,
+ NsTerminateWorkflowImpl,
+ ]
+ activities = [
+ MockGetVnfDetails(),
+ mock_get_model_names,
+ mock_remove_model,
+ mock_check_model_is_removed,
+ ]
+ async with self.env, self.get_worker(self.task_queue, workflows, activities):
+ await self.execute_workflow(self.wf_input)
+ self.assertTrue(mock_execute_child_workflow.called)
+ self.assertEqual(mock_execute_child_workflow.call_count, 2)
+ call_args = mock_execute_child_workflow.call_args_list[0].kwargs["workflow"]
+ self.assertEqual(call_args, VnfTerminateWorkflow.__name__)
+
+ @patch(
+ "temporalio.workflow.execute_child_workflow",
+ wraps=workflow.execute_child_workflow,
+ )
+ async def test_ns_terminate_workflow__successful__executes_child_workflows_concurrently(
+ self,
+ mock_execute_child_workflow: asynctest.CoroutineMock,
+ ):
+ # Choose a number of child workflows that would take longer to execute than the execution timeout
+ num_child_workflows = EXECUTION_TIMEOUT.seconds * 2 + 1
+ workflows = [
+ MockVnfTerminateWorkflow,
+ NsTerminateWorkflowImpl,
+ ]
+ mock_get_vnf_details = MockGetVnfDetails()
+ mock_get_vnf_details.return_value = GetVnfDetails.Output(
+ vnf_details=[
+ (f"vnfr-{uuid.uuid4().hex}", vnf_member_index_ref_1)
+ for _ in range(num_child_workflows)
+ ]
+ )
+ activities = [
+ mock_get_vnf_details,
+ mock_get_model_names,
+ mock_remove_model,
+ mock_check_model_is_removed,
+ ]
+
+ # Set up the return values with a unique workflow ID for each child workflow
+ mock_get_model_names.side_effect = [
+ f"namespace-{uuid.uuid4().hex}" for _ in range(num_child_workflows)
+ ]
+ async with self.env, self.get_worker(self.task_queue, workflows, activities):
+ await self.execute_workflow(self.wf_input)
+
+ self.assertEqual(mock_execute_child_workflow.call_count, num_child_workflows)
+ # Check that VnfTerminateWorkflow is executed
+ for i in range(0, num_child_workflows):
+ call_args = mock_execute_child_workflow.call_args_list[i].kwargs["workflow"]
+ self.assertEqual(call_args, VnfTerminateWorkflow.__name__)
+
+ async def test_ns_terminate_workflow__get_vnf_details_failed__raises_activity_error(
+ self,
+ ):
+ activities = [
+ MockGetVnfDetailsRaises(),
+ mock_get_model_names,
+ mock_remove_model,
+ mock_check_model_is_removed,
+ ]
+ async with self.env, self.get_worker(
+ self.task_queue, self.workflows, activities
+ ):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.execute_workflow(self.wf_input)
+ self.assertTrue(isinstance(err.exception.cause, ActivityError))
+
+ async def test_ns_terminate_workflow__get_model_names_failed__raises_activity_error(
+ self,
+ ):
+ activities = [
+ MockGetVnfDetails(),
+ mock_get_model_names_raises,
+ mock_remove_model,
+ mock_check_model_is_removed,
+ ]
+ async with self.env, self.get_worker(
+ self.task_queue, self.workflows, activities
+ ):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.execute_workflow(self.wf_input)
+ self.assertTrue(isinstance(err.exception.cause, ActivityError))
+
+ async def test_ns_terminate_workflow__remove_model_failed__raises_activity_error(
+ self,
+ ):
+ activities = [
+ MockGetVnfDetails(),
+ mock_get_model_names,
+ mock_remove_model_raises,
+ mock_check_model_is_removed,
+ ]
+ async with self.env, self.get_worker(
+ self.task_queue, self.workflows, activities
+ ):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.execute_workflow(self.wf_input)
+ self.assertTrue(isinstance(err.exception.cause, ActivityError))
+
+ async def test_ns_terminate_workflow__check_model_removed_failed__raises_activity_error(
+ self,
+ ):
+ activities = [
+ MockGetVnfDetails(),
+ mock_get_model_names,
+ mock_remove_model,
+ mock_check_model_is_removed_raises,
+ ]
+ async with self.env, self.get_worker(
+ self.task_queue, self.workflows, activities
+ ):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.execute_workflow(self.wf_input)
+ self.assertTrue(isinstance(err.exception.cause, ActivityError))
+
+ async def test_ns_terminate_workflow__wf_vnf_terminate_failed__raises_child_wf_error(
+ self,
+ ):
+ activities = [
+ MockGetVnfDetails(),
+ mock_get_model_names,
+ mock_remove_model,
+ mock_check_model_is_removed,
+ ]
+ async with self.env, self.get_worker(
+ self.task_queue,
+ [
+ NsTerminateWorkflowImpl,
+ MockVnfTerminateWorkflowFailed,
+ ],
+ activities,
+ ):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.execute_workflow(self.wf_input)
+ self.assertTrue(isinstance(err.exception.cause.cause, ChildWorkflowError))
+
+ async def test_ns_terminate__check_model_is_removed_cancelled__activity_error_is_raised(
+ self,
+ ):
+ activities = [
+ MockGetVnfDetails(),
+ mock_get_model_names,
+ mock_remove_model,
+ mock_check_model_is_removed_cancelled,
+ ]
+ async with self.env, self.get_worker(
+ self.task_queue, self.workflows, activities
+ ):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.execute_workflow(self.wf_input)
+ self.assertTrue(isinstance(err.exception.cause, ActivityError))
+
+
class TestNsDeleteRecordsWorkflow(asynctest.TestCase):
async def setUp(self):
self.env = await WorkflowEnvironment.start_time_skipping()