from temporalio.client import WorkflowFailureError
from temporalio.exceptions import (
ActivityError,
+ ApplicationError,
ChildWorkflowError,
RetryState,
)
from osm_common.temporal.activities.vnf import (
ChangeVnfInstantiationState,
ChangeVnfState,
+ DeleteVnfRecord,
GetTaskQueue,
GetVimCloud,
GetVnfDescriptor,
VnfTerminateWorkflow,
)
from osm_lcm.temporal.vnf_workflows import (
+ VnfDeleteWorkflowImpl,
VnfInstantiateWorkflowImpl,
VnfPrepareWorkflowImpl,
VnfTerminateWorkflowImpl,
{"key": "track", "value": "latest"},
{"key": "channel", "value": "stable"},
]
-wf_input = VnfInstantiateWorkflowImpl.Input(
- vnfr_uuid=vnfr_uuid,
- model_name=model_name,
- instantiation_config={},
-)
vdu = {
"id": vdu_id,
"name": "hackfest_basic-VM",
self.tracker.return_value = GetVnfRecord.Output(vnfr=sample_vnfr)
+@activity.defn(name=GetVnfRecord.__name__)
+async def mock_get_vnf_record_not_instantiated_state(
+ get_vnf_record_input: GetVnfRecord.Input,
+) -> GetVnfRecord.Output:
+ vnfr = deepcopy(sample_vnfr)
+ vnfr["instantiationState"] = VnfInstantiationState.NOT_INSTANTIATED.name
+ return GetVnfRecord.Output(vnfr=vnfr)
+
+
+@activity.defn(name=GetVnfRecord.__name__)
+async def mock_get_vnf_record_instantiated_state(
+ get_vnf_record_input: GetVnfRecord.Input,
+) -> GetVnfRecord.Output:
+ vnfr = deepcopy(sample_vnfr)
+ vnfr["instantiationState"] = VnfInstantiationState.INSTANTIATED.name
+ return GetVnfRecord.Output(vnfr=vnfr)
+
+
@activity.defn(name=GetVnfRecord.__name__)
async def mock_get_vnf_record_raise_exception(
get_vnf_record_input: GetVnfRecord.Input,
self.env = await WorkflowEnvironment.start_time_skipping()
self.client = self.env.client
self.task_queue = LCM_TASK_QUEUE
+ self.wf_input = VnfPrepareWorkflowImpl.Input(
+ vnfr_uuid=vnfr_uuid, model_name=model_name
+ )
async def test_vnf_prepare_workflow__successful(self):
async with self.env:
):
await self.client.execute_workflow(
VnfPrepareWorkflowImpl.run,
- arg=wf_input,
- id=f"{VnfPrepareWorkflow.__name__}-{wf_input.vnfr_uuid}",
+ arg=self.wf_input,
+ id=f"{VnfPrepareWorkflow.__name__}-{self.wf_input.vnfr_uuid}",
task_queue=self.task_queue,
task_timeout=TASK_TIMEOUT,
execution_timeout=EXECUTION_TIMEOUT,
with self.assertRaises(WorkflowFailureError) as err:
await self.client.execute_workflow(
VnfPrepareWorkflowImpl.run,
- arg=wf_input,
- id=f"{VnfPrepareWorkflow.__name__}-{wf_input.vnfr_uuid}",
+ arg=self.wf_input,
+ id=f"{VnfPrepareWorkflow.__name__}-{self.wf_input.vnfr_uuid}",
task_queue=self.task_queue,
task_timeout=TASK_TIMEOUT,
execution_timeout=EXECUTION_TIMEOUT,
with self.assertRaises(WorkflowFailureError) as err:
await self.client.execute_workflow(
VnfPrepareWorkflowImpl.run,
- arg=wf_input,
- id=f"{VnfPrepareWorkflow.__name__}-{wf_input.vnfr_uuid}",
+ arg=self.wf_input,
+ id=f"{VnfPrepareWorkflow.__name__}-{self.wf_input.vnfr_uuid}",
task_queue=self.task_queue,
task_timeout=TASK_TIMEOUT,
execution_timeout=EXECUTION_TIMEOUT,
MockVduInstantiateWorkflow,
]
self.task_queue = LCM_TASK_QUEUE
+ self.wf_input = VnfInstantiateWorkflowImpl.Input(
+ vnfr_uuid=vnfr_uuid,
+ model_name=model_name,
+ instantiation_config={},
+ )
def get_worker(self, task_queue: str, workflows: list, activities: list) -> Worker:
return Worker(
debug_mode=DEBUG_MODE,
)
- async def execute_workflow(self, wf_input=wf_input):
+ async def execute_workflow(self, wf_input):
return await self.client.execute_workflow(
VnfInstantiateWorkflowImpl.run,
arg=wf_input,
async with self.env, self.get_worker(
self.task_queue, workflows, activities
), self.get_worker(self.task_queue, workflows, activities):
- await self.execute_workflow()
+ await self.execute_workflow(self.wf_input)
self.validate_vnf_instantiation_state_is_updated(
VnfInstantiationState.INSTANTIATED
)
async with self.env, self.get_worker(
self.task_queue, workflows, activities
), self.get_worker(self.task_queue, workflows, activities):
- await self.execute_workflow()
+ await self.execute_workflow(self.wf_input)
self.validate_vnf_instantiation_state_is_updated(VnfState.STARTED)
@patch(
sample_vdu_instantiate_wf_id_1,
)
async with self.env, self.get_worker(self.task_queue, workflows, activities):
- await self.execute_workflow()
+ await self.execute_workflow(self.wf_input)
self.assertTrue(mock_execute_child_workflow.called)
self.assertEquals(mock_execute_child_workflow.call_count, 2)
# Check that PrepareVnfWorkflow is executed
for _ in range(num_child_workflows)
]
async with self.env, self.get_worker(self.task_queue, workflows, activities):
- await self.execute_workflow()
+ await self.execute_workflow(self.wf_input)
self.assertEquals(
mock_execute_child_workflow.call_count, num_child_workflows + 1
self.task_queue, self.workflows, activities
):
with self.assertRaises(WorkflowFailureError) as err:
- await self.execute_workflow()
+ await self.execute_workflow(self.wf_input)
self.assertTrue(isinstance(err.exception.cause, ActivityError))
async def test_vnf_instantiate_workflow__activity_change_vnf_instantiation_state_failed__vnf_state_not_updated(
self.task_queue, self.workflows, activities
):
with self.assertRaises(WorkflowFailureError):
- await self.execute_workflow()
+ await self.execute_workflow(self.wf_input)
self.mock_change_vnf_state_tracker.assert_not_called()
async def test_vnf_instantiate_workflow__change_vnf_state_failed__raise_activity_error(
self.task_queue, self.workflows, activities
):
with self.assertRaises(WorkflowFailureError) as err:
- await self.execute_workflow()
+ await self.execute_workflow(self.wf_input)
self.assertTrue(isinstance(err.exception.cause, ActivityError))
async def test_vnf_instantiate_workflow__get_task_queue_failed__raises_activity_error(
self.task_queue, self.workflows, activities
):
with self.assertRaises(WorkflowFailureError) as err:
- await self.execute_workflow()
+ await self.execute_workflow(self.wf_input)
self.assertTrue(isinstance(err.exception.cause, ActivityError))
async def test_vnf_instantiate_workflow__wf_vnf_prepare_failed__raises_child_wf_error(
activities,
):
with self.assertRaises(WorkflowFailureError) as err:
- await self.execute_workflow()
+ await self.execute_workflow(self.wf_input)
self.assertTrue(isinstance(err.exception.cause.cause, ChildWorkflowError))
async def test_vnf_instantiate_workflow__wf_vdu_instantiate_failed__raises_child_wf_error(
activities,
):
with self.assertRaises(WorkflowFailureError) as err:
- await self.execute_workflow()
+ await self.execute_workflow(self.wf_input)
self.assertTrue(isinstance(err.exception.cause, ChildWorkflowError))
async def test_vnf_instantiate_workflow__get_vnf_descriptor_failed__raises_activity_error(
self.task_queue, self.workflows, activities
):
with self.assertRaises(WorkflowFailureError) as err:
- await self.execute_workflow()
+ await self.execute_workflow(self.wf_input)
self.assertTrue(isinstance(err.exception.cause, ActivityError))
async def test_vnf_instantiate_workflow__get_vnf_record_failed__raises_activity_error(
self.task_queue, self.workflows, activities
):
with self.assertRaises(WorkflowFailureError) as err:
- await self.execute_workflow()
+ await self.execute_workflow(self.wf_input)
self.assertTrue(isinstance(err.exception.cause, ActivityError))
async def test_vnf_instantiate_workflow__get_vim_cloud_failed__raises_activity_error(
self.task_queue, self.workflows, activities
):
with self.assertRaises(WorkflowFailureError) as err:
- await self.execute_workflow()
+ await self.execute_workflow(self.wf_input)
self.assertTrue(isinstance(err.exception.cause, ActivityError))
@patch("osm_lcm.temporal.vnf_workflows.VnfInstantiateWorkflowImpl.instantiate_vdus")
async with self.env, self.get_worker(
self.task_queue, workflows, activities
), self.get_worker(juju_task_queue, workflows, activities):
- await self.execute_workflow()
+ await self.execute_workflow(self.wf_input)
mock_instantiate_vdus.assert_called_once_with(
vnfr=sample_vnfr,
vnfd=sample_vnfd,
)
+class TestVnfDeleteWorkflow(asynctest.TestCase):
+ async def setUp(self):
+ self.env = await WorkflowEnvironment.start_time_skipping()
+ self.client = self.env.client
+ self.task_queue = LCM_TASK_QUEUE
+ self.wf_input = VnfDeleteWorkflowImpl.Input(vnfr_uuid=vnfr_uuid)
+ self.mock_delete_vnf_record_tracker = Mock()
+
+ def assert_vnf_record_is_called(self):
+ self.assertEqual(
+ self.mock_delete_vnf_record_tracker.call_args_list[0].args[0],
+ DeleteVnfRecord.Input(vnfr_uuid=vnfr_uuid),
+ )
+
+ @activity.defn(name=DeleteVnfRecord.__name__)
+ async def mock_delete_vnf_record(
+ self,
+ delete_vnf_record_input: DeleteVnfRecord.Input,
+ ) -> None:
+ self.mock_delete_vnf_record_tracker(delete_vnf_record_input)
+
+ @activity.defn(name=DeleteVnfRecord.__name__)
+ async def mock_delete_vnf_record_raises_exception(
+ self,
+ delete_vnf_record_input: DeleteVnfRecord.Input,
+ ) -> None:
+ self.mock_delete_vnf_record_tracker(delete_vnf_record_input)
+ raise TestException(f"{DeleteVnfRecord.__name__} failed.")
+
+ async def test_vnf_delete_workflow__successful(self):
+ async with self.env:
+ async with Worker(
+ self.client,
+ debug_mode=DEBUG_MODE,
+ task_queue=self.task_queue,
+ workflows=[VnfDeleteWorkflowImpl],
+ activities=[
+ mock_get_vnf_record_not_instantiated_state,
+ self.mock_delete_vnf_record,
+ ],
+ ):
+ await self.client.execute_workflow(
+ VnfDeleteWorkflowImpl.run,
+ arg=self.wf_input,
+ id=f"{VnfDeleteWorkflowImpl.__name__}-{self.wf_input.vnfr_uuid}",
+ task_queue=self.task_queue,
+ task_timeout=TASK_TIMEOUT,
+ execution_timeout=EXECUTION_TIMEOUT,
+ )
+ self.assert_vnf_record_is_called()
+
+ async def test_vnf_delete_workflow__instantiated_state__fails(self):
+ async with self.env:
+ async with Worker(
+ self.client,
+ debug_mode=DEBUG_MODE,
+ task_queue=self.task_queue,
+ workflows=[VnfDeleteWorkflowImpl],
+ activities=[mock_get_vnf_record_instantiated_state],
+ ):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.client.execute_workflow(
+ VnfDeleteWorkflowImpl.run,
+ arg=self.wf_input,
+ id=f"{VnfDeleteWorkflowImpl.__name__}-{self.wf_input.vnfr_uuid}",
+ task_queue=self.task_queue,
+ task_timeout=TASK_TIMEOUT,
+ execution_timeout=EXECUTION_TIMEOUT,
+ )
+ self.assertIsInstance(err.exception.cause, ApplicationError)
+ self.assertEqual(
+ str(err.exception.cause),
+ "VNF must be in instantiation state 'NOT_INSTANTIATED'",
+ )
+ self.mock_delete_vnf_record_tracker.assert_not_called()
+
+ async def test_vnf_delete_workflow__get_vnf_record_fails__workflow_fails(self):
+ async with self.env:
+ async with Worker(
+ self.client,
+ debug_mode=DEBUG_MODE,
+ task_queue=self.task_queue,
+ workflows=[VnfDeleteWorkflowImpl],
+ activities=[mock_get_vnf_record_raise_exception],
+ ):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.client.execute_workflow(
+ VnfDeleteWorkflowImpl.run,
+ arg=self.wf_input,
+ id=f"{VnfDeleteWorkflowImpl.__name__}-{self.wf_input.vnfr_uuid}",
+ task_queue=self.task_queue,
+ task_timeout=TASK_TIMEOUT,
+ execution_timeout=EXECUTION_TIMEOUT,
+ )
+ self.assertIsInstance(err.exception.cause, ActivityError)
+ self.assertEqual(
+ str(err.exception.cause.cause),
+ f"TestException: {GetVnfRecord.__name__} failed.",
+ )
+ self.mock_delete_vnf_record_tracker.assert_not_called()
+
+ async def test_vnf_delete_workflow__delete_vnf_record_fails__workflow_fails(self):
+ async with self.env:
+ async with Worker(
+ self.client,
+ debug_mode=DEBUG_MODE,
+ task_queue=self.task_queue,
+ workflows=[VnfDeleteWorkflowImpl],
+ activities=[
+ mock_get_vnf_record_not_instantiated_state,
+ self.mock_delete_vnf_record_raises_exception,
+ ],
+ ):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.client.execute_workflow(
+ VnfDeleteWorkflowImpl.run,
+ arg=self.wf_input,
+ id=f"{VnfDeleteWorkflowImpl.__name__}-{self.wf_input.vnfr_uuid}",
+ task_queue=self.task_queue,
+ task_timeout=TASK_TIMEOUT,
+ execution_timeout=EXECUTION_TIMEOUT,
+ )
+ self.assertIsInstance(err.exception.cause, ActivityError)
+ self.assertEqual(
+ str(err.exception.cause.cause),
+ f"TestException: {DeleteVnfRecord.__name__} failed.",
+ )
+ self.assert_vnf_record_is_called()
+
+
if __name__ == "__main__":
asynctest.main()