# See the License for the specific language governing permissions and
# limitations under the License.
-
import asynctest
-import logging
-from osm_common.dataclasses.temporal_dataclasses import VnfInstantiateInput
+from temporalio import activity
+from temporalio import workflow
+from temporalio.client import WorkflowFailureError
+from temporalio.testing import WorkflowEnvironment
+from temporalio.worker import Worker
+from unittest.mock import Mock
+
+from osm_common.dataclasses.temporal_dataclasses import (
+ ChangeVnfInstantiationStateInput,
+ ChangeVnfStateInput,
+ CharmInfo,
+ GetTaskQueueInput,
+ GetTaskQueueOutput,
+ GetVnfDetailsInput,
+ GetVnfDetailsOutput,
+ VduInstantiateInput,
+ VnfInstantiateInput,
+ VnfInstantiationState,
+ VnfState,
+)
from osm_common.temporal_constants import (
- LCM_TASK_QUEUE,
+ ACTIVITY_CHANGE_VNF_STATE,
+ ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE,
+ ACTIVITY_GET_TASK_QUEUE,
+ ACTIVITY_GET_VNF_DETAILS,
+ ACTIVITY_SEND_NOTIFICATION_FOR_VNF,
ACTIVITY_SET_VNF_MODEL,
+ LCM_TASK_QUEUE,
+ WORKFLOW_VDU_INSTANTIATE,
WORKFLOW_VNF_PREPARE,
)
-from osm_lcm.temporal.vnf_workflows import VnfPrepareWorkflow
-from temporalio import activity
-from temporalio.testing import WorkflowEnvironment
-from temporalio.worker import Worker
+from osm_lcm.temporal.vnf_workflows import VnfInstantiateWorkflow, VnfPrepareWorkflow
wf_input = VnfInstantiateInput(
vnfr_uuid="86b53d92-4f5a-402e-8ac2-585ec6b64698",
model_name="a-model-name",
)
+juju_task_queue = "juju_task_queue"
+vnfr_uuid = "9f472177-95c0-4335-b357-5cdc17a79965"
+sample_vnfr = {
+ "_id": vnfr_uuid,
+ "id": vnfr_uuid,
+ "nsr-id-ref": "dcf4c922-5a73-41bf-a6ca-870c22d6336c",
+ "vnfd-ref": "jar_vnfd_scalable",
+ "vnfd-id": "f1b38eac-190c-485d-9a74-c6e169c929d8",
+ "vim-account-id": "9b0bedc3-ea8e-42fd-acc9-dd03f4dee73c",
+}
+vdu = {
+ "id": "hackfest_basic-VM",
+ "name": "hackfest_basic-VM",
+ "sw-image-desc": "ubuntu18.04",
+ "virtual-compute-desc": "hackfest_basic-VM-compute",
+ "virtual-storage-desc": ["hackfest_basic-VM-storage"],
+}
+vnfd_id = "97784f19-d254-4252-946c-cf92d85443ca"
+sample_vnfd = {
+ "_id": vnfd_id,
+ "id": "sol006-vnf",
+ "provider": "Canonical",
+ "product-name": "test-vnf",
+ "software-version": "1.0",
+ "vdu": [vdu],
+}
+SANDBOXED = False
+DEBUG_MODE = True
+
+class TestException(Exception):
+ pass
+
+# Mock Activities
@activity.defn(name=ACTIVITY_SET_VNF_MODEL)
-async def set_vnf_model_mocked(set_vnf_model_input: VnfInstantiateInput) -> None:
- logging.debug(
- f"VNF {set_vnf_model_input.vnfr_uuid} model name is updated to {set_vnf_model_input.model_name}."
- )
+async def mock_set_vnf_model(set_vnf_model_input: VnfInstantiateInput) -> None:
+ pass
+
+
+@activity.defn(name=ACTIVITY_SET_VNF_MODEL)
+async def mock_set_vnf_model_raise_exception(
+ set_vnf_model_input: VnfInstantiateInput,
+) -> None:
+ raise TestException(f"{ACTIVITY_SET_VNF_MODEL} failed.")
+
+
+@activity.defn(name=ACTIVITY_GET_TASK_QUEUE)
+async def mock_get_task_queue(
+ get_task_queue_input: GetTaskQueueInput,
+) -> GetTaskQueueOutput:
+ return GetTaskQueueOutput(LCM_TASK_QUEUE)
+
+
+@activity.defn(name=ACTIVITY_GET_TASK_QUEUE)
+async def mock_get_different_task_queue(
+ get_task_queue_input: GetTaskQueueInput,
+) -> GetTaskQueueOutput:
+ return GetTaskQueueOutput(juju_task_queue)
+
+
+@activity.defn(name=ACTIVITY_GET_TASK_QUEUE)
+async def mock_get_task_queue_raise_exception(
+ get_task_queue_input: GetTaskQueueInput,
+) -> GetTaskQueueOutput:
+ raise TestException(f"{ACTIVITY_GET_TASK_QUEUE} failed.")
+
+
+@activity.defn(name=ACTIVITY_GET_VNF_DETAILS)
+async def mock_get_vnf_details(
+ get_vnf_details_input: GetVnfDetailsInput,
+) -> GetVnfDetailsOutput:
+ return GetVnfDetailsOutput(vnfr=sample_vnfr, vnfd=sample_vnfd)
+
+
+@activity.defn(name=ACTIVITY_GET_VNF_DETAILS)
+async def mock_get_vnf_details_empty_output(
+ get_vnf_details_input: GetVnfDetailsInput,
+) -> GetVnfDetailsOutput:
+ return GetVnfDetailsOutput(vnfr={}, vnfd={})
+
+
+@activity.defn(name=ACTIVITY_GET_VNF_DETAILS)
+async def mock_get_vnf_details_raise_exception(
+ get_vnf_details_input: GetVnfDetailsInput,
+) -> GetVnfDetailsOutput:
+ raise TestException(f"{ACTIVITY_GET_VNF_DETAILS} failed.")
+
+
+# Mock Workflowa
+@workflow.defn(name=WORKFLOW_VNF_PREPARE, sandboxed=SANDBOXED)
+class MockPrepareVnfWorkflow:
+ @workflow.run
+ async def run(self, input: VnfInstantiateInput) -> None:
+ pass
+
+
+@workflow.defn(name=WORKFLOW_VDU_INSTANTIATE, sandboxed=SANDBOXED)
+class MockVduInstantiateWorkflow:
+ @workflow.run
+ async def run(self, input: VduInstantiateInput) -> None:
+ pass
class TestVnfPrepareWorkflow(asynctest.TestCase):
async def setUp(self):
self.env = await WorkflowEnvironment.start_time_skipping()
self.client = self.env.client
+ self.task_queue = LCM_TASK_QUEUE
- async def test_vnf_prepare_workflow(self):
+ async def test_vnf_prepare_workflow_successful(self):
async with self.env:
async with Worker(
self.client,
- task_queue=LCM_TASK_QUEUE,
+ debug_mode=DEBUG_MODE,
+ task_queue=self.task_queue,
workflows=[VnfPrepareWorkflow],
- activities=[set_vnf_model_mocked],
+ activities=[mock_set_vnf_model],
):
await self.client.execute_workflow(
VnfPrepareWorkflow.run,
arg=wf_input,
id=f"{WORKFLOW_VNF_PREPARE}-{wf_input.vnfr_uuid}",
- task_queue=LCM_TASK_QUEUE,
+ task_queue=self.task_queue,
)
+
+ async def test_vnf_prepare_workflow_fails(self):
+ async with self.env:
+ async with Worker(
+ self.client,
+ debug_mode=DEBUG_MODE,
+ task_queue=self.task_queue,
+ workflows=[VnfPrepareWorkflow],
+ activities=[mock_set_vnf_model_raise_exception],
+ ):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.client.execute_workflow(
+ VnfPrepareWorkflow.run,
+ arg=wf_input,
+ id=f"{WORKFLOW_VNF_PREPARE}-{wf_input.vnfr_uuid}",
+ task_queue=self.task_queue,
+ )
+ self.assertEqual(str(err.exception), "Workflow execution failed")
+ self.assertEqual(
+ str(err.exception.cause.cause.message),
+ "set-vnf-model failed.",
+ )
+
+
+class TestVnfInstantiateWorkflow(asynctest.TestCase):
+ async def setUp(self):
+ self.env = await WorkflowEnvironment.start_time_skipping()
+ self.client = self.env.client
+ self.mock_change_vnf_instantiation_state_tracker = Mock()
+ self.mock_change_vnf_state_tracker = Mock()
+ self.mock_send_notification_for_vnf_tracker = Mock()
+ self.workflows = [
+ VnfInstantiateWorkflow,
+ MockPrepareVnfWorkflow,
+ MockVduInstantiateWorkflow,
+ ]
+ self.task_queue = LCM_TASK_QUEUE
+
+ def get_worker(self, task_queue: str, workflows: list, activities: list) -> Worker:
+ return Worker(
+ self.client,
+ task_queue=task_queue,
+ workflows=workflows,
+ activities=activities,
+ debug_mode=DEBUG_MODE,
+ )
+
+ async def execute_workflow(self):
+ return await self.client.execute_workflow(
+ VnfInstantiateWorkflow.run,
+ arg=VnfInstantiateInput(vnfr_uuid=vnfr_uuid, model_name="a-model-name"),
+ id="wf1",
+ task_queue=self.task_queue,
+ )
+
+ def check_state_change_call_counts(self):
+ self.assertEqual(self.mock_change_vnf_instantiation_state_tracker.call_count, 2)
+ self.assertEqual(self.mock_change_vnf_state_tracker.call_count, 2)
+ self.assertEqual(self.mock_send_notification_for_vnf_tracker.call_count, 2)
+
+ def workflow_is_succeeded(self):
+ call_mock_change_vnf_instantiation_state_tracker = (
+ self.mock_change_vnf_instantiation_state_tracker.call_args_list
+ )
+ call_mock_change_vnf_state_tracker = (
+ self.mock_change_vnf_state_tracker.call_args_list
+ )
+ self.assertEqual(
+ call_mock_change_vnf_instantiation_state_tracker[1].args[0],
+ ChangeVnfInstantiationStateInput(
+ vnfr_uuid=vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
+ ),
+ )
+ self.assertEqual(
+ call_mock_change_vnf_state_tracker[1].args[0],
+ ChangeVnfStateInput(vnfr_uuid=vnfr_uuid, state=VnfState.STARTED),
+ )
+
+ def workflow_is_failed(self, error):
+ call_mock_change_vnf_instantiation_state_tracker = (
+ self.mock_change_vnf_instantiation_state_tracker.call_args_list
+ )
+ call_mock_change_vnf_state_tracker = (
+ self.mock_change_vnf_state_tracker.call_args_list
+ )
+ self.assertEqual(
+ call_mock_change_vnf_instantiation_state_tracker[1].args[0],
+ ChangeVnfInstantiationStateInput(
+ vnfr_uuid=vnfr_uuid, state=VnfInstantiationState.NOT_INSTANTIATED
+ ),
+ )
+ self.assertEqual(
+ call_mock_change_vnf_state_tracker[1].args[0],
+ ChangeVnfStateInput(vnfr_uuid=vnfr_uuid, state=VnfState.STOPPED),
+ )
+ self.assertEqual(str(error.exception), "Workflow execution failed")
+
+ @activity.defn(name=ACTIVITY_CHANGE_VNF_STATE)
+ async def mock_change_vnf_state(self, nf_state_input: ChangeVnfStateInput) -> None:
+ self.mock_change_vnf_state_tracker(nf_state_input)
+
+ @activity.defn(name=ACTIVITY_CHANGE_VNF_STATE)
+ async def mock_change_vnf_state_exception(
+ self, nf_state_input: ChangeVnfStateInput
+ ) -> None:
+ self.mock_change_vnf_state_tracker(nf_state_input)
+ raise TestException(f"{ACTIVITY_CHANGE_VNF_STATE} failed.")
+
+ @activity.defn(name=ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE)
+ async def mock_change_vnf_instantiation_state(
+ self,
+ nf_instantiation_state_input: ChangeVnfInstantiationStateInput,
+ ) -> None:
+ self.mock_change_vnf_instantiation_state_tracker(nf_instantiation_state_input)
+
+ @activity.defn(name=ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE)
+ async def mock_change_vnf_instantiation_state_exception(
+ self,
+ nf_instantiation_state_input: ChangeVnfInstantiationStateInput,
+ ) -> None:
+ self.mock_change_vnf_instantiation_state_tracker(nf_instantiation_state_input)
+ raise TestException(f"{ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE} failed.")
+
+ @activity.defn(name=ACTIVITY_SEND_NOTIFICATION_FOR_VNF)
+ async def mock_send_notification_for_vnf(self, input) -> None:
+ self.mock_send_notification_for_vnf_tracker()
+
+ @asynctest.mock.patch(
+ "osm_lcm.temporal.vnf_workflows.VnfInstantiateWorkflow.instantiate_vdus"
+ )
+ async def test_vnf_instantiate_workflow_successful(self, mock_instantiate_vdus):
+ activities = [
+ mock_get_task_queue,
+ self.mock_change_vnf_instantiation_state,
+ self.mock_change_vnf_state,
+ self.mock_send_notification_for_vnf,
+ mock_set_vnf_model,
+ mock_get_vnf_details,
+ ]
+ async with self.env, self.get_worker(
+ self.task_queue, self.workflows, activities
+ ):
+ await self.execute_workflow()
+ self.check_state_change_call_counts()
+ self.workflow_is_succeeded()
+ mock_instantiate_vdus.assert_called_once_with(
+ sample_vnfr, sample_vnfd, LCM_TASK_QUEUE
+ )
+
+ @asynctest.mock.patch(
+ "osm_lcm.temporal.vnf_workflows.VnfInstantiateWorkflow.instantiate_vdus"
+ )
+ async def test_vnf_instantiate_workflow_change_vnf_instantiation_state_exception(
+ self, mock_instantiate_vdus
+ ):
+ activities = [
+ mock_get_task_queue,
+ self.mock_change_vnf_instantiation_state_exception,
+ self.mock_change_vnf_state,
+ self.mock_send_notification_for_vnf,
+ mock_set_vnf_model,
+ mock_get_vnf_details,
+ ]
+ async with self.env, self.get_worker(
+ self.task_queue, self.workflows, activities
+ ):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.execute_workflow()
+ self.assertEqual(self.mock_change_vnf_instantiation_state_tracker.call_count, 6)
+ self.assertEqual(self.mock_change_vnf_state_tracker.call_count, 0)
+ self.assertEqual(self.mock_send_notification_for_vnf_tracker.call_count, 0)
+ call_mock_change_vnf_instantiation_state_tracker = (
+ self.mock_change_vnf_instantiation_state_tracker.call_args_list
+ )
+ self.assertEqual(
+ call_mock_change_vnf_instantiation_state_tracker[1].args[0],
+ ChangeVnfInstantiationStateInput(
+ vnfr_uuid=vnfr_uuid, state=VnfInstantiationState.NOT_INSTANTIATED
+ ),
+ )
+ mock_instantiate_vdus.assert_not_called()
+ self.assertEqual(str(err.exception), "Workflow execution failed")
+ self.assertEqual(
+ str(err.exception.cause.cause.message),
+ "change-vnf-instantiation-state failed.",
+ )
+
+ @asynctest.mock.patch(
+ "osm_lcm.temporal.vnf_workflows.VnfInstantiateWorkflow.instantiate_vdus"
+ )
+ async def test_vnf_instantiate_workflow_change_vnf_state_exception(
+ self, mock_instantiate_vdus
+ ):
+ activities = [
+ mock_get_task_queue,
+ self.mock_change_vnf_instantiation_state,
+ self.mock_change_vnf_state_exception,
+ self.mock_send_notification_for_vnf,
+ mock_set_vnf_model,
+ mock_get_vnf_details,
+ ]
+ async with self.env, self.get_worker(
+ self.task_queue, self.workflows, activities
+ ):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.execute_workflow()
+ self.assertEqual(self.mock_change_vnf_instantiation_state_tracker.call_count, 2)
+ self.assertEqual(self.mock_change_vnf_state_tracker.call_count, 6)
+ self.assertEqual(self.mock_send_notification_for_vnf_tracker.call_count, 0)
+ self.workflow_is_failed(err)
+ mock_instantiate_vdus.assert_not_called()
+ self.assertEqual(
+ str(err.exception.cause.cause.message),
+ "change-vnf-state failed.",
+ )
+
+ @asynctest.mock.patch(
+ "osm_lcm.temporal.vnf_workflows.VnfInstantiateWorkflow.instantiate_vdus"
+ )
+ async def test_vnf_instantiate_workflow_empty_vnf_details(
+ self, mock_instantiate_vdus
+ ):
+ activities = [
+ mock_get_task_queue,
+ self.mock_change_vnf_instantiation_state,
+ self.mock_change_vnf_state,
+ self.mock_send_notification_for_vnf,
+ mock_set_vnf_model,
+ mock_get_vnf_details_empty_output,
+ ]
+ async with self.env, self.get_worker(
+ self.task_queue, self.workflows, activities
+ ):
+ await self.execute_workflow()
+ self.check_state_change_call_counts()
+ self.workflow_is_succeeded()
+ mock_instantiate_vdus.assert_called_once_with({}, {}, LCM_TASK_QUEUE)
+
+ @asynctest.mock.patch(
+ "osm_lcm.temporal.vnf_workflows.VnfInstantiateWorkflow.instantiate_vdus"
+ )
+ async def test_vnf_instantiate_workflow_get_task_queue_raises_exception(
+ self, mock_instantiate_vdus
+ ):
+ activities = [
+ mock_get_task_queue_raise_exception,
+ self.mock_change_vnf_instantiation_state,
+ self.mock_change_vnf_state,
+ self.mock_send_notification_for_vnf,
+ mock_set_vnf_model,
+ mock_get_vnf_details,
+ ]
+ async with self.env, self.get_worker(
+ self.task_queue, self.workflows, activities
+ ):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.execute_workflow()
+ self.check_state_change_call_counts()
+ self.workflow_is_failed(err)
+ mock_instantiate_vdus.assert_not_called()
+ self.assertEqual(
+ str(err.exception.cause.cause.message),
+ "get_task_queue failed.",
+ )
+
+ @asynctest.mock.patch(
+ "osm_lcm.temporal.vnf_workflows.VnfInstantiateWorkflow.instantiate_vdus"
+ )
+ async def test_vnf_instantiate_workflow_get_vnf_details_raises_exception(
+ self, mock_instantiate_vdus
+ ):
+ workflows = [VnfInstantiateWorkflow, MockPrepareVnfWorkflow]
+ activities = [
+ mock_get_task_queue,
+ self.mock_change_vnf_instantiation_state,
+ self.mock_change_vnf_state,
+ self.mock_send_notification_for_vnf,
+ mock_set_vnf_model,
+ mock_get_vnf_details_raise_exception,
+ ]
+ async with self.env, self.get_worker(self.task_queue, workflows, activities):
+ with self.assertRaises(WorkflowFailureError) as err:
+ await self.execute_workflow()
+ self.check_state_change_call_counts()
+ self.workflow_is_failed(err)
+ mock_instantiate_vdus.assert_not_called()
+ self.assertEqual(
+ str(err.exception.cause.cause.message),
+ "get_vnf_details failed.",
+ )
+
+ @asynctest.mock.patch(
+ "osm_lcm.temporal.vnf_workflows.VnfInstantiateWorkflow.get_vdu_instantiate_input"
+ )
+ async def test_vnf_instantiate_workflow_calls_vdu_instantiate_workflow(
+ self, mock_vdu_instantiate_input
+ ):
+ mock_vdu_instantiate_input.return_value = (
+ VduInstantiateInput(
+ vim_uuid="123",
+ model_name="a-model-name",
+ charm_info=CharmInfo(
+ app_name="my-app", channel="latest", entity_url="my-url"
+ ),
+ ),
+ "vdu_instantiate_workflow_id",
+ )
+ activities = [
+ mock_get_task_queue,
+ self.mock_change_vnf_instantiation_state,
+ self.mock_change_vnf_state,
+ self.mock_send_notification_for_vnf,
+ mock_set_vnf_model,
+ mock_get_vnf_details,
+ ]
+ async with self.env, self.get_worker(
+ self.task_queue, self.workflows, activities
+ ):
+ await self.execute_workflow()
+ self.check_state_change_call_counts()
+ self.workflow_is_succeeded()
+ call_mock_vdu_instantiate_input = mock_vdu_instantiate_input.call_args
+ self.assertEqual(call_mock_vdu_instantiate_input.args[0], sample_vnfr)
+ self.assertEqual(call_mock_vdu_instantiate_input.args[1], sample_vnfd)
+ self.assertEqual(call_mock_vdu_instantiate_input.args[2], vdu)
+
+ @asynctest.mock.patch(
+ "osm_lcm.temporal.vnf_workflows.VnfInstantiateWorkflow.instantiate_vdus"
+ )
+ async def test_vnf_instantiate_workflow_successful_use_different_task_queue(
+ self, mock_instantiate_vdus
+ ):
+ workflows = [VnfInstantiateWorkflow, MockPrepareVnfWorkflow]
+ activities = [
+ mock_get_different_task_queue,
+ self.mock_change_vnf_instantiation_state,
+ self.mock_change_vnf_state,
+ self.mock_send_notification_for_vnf,
+ mock_set_vnf_model,
+ mock_get_vnf_details,
+ ]
+ async with self.env, self.get_worker(
+ self.task_queue, workflows, activities
+ ), self.get_worker(juju_task_queue, workflows, activities):
+ await self.execute_workflow()
+ self.check_state_change_call_counts()
+ self.workflow_is_succeeded()
+ mock_instantiate_vdus.assert_called_once_with(
+ sample_vnfr, sample_vnfd, juju_task_queue
+ )
+
+
+if __name__ == "__main__":
+ asynctest.main()