# limitations under the License.
import asyncio
+from datetime import timedelta
import traceback
from osm_common.temporal.activities.paas import CreateModel
from osm_common.temporal.activities.ns import (
+ DeleteNsRecord,
GetVnfDetails,
GetNsRecord,
UpdateNsState,
)
from osm_common.temporal.workflows.lcm import LcmOperationWorkflow
-from osm_common.temporal.workflows.ns import NsInstantiateWorkflow
-from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow
+from osm_common.temporal.workflows.ns import (
+ NsInstantiateWorkflow,
+ NsDeleteRecordsWorkflow,
+)
+from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow, VnfDeleteWorkflow
+from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
from osm_common.temporal.states import NsState
from temporalio import workflow
+from temporalio.common import RetryPolicy
from temporalio.converter import value_to_type
-from temporalio.exceptions import ActivityError, ChildWorkflowError
+from temporalio.exceptions import (
+ ActivityError,
+ ApplicationError,
+ ChildWorkflowError,
+ FailureError,
+)
+
+_SANDBOXED = False
+retry_policy = RetryPolicy(maximum_attempts=3)
+no_retry_policy = RetryPolicy(maximum_attempts=1)
+default_schedule_to_close_timeout = timedelta(minutes=10)
-@workflow.defn(name=NsInstantiateWorkflow.__name__, sandboxed=False)
+@workflow.defn(name=NsInstantiateWorkflow.__name__, sandboxed=_SANDBOXED)
class NsInstantiateWorkflowImpl(LcmOperationWorkflow):
@workflow.run
async def wrap_nslcmop(self, workflow_input: NsInstantiateWorkflow.Input) -> None:
if vnf_config.get("member-vnf-index") == vnf_member_index_ref:
return vnf_config
return {}
+
+
+@workflow.defn(name=NsDeleteRecordsWorkflow.__name__, sandboxed=_SANDBOXED)
+class NsDeleteRecordsWorkflowImpl(NsDeleteRecordsWorkflow):
+ @workflow.run
+ async def run(self, workflow_input: NsDeleteRecordsWorkflow.Input) -> None:
+ ns_uuid = workflow_input.ns_uuid
+ try:
+ nsr = value_to_type(
+ GetNsRecord.Output,
+ await workflow.execute_activity(
+ activity=GetNsRecord.__name__,
+ arg=GetNsRecord.Input(ns_uuid),
+ activity_id=f"{GetNsRecord.__name__}-{ns_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ ),
+ ).nsr
+ instantiation_state = nsr.get("nsState")
+ if instantiation_state != NsState.NOT_INSTANTIATED.name:
+ raise ApplicationError(
+ f"NS must be in {NsState.NOT_INSTANTIATED.name} state",
+ non_retryable=True,
+ )
+ vnf_details = value_to_type(
+ GetVnfDetails.Output,
+ await workflow.execute_activity(
+ activity=GetVnfDetails.__name__,
+ arg=GetVnfDetails.Input(ns_uuid=ns_uuid),
+ activity_id=f"{GetVnfDetails.__name__}-{ns_uuid}",
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ ),
+ )
+
+ await asyncio.gather(
+ *(
+ workflow.execute_child_workflow(
+ workflow=VnfDeleteWorkflow.__name__,
+ arg=VnfDeleteWorkflow.Input(
+ vnfr_uuid=vnfr_uuid,
+ ),
+ id=f"{VnfDeleteWorkflow.__name__}-{vnfr_uuid}",
+ )
+ for vnfr_uuid, _ in vnf_details.vnf_details
+ )
+ )
+ await workflow.execute_activity(
+ activity=DeleteNsRecord.__name__,
+ arg=DeleteNsRecord.Input(ns_uuid),
+ activity_id=f"{DeleteNsRecord.__name__}-{ns_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ )
+
+ except FailureError as e:
+ if not hasattr(e, "cause") or e.cause is None:
+ self.logger.error(
+ f"{NsDeleteRecordsWorkflow.__name__} failed with {str(e)}"
+ )
+ raise e
+ err_details = str(e.cause.with_traceback(e.__traceback__))
+ self.logger.error(
+ f"{NsDeleteRecordsWorkflow.__name__} failed with {err_details}"
+ )
+ raise e
import asynctest
import copy
from datetime import timedelta
+from typing import Any
from unittest.mock import Mock, patch
+import uuid
from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
from osm_common.temporal.activities.lcm import (
GetNsRecord,
GetVnfDetails,
UpdateNsState,
+ DeleteNsRecord,
)
from osm_common.temporal.activities.paas import (
CreateModel,
)
-from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow
+from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow, VnfDeleteWorkflow
from osm_common.temporal.workflows.lcm import LcmOperationWorkflow
from osm_common.temporal.states import LcmOperationState, NsState
from osm_lcm.temporal.ns_workflows import (
+ NsDeleteRecordsWorkflow,
+ NsDeleteRecordsWorkflowImpl,
NsInstantiateWorkflow,
NsInstantiateWorkflowImpl,
)
from temporalio.common import RetryPolicy
from temporalio.exceptions import (
ActivityError,
+ ApplicationError,
ChildWorkflowError,
RetryState,
TimeoutError,
pass
+class MockActivity:
+ def __init__(self, *args: Any, **kwargs: Any):
+ self.tracker = Mock(return_value=None)
+
+ async def __call__(self, *args: Any, **kwargs: Any) -> Any:
+ return self.tracker(*args, **kwargs)
+
+ def __setattr__(self, __name: str, __value: Any) -> None:
+ if __name in ("return_value", "side_effect"):
+ self.tracker.__setattr__(__name, __value)
+ else:
+ super().__setattr__(__name, __value)
+
+
@activity.defn(name=CreateModel.__name__)
async def mock_create_model(create_model_input: CreateModel.Input) -> None:
pass
@activity.defn(name=GetVnfDetails.__name__)
-async def mock_get_vnf_details(
- get_vnf_details_input: GetVnfDetails.Input,
-) -> GetVnfDetails.Output:
- return GetVnfDetails.Output(vnf_details=vnf_details)
+class MockGetVnfDetails(MockActivity):
+ def __init__(self, *args: Any, **kwargs: Any):
+ super().__init__(*args, **kwargs)
+ self.tracker.return_value = GetVnfDetails.Output(vnf_details=vnf_details)
@activity.defn(name=GetVnfDetails.__name__)
-async def mock_get_vnf_details_raises(
- et_vnf_details_input: GetVnfDetails.Input,
-) -> GetVnfDetails.Output:
- raise TestException(f"{GetVnfDetails.__name__} failed.")
+class MockGetVnfDetailsRaises(MockActivity):
+ def __init__(self, *args: Any, **kwargs: Any):
+ super().__init__(*args, **kwargs)
+ self.tracker.side_effect = TestException(f"{GetVnfDetails.__name__} failed.")
@activity.defn(name=GetNsRecord.__name__)
raise TestException(f"{GetNsRecord.__name__} failed.")
+@activity.defn(name=GetNsRecord.__name__)
+async def mock_get_ns_record_not_instantiated_state(
+ get_ns_record_input: GetNsRecord.Input,
+) -> GetNsRecord.Output:
+ nsr = copy.deepcopy(sample_nsr)
+ nsr["nsState"] = NsState.NOT_INSTANTIATED.name
+ return GetNsRecord.Output(nsr=nsr)
+
+
+@activity.defn(name=GetNsRecord.__name__)
+async def mock_get_ns_record_instantiated_state(
+ get_ns_record_input: GetNsRecord.Input,
+) -> GetNsRecord.Output:
+ nsr = copy.deepcopy(sample_nsr)
+ nsr["nsState"] = NsState.INSTANTIATED.name
+ return GetNsRecord.Output(nsr=nsr)
+
+
@workflow.defn(name=VnfInstantiateWorkflow.__name__, sandboxed=SANDBOXED)
class MockVnfInstantiateWorkflow:
@workflow.run
)
+@workflow.defn(name=VnfDeleteWorkflow.__name__, sandboxed=SANDBOXED)
+class MockVnfDeleteWorkflow:
+ @workflow.run
+ async def run(self, input: VnfDeleteWorkflow.Input) -> None:
+ pass
+
+
+@workflow.defn(name=VnfDeleteWorkflow.__name__, sandboxed=SANDBOXED)
+class MockVnfDeleteWorkflowFailed:
+ @workflow.run
+ async def run(self, input: VnfDeleteWorkflow.Input) -> None:
+ raise ChildWorkflowError(
+ message=f"{VnfDeleteWorkflow.__name__} child workflow failed.",
+ namespace="default",
+ workflow_id="123",
+ run_id="1",
+ workflow_type=VnfDeleteWorkflow.__name__,
+ initiated_event_id=0,
+ started_event_id=0,
+ retry_state=RetryState.NON_RETRYABLE_FAILURE,
+ )
+
+
@patch(
"osm_lcm.temporal.ns_workflows.NsInstantiateWorkflowImpl._get_namespace",
new=mock_get_namespace,
async def test_instantiate_workflow__successful__update_lcm_op_state(self):
activities = [
mock_create_model,
- mock_get_vnf_details,
+ MockGetVnfDetails(),
mock_get_ns_record,
self.mock_update_ns_state,
self.mock_update_lcm_operation_state,
async def test_instantiate_workflow__successful__update_ns_state(self):
activities = [
mock_create_model,
- mock_get_vnf_details,
+ MockGetVnfDetails(),
mock_get_ns_record,
self.mock_update_ns_state,
self.mock_update_lcm_operation_state,
):
activities = [
mock_create_model_raises,
- mock_get_vnf_details,
+ MockGetVnfDetails(),
mock_get_ns_record,
self.mock_update_ns_state,
self.mock_update_lcm_operation_state,
):
activities = [
mock_create_model_raises,
- mock_get_vnf_details,
+ MockGetVnfDetails(),
mock_get_ns_record,
self.mock_update_ns_state,
self.mock_update_lcm_operation_state,
):
activities = [
mock_create_model_raises,
- mock_get_vnf_details,
+ MockGetVnfDetails(),
mock_get_ns_record,
self.mock_update_ns_state,
self.mock_update_lcm_operation_state,
):
activities = [
mock_create_model_raises,
- mock_get_vnf_details,
+ MockGetVnfDetails(),
mock_get_ns_record,
self.mock_update_ns_state,
self.mock_update_lcm_operation_state,
):
activities = [
mock_create_model,
- mock_get_vnf_details,
+ MockGetVnfDetails(),
mock_get_ns_record_raise_exception,
self.mock_update_ns_state,
self.mock_update_lcm_operation_state,
):
activities = [
mock_create_model,
- mock_get_vnf_details,
+ MockGetVnfDetails(),
mock_get_ns_record_raise_exception,
self.mock_update_ns_state,
self.mock_update_lcm_operation_state,
):
activities = [
mock_create_model,
- mock_get_vnf_details_raises,
+ MockGetVnfDetailsRaises(),
mock_get_ns_record,
self.mock_update_ns_state,
self.mock_update_lcm_operation_state,
):
activities = [
mock_create_model,
- mock_get_vnf_details_raises,
+ MockGetVnfDetailsRaises(),
mock_get_ns_record,
self.mock_update_ns_state,
self.mock_update_lcm_operation_state,
# Because of workflow task timeout policy, workflow times out
activities = [
mock_create_model,
- mock_get_vnf_details,
+ MockGetVnfDetails(),
mock_get_ns_record,
self.mock_update_ns_state,
self.mock_update_lcm_operation_state,
# Workflow task failure
activities = [
mock_create_model,
- mock_get_vnf_details,
+ MockGetVnfDetails(),
mock_get_ns_record,
self.mock_update_ns_state,
self.mock_update_lcm_operation_state,
# Workflow task failure
activities = [
mock_create_model,
- mock_get_vnf_details,
+ MockGetVnfDetails(),
mock_get_ns_record,
self.mock_update_ns_state,
self.mock_update_lcm_operation_state,
):
activities = [
mock_create_model,
- mock_get_vnf_details,
+ MockGetVnfDetails(),
mock_get_ns_record,
self.mock_update_ns_state,
self.mock_update_lcm_operation_state,
):
activities = [
mock_create_model,
- mock_get_vnf_details,
+ MockGetVnfDetails(),
mock_get_ns_record,
self.mock_update_ns_state,
self.mock_update_lcm_operation_state,
):
activities = [
mock_create_model,
- mock_get_vnf_details,
+ MockGetVnfDetails(),
mock_get_ns_record,
self.mock_update_ns_state,
self.mock_update_lcm_operation_state,
):
activities = [
mock_create_model,
- mock_get_vnf_details,
+ MockGetVnfDetails(),
mock_get_ns_record,
self.mock_update_ns_state,
self.mock_update_lcm_operation_state,
vnf_member_index_ref_2, sample_nsr
)
self.assertEqual(result, {})
+
+
+class TestNsDeleteRecordsWorkflow(asynctest.TestCase):
+ async def setUp(self):
+ self.env = await WorkflowEnvironment.start_time_skipping()
+ self.client = self.env.client
+ self.task_queue = LCM_TASK_QUEUE
+ self.wf_input = NsDeleteRecordsWorkflow.Input(ns_uuid=ns_uuid)
+ self.mock_delete_ns_record_tracker = Mock()
+
+ def get_worker(self, task_queue: str, workflows: list, activities: list) -> Worker:
+ return Worker(
+ self.client,
+ task_queue=task_queue,
+ workflows=workflows,
+ activities=activities,
+ debug_mode=DEBUG_MODE,
+ )
+
+ async def execute_workflow(self, wf_input):
+ return await self.client.execute_workflow(
+ NsDeleteRecordsWorkflowImpl.run,
+ arg=wf_input,
+ id=uuid.uuid4().hex,
+ task_queue=self.task_queue,
+ task_timeout=TASK_TIMEOUT,
+ execution_timeout=EXECUTION_TIMEOUT,
+ )
+
+ def assert_ns_record_is_called(self):
+ self.assertEqual(
+ self.mock_delete_ns_record_tracker.call_args_list[0].args[0],
+ DeleteNsRecord.Input(ns_uuid=ns_uuid),
+ )
+
+ @activity.defn(name=DeleteNsRecord.__name__)
+ async def mock_delete_ns_record(
+ self,
+ delete_ns_record_input: DeleteNsRecord.Input,
+ ) -> None:
+ self.mock_delete_ns_record_tracker(delete_ns_record_input)
+
+ @activity.defn(name=DeleteNsRecord.__name__)
+ async def mock_delete_ns_record_raises_exception(
+ self,
+ delete_ns_record_input: DeleteNsRecord.Input,
+ ) -> None:
+ self.mock_delete_ns_record_tracker(delete_ns_record_input)
+ raise TestException(f"{DeleteNsRecord.__name__} failed.")
+
+ async def test_delete_ns_records_workflow__successful(self):
+ workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflow]
+ activities = [
+ MockGetVnfDetails(),
+ mock_get_ns_record_not_instantiated_state,
+ self.mock_delete_ns_record,
+ ]
+ async with self.env, self.get_worker(self.task_queue, workflows, activities):
+ await self.execute_workflow(self.wf_input)
+ self.assert_ns_record_is_called()
+
+ @patch(
+ "temporalio.workflow.execute_child_workflow",
+ wraps=workflow.execute_child_workflow,
+ )
+ async def test_ns_terminate_workflow__successful__executes_vnf_delete_child_workflows_concurrently(
+ self,
+ mock_execute_child_workflow: asynctest.CoroutineMock,
+ ):
+ # Choose a number of child workflows that would take longer to execute than the execution timeout
+ num_child_workflows = EXECUTION_TIMEOUT.seconds * 2 + 1
+ mock_get_vnf_details = MockGetVnfDetails()
+ mock_get_vnf_details.return_value = GetVnfDetails.Output(
+ vnf_details=[
+ (f"vnfr-{uuid.uuid4().hex}", vnf_member_index_ref_1)
+ for _ in range(num_child_workflows)
+ ]
+ )
+ workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflow]
+ activities = [
+ mock_get_vnf_details,
+ mock_get_ns_record_not_instantiated_state,
+ self.mock_delete_ns_record,
+ ]
+ async with self.env, self.get_worker(self.task_queue, workflows, activities):
+ await self.execute_workflow(self.wf_input)
+ self.assert_ns_record_is_called()
+ self.assertEqual(mock_execute_child_workflow.call_count, num_child_workflows)
+ # Check that VnfDeleteWorkflow is executed
+ for i in range(0, num_child_workflows):
+ call_args = mock_execute_child_workflow.call_args_list[i].kwargs["workflow"]
+ self.assertEqual(call_args, VnfDeleteWorkflow.__name__)
+
+ async def test_delete_ns_records_workflow__instantiated_state__fails(self):
+ workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflow]
+ activities = [
+ MockGetVnfDetails(),
+ mock_get_ns_record_instantiated_state,
+ self.mock_delete_ns_record,
+ ]
+ async with self.env, self.get_worker(self.task_queue, workflows, activities):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.execute_workflow(self.wf_input)
+ self.assertIsInstance(err.exception.cause, ApplicationError)
+ self.assertEqual(
+ str(err.exception.cause),
+ "NS must be in NOT_INSTANTIATED state",
+ )
+ self.mock_delete_ns_record_tracker.assert_not_called()
+
+ async def test_delete_ns_records_workflow__get_vnf_details_fails__workflow_fails(
+ self,
+ ):
+ workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflow]
+ activities = [
+ MockGetVnfDetailsRaises(),
+ mock_get_ns_record_not_instantiated_state,
+ self.mock_delete_ns_record,
+ ]
+ async with self.env, self.get_worker(self.task_queue, workflows, activities):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.execute_workflow(self.wf_input)
+ self.assertIsInstance(err.exception.cause, ActivityError)
+ self.assertEqual(
+ str(err.exception.cause.cause),
+ f"TestException: {GetVnfDetails.__name__} failed.",
+ )
+ self.mock_delete_ns_record_tracker.assert_not_called()
+
+ async def test_delete_ns_records_workflow__get_ns_record_fails__workflow_fails(
+ self,
+ ):
+ workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflow]
+ activities = [
+ MockGetVnfDetails(),
+ mock_get_ns_record_raise_exception,
+ self.mock_delete_ns_record,
+ ]
+ async with self.env, self.get_worker(self.task_queue, workflows, activities):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.execute_workflow(self.wf_input)
+ self.assertIsInstance(err.exception.cause, ActivityError)
+ self.assertEqual(
+ str(err.exception.cause.cause),
+ f"TestException: {GetNsRecord.__name__} failed.",
+ )
+ self.mock_delete_ns_record_tracker.assert_not_called()
+
+ async def test_delete_ns_records_workflow__delete_ns_record_fails__workflow_fails(
+ self,
+ ):
+ workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflow]
+ activities = [
+ MockGetVnfDetails(),
+ mock_get_ns_record_not_instantiated_state,
+ self.mock_delete_ns_record_raises_exception,
+ ]
+ async with self.env, self.get_worker(self.task_queue, workflows, activities):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.execute_workflow(self.wf_input)
+ self.assertIsInstance(err.exception.cause, ActivityError)
+ self.assertEqual(
+ str(err.exception.cause.cause),
+ f"TestException: {DeleteNsRecord.__name__} failed.",
+ )
+ self.mock_delete_ns_record_tracker.assert_called()
+
+ async def test_delete_ns_records_workflow__vnf_delete_workflow_failed__workflow_fails(
+ self,
+ ):
+ workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflowFailed]
+ activities = [
+ MockGetVnfDetails(),
+ mock_get_ns_record_not_instantiated_state,
+ self.mock_delete_ns_record,
+ ]
+ async with self.env, self.get_worker(self.task_queue, workflows, activities):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.execute_workflow(self.wf_input)
+ self.assertIsInstance(err.exception.cause, ChildWorkflowError)
+ self.assertEqual(
+ str(err.exception.cause.cause),
+ "VnfDeleteWorkflow child workflow failed.",
+ )
+ self.mock_delete_ns_record_tracker.assert_not_called()