from osm_common.temporal.workflows.vnf import (
VnfInstantiateWorkflow,
VnfPrepareWorkflow,
+ VnfTerminateWorkflow,
+)
+from osm_common.temporal.workflows.vdu import (
+ VduInstantiateWorkflow,
+ VduTerminateWorkflow,
)
-from osm_common.temporal.workflows.vdu import VduInstantiateWorkflow
from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
from osm_lcm.temporal.juju_paas_activities import CharmInfoUtils
from temporalio import workflow
from temporalio.common import RetryPolicy
from temporalio.converter import value_to_type
-from temporalio.exceptions import ActivityError, ChildWorkflowError
+from temporalio.exceptions import (
+ ActivityError,
+ ChildWorkflowError,
+ FailureError,
+ ApplicationError,
+)
_SANDBOXED = False
retry_policy = RetryPolicy(maximum_attempts=3)
f"{VnfPrepareWorkflow.__name__} failed with {err_details}"
)
raise e
+
+
+@workflow.defn(name=VnfTerminateWorkflow.__name__, sandboxed=False)
+class VnfTerminateWorkflowImpl(VnfTerminateWorkflow):
+ @workflow.run
+ async def run(self, workflow_input: VnfTerminateWorkflow.Input) -> None:
+ try:
+ vnfr: dict = value_to_type(
+ GetVnfRecord.Output,
+ await workflow.execute_activity(
+ activity=GetVnfRecord.__name__,
+ arg=GetVnfRecord.Input(vnfr_uuid=workflow_input.vnfr_uuid),
+ activity_id=f"{GetVnfRecord.__name__}-{workflow_input.vnfr_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ ),
+ ).vnfr
+ if (
+ vnfr.get("instantiationState")
+ != VnfInstantiationState.INSTANTIATED.name
+ ):
+ self.logger.warning(
+ f"pre-condition not met. VNF(`{workflow_input.vnfr_uuid}`) is not instantiated. Continuing anyway..."
+ )
+ vnfd_uuid = vnfr.get("vnfd-id")
+ if vnfd_uuid is None:
+ raise ApplicationError(
+ "`vnfd-id` is not set in VNFR. Cannot proceed with termination.",
+ non_retryable=True,
+ )
+ model_name = vnfr.get("namespace")
+ if model_name is None:
+ raise ApplicationError(
+ "`vim-account-id` is not set in VNFR", non_retryable=True
+ )
+
+ vnfd: dict = value_to_type(
+ GetVnfDescriptor.Output,
+ await workflow.execute_activity(
+ activity=GetVnfDescriptor.__name__,
+ arg=GetVnfDescriptor.Input(vnfd_uuid=vnfd_uuid),
+ activity_id=f"{GetVnfDescriptor.__name__}-{vnfd_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ ),
+ ).vnfd
+ # Discover relations
+ # TODO: Implement this later
+
+ # Remove relations
+ # TODO: Implement this later
+
+ # For each VDU, execute VduTerminateWorkflow
+ vdus = vnfd.get("vdu", [])
+ vim_uuid = vnfr.get("vim-account-id")
+ if vim_uuid is None:
+ raise ApplicationError(
+ "`vim-account-id` is not set in VNFD", non_retryable=True
+ )
+ await asyncio.gather(
+ *[
+ workflow.execute_child_workflow(
+ workflow=VduTerminateWorkflow.__name__,
+ arg=VduTerminateWorkflow.Input(
+ vim_uuid=vim_uuid,
+ application_name=vdu["id"],
+ model_name=model_name,
+ ),
+ id=f"{VduTerminateWorkflow.__name__}-{vdu['id']}",
+ task_queue=LCM_TASK_QUEUE,
+ retry_policy=retry_policy,
+ )
+ for vdu in vdus
+ if "id" in vdu
+ ]
+ )
+
+ # Change VNF instantiation state to NOT_INSTANTIATED
+ await workflow.execute_activity(
+ activity=ChangeVnfInstantiationState.__name__,
+ arg=ChangeVnfInstantiationState.Input(
+ vnfr_uuid=workflow_input.vnfr_uuid,
+ state=VnfInstantiationState.NOT_INSTANTIATED,
+ ),
+ activity_id=f"{ChangeVnfInstantiationState.__name__}-{workflow_input.vnfr_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ )
+ except FailureError as e:
+ if not hasattr(e, "cause") or e.cause is None:
+ raise e
+ err_details = str(e.cause.with_traceback(e.__traceback__))
+ self.logger.error(
+ f"{VnfTerminateWorkflow.__name__} failed with {err_details}"
+ )
+ raise e
# See the License for the specific language governing permissions and
# limitations under the License.
+import asyncio
+import asynctest
import uuid
+
+from asynctest import CoroutineMock, Mock, patch
from copy import deepcopy
from datetime import timedelta
+from parameterized import parameterized
+from temporalio import activity, workflow
+from temporalio.client import WorkflowFailureError
+from temporalio.exceptions import (
+ ActivityError,
+ ChildWorkflowError,
+ RetryState,
+)
+from temporalio.testing import WorkflowEnvironment
+from temporalio.worker import Worker
+from typing import Any
-import asyncio
-import asynctest
-from asynctest import CoroutineMock, Mock, patch
from osm_common.temporal.activities.vnf import (
ChangeVnfInstantiationState,
ChangeVnfState,
SendNotificationForVnf,
SetVnfModel,
)
-from osm_common.temporal.dataclasses_common import CharmInfo, VduComputeConstraints
-from osm_common.temporal.states import VnfInstantiationState, VnfState
-from osm_common.temporal.workflows.vdu import VduInstantiateWorkflow
-from osm_common.temporal.workflows.vnf import VnfPrepareWorkflow
+from osm_common.temporal.dataclasses_common import (
+ CharmInfo,
+ VduComputeConstraints,
+)
+from osm_common.temporal.states import (
+ VnfInstantiationState,
+ VnfState,
+)
from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
-from temporalio import activity, workflow
-from temporalio.client import WorkflowFailureError
-from temporalio.exceptions import ActivityError, ChildWorkflowError, RetryState
-from temporalio.testing import WorkflowEnvironment
-from temporalio.worker import Worker
-
+from osm_common.temporal.workflows.vdu import (
+ VduInstantiateWorkflow,
+ VduTerminateWorkflow,
+)
+from osm_common.temporal.workflows.vnf import (
+ VnfPrepareWorkflow,
+ VnfTerminateWorkflow,
+)
from osm_lcm.temporal.vnf_workflows import (
VnfInstantiateWorkflowImpl,
VnfPrepareWorkflowImpl,
+ VnfTerminateWorkflowImpl,
)
SANDBOXED = False
pass
+class MockActivity:
+ def __init__(self, *args: Any, **kwargs: Any):
+ self.tracker = Mock(return_value=None)
+
+ async def __call__(self, *args: Any, **kwargs: Any) -> Any:
+ return self.tracker(*args, **kwargs)
+
+ def __setattr__(self, __name: str, __value: Any) -> None:
+ if __name in ("return_value", "side_effect"):
+ self.tracker.__setattr__(__name, __value)
+ else:
+ super().__setattr__(__name, __value)
+
+
# Mock Activities
@activity.defn(name=SetVnfModel.__name__)
async def mock_set_vnf_model(set_vnf_model_input: SetVnfModel.Input) -> None:
@activity.defn(name=GetVnfDescriptor.__name__)
-class MockGetVnfDescriptor:
- def __init__(self, vnfd=sample_vnfd) -> None:
- self.vnfd = vnfd
+class MockGetVnfDescriptor(MockActivity):
+ def __init__(self, *args: Any, **kwargs: Any):
+ super().__init__(*args, **kwargs)
+ self.tracker.return_value = GetVnfDescriptor.Output(vnfd=sample_vnfd)
+
- async def __call__(
- self, get_vnf_descriptor_input: GetVnfDescriptor.Input
- ) -> GetVnfDescriptor.Output:
- return GetVnfDescriptor.Output(vnfd=self.vnfd)
+@activity.defn(name=ChangeVnfInstantiationState.__name__)
+class MockChangeVnfInstantiationState(MockActivity):
+ pass
@activity.defn(name=GetVnfDescriptor.__name__)
@activity.defn(name=GetVnfRecord.__name__)
-async def mock_get_vnf_record(
- get_vnf_record_input: GetVnfRecord.Input,
-) -> GetVnfRecord.Output:
- return GetVnfRecord.Output(vnfr=sample_vnfr)
+class MockGetVnfRecord(MockActivity):
+ def __init__(self, *args: Any, **kwargs: Any):
+ super().__init__(*args, **kwargs)
+ self.tracker.return_value = GetVnfRecord.Output(vnfr=sample_vnfr)
@activity.defn(name=GetVnfRecord.__name__)
await asyncio.sleep(0.5)
+@workflow.defn(name=VduTerminateWorkflow.__name__, sandboxed=SANDBOXED)
+class MockVduTerminateWorkflow:
+ @workflow.run
+ async def run(self, workflow_input: VduTerminateWorkflow.Input) -> None:
+ pass
+
+
+@workflow.defn(name=VduTerminateWorkflow.__name__, sandboxed=SANDBOXED)
+class MockVduTerminateWorkflowRaisesException:
+ @workflow.run
+ async def run(self, workflow_input: VduTerminateWorkflow.Input) -> None:
+ raise TestException(f"{self.__class__.__name__} failed.")
+
+
@workflow.defn(name=VduInstantiateWorkflow.__name__, sandboxed=SANDBOXED)
class MockVduInstantiateWorkflowFailed:
@workflow.run
self.mock_send_notification_for_vnf,
mock_set_vnf_model,
MockGetVnfDescriptor(),
- mock_get_vnf_record,
+ MockGetVnfRecord(),
mock_get_vim_cloud,
]
mock_get_vdu_instantiate_input.return_value = (
self.mock_send_notification_for_vnf,
mock_set_vnf_model,
MockGetVnfDescriptor(),
- mock_get_vnf_record,
+ MockGetVnfRecord(),
mock_get_vim_cloud,
]
mock_get_vdu_instantiate_input.return_value = (
self.mock_send_notification_for_vnf,
mock_set_vnf_model,
MockGetVnfDescriptor(),
- mock_get_vnf_record,
+ MockGetVnfRecord(),
mock_get_vim_cloud,
]
mock_get_vdu_instantiate_input.return_value = (
MockPrepareVnfWorkflow,
MockVduInstantiateWorkflowSlow,
]
- vnfd = {
- "vdu": [vdu] * num_child_workflows,
- }
+ mock_get_vnf_descriptor = MockGetVnfDescriptor()
+ mock_get_vnf_descriptor.return_value = GetVnfDescriptor.Output(
+ vnfd={
+ "vdu": [vdu] * num_child_workflows,
+ }
+ )
activities = [
mock_get_task_queue,
self.mock_change_vnf_instantiation_state,
self.mock_change_vnf_state,
self.mock_send_notification_for_vnf,
mock_set_vnf_model,
- MockGetVnfDescriptor(vnfd),
- mock_get_vnf_record,
+ mock_get_vnf_descriptor,
+ MockGetVnfRecord(),
mock_get_vim_cloud,
]
self.mock_send_notification_for_vnf,
mock_set_vnf_model,
MockGetVnfDescriptor(),
- mock_get_vnf_record,
+ MockGetVnfRecord(),
mock_get_vim_cloud,
]
async with self.env, self.get_worker(
self.mock_send_notification_for_vnf,
mock_set_vnf_model,
MockGetVnfDescriptor(),
- mock_get_vnf_record,
+ MockGetVnfRecord(),
mock_get_vim_cloud,
]
async with self.env, self.get_worker(
self.mock_send_notification_for_vnf,
mock_set_vnf_model,
MockGetVnfDescriptor(),
- mock_get_vnf_record,
+ MockGetVnfRecord(),
mock_get_vim_cloud,
]
async with self.env, self.get_worker(
self.mock_send_notification_for_vnf,
mock_set_vnf_model,
MockGetVnfDescriptor(),
- mock_get_vnf_record,
+ MockGetVnfRecord(),
mock_get_vim_cloud,
]
async with self.env, self.get_worker(
self.mock_send_notification_for_vnf,
mock_set_vnf_model,
MockGetVnfDescriptor(),
- mock_get_vnf_record,
+ MockGetVnfRecord(),
mock_get_vim_cloud,
]
async with self.env, self.get_worker(
self.mock_send_notification_for_vnf,
mock_set_vnf_model,
MockGetVnfDescriptor(),
- mock_get_vnf_record,
+ MockGetVnfRecord(),
mock_get_vim_cloud,
]
async with self.env, self.get_worker(
self.mock_send_notification_for_vnf,
mock_set_vnf_model,
mock_get_vnf_descriptor_raise_exception,
- mock_get_vnf_record,
+ MockGetVnfRecord(),
mock_get_vim_cloud,
]
async with self.env, self.get_worker(
self.mock_send_notification_for_vnf,
mock_set_vnf_model,
MockGetVnfDescriptor(),
- mock_get_vnf_record,
+ MockGetVnfRecord(),
mock_get_vim_cloud_raise_exception,
]
async with self.env, self.get_worker(
self.mock_send_notification_for_vnf,
mock_set_vnf_model,
MockGetVnfDescriptor(),
- mock_get_vnf_record,
+ MockGetVnfRecord(),
mock_get_vim_cloud,
]
async with self.env, self.get_worker(
)
+class TestVnfTerminateWorkflow(asynctest.TestCase):
+ async def setUp(self):
+ self.env = await WorkflowEnvironment.start_time_skipping()
+ self.client = self.env.client
+
+ # Maps workflow base classes to implementation classes.
+ # Can be overridden by individual tests to swap out implementations by
+ # setting self.workflows[<workflow_base_class>] = <implementation_class>
+ self.workflows = {
+ VduTerminateWorkflow: MockVduTerminateWorkflow,
+ VnfTerminateWorkflow: VnfTerminateWorkflowImpl,
+ }
+ self.mock_get_vnf_record = MockGetVnfRecord()
+ self.mock_get_vnf_descriptor = MockGetVnfDescriptor()
+ self.mock_change_vnf_instantiation_state = MockChangeVnfInstantiationState()
+ self.activities = [
+ self.mock_get_vnf_record,
+ self.mock_get_vnf_descriptor,
+ self.mock_change_vnf_instantiation_state,
+ ]
+ self.task_queue = LCM_TASK_QUEUE
+ self.workflow_input = VnfTerminateWorkflow.Input(
+ vnfr_uuid="my_vnfr_uuid",
+ )
+
+ def get_worker(self, task_queue: str, activities: list) -> Worker:
+ return Worker(
+ self.client,
+ task_queue=task_queue,
+ workflows=list(self.workflows.values()),
+ activities=activities,
+ debug_mode=DEBUG_MODE,
+ )
+
+ async def execute_workflow(self, wf_input: VnfTerminateWorkflow.Input):
+ return await self.client.execute_workflow(
+ VnfTerminateWorkflowImpl.run,
+ arg=wf_input,
+ id=uuid.uuid4().hex,
+ task_queue=self.task_queue,
+ task_timeout=TASK_TIMEOUT,
+ execution_timeout=EXECUTION_TIMEOUT,
+ )
+
+ @patch("temporalio.workflow.execute_child_workflow")
+ async def test_vnf_terminate_workflow__all_activities_called(
+ self, mock_execute_child_workflow: CoroutineMock
+ ):
+ """Test all the expected activities and child workflows are called"""
+ async with self.env, self.get_worker(self.task_queue, self.activities):
+ await self.execute_workflow(self.workflow_input)
+
+ self.mock_get_vnf_record.tracker.assert_called_once()
+ self.mock_get_vnf_descriptor.tracker.assert_called_once()
+ mock_execute_child_workflow.assert_called_once()
+ self.mock_change_vnf_instantiation_state.tracker.assert_called_once()
+
+ async def test_vnf_terminate_workflow__not_instantiated__logs_warning(self):
+ """Test warning is logged when VNF is not instantiated"""
+ self.mock_get_vnf_record.return_value = GetVnfRecord.Output(
+ vnfr={
+ "instantiationState": "NOT_INSTANTIATED",
+ "vnfd-id": "test-vnfd-id",
+ "vim-account-id": vim_account_id,
+ "namespace": model_name,
+ }
+ )
+ async with self.env, self.get_worker(self.task_queue, self.activities):
+ with self.assertLogs(level="INFO") as logs:
+ await self.execute_workflow(self.workflow_input)
+
+ any(
+ "VNF(`my_vnfr_uuid`) is not instantiated" in record.message
+ and record.levelname == "WARNING"
+ for record in logs.records
+ )
+
+ @patch("temporalio.workflow.execute_child_workflow")
+ async def test_vnf_terminate_workflow__no_vdu__successful(
+ self, mock_execute_child_workflow: CoroutineMock
+ ):
+ self.mock_get_vnf_descriptor.return_value = GetVnfDescriptor.Output(
+ vnfd={
+ "id": "test-vnfd-id",
+ "vdu": [],
+ }
+ )
+
+ async with self.env, self.get_worker(self.task_queue, self.activities):
+ await self.execute_workflow(self.workflow_input)
+
+ mock_execute_child_workflow.assert_not_called()
+
+ async def test_vnf_terminate_workflow__activity_error__is_raised(self):
+ """Validate that activity errors bubble up through the workflow"""
+ self.mock_get_vnf_record.side_effect = TestException("get_vnf_record failed")
+
+ async with self.env, self.get_worker(self.task_queue, self.activities):
+ with self.assertRaises(WorkflowFailureError):
+ await self.execute_workflow(self.workflow_input)
+
+ async def test_vnf_terminate_workflow__child_workflow_error__is_raised(self):
+ """Validates that child workflow errors bubble up to the parent workflow
+
+ This might seem unnecessary, but depending on how the co-routines are
+ awaited, errors might be silently swallowed.
+ """
+ self.workflows[VduTerminateWorkflow] = MockVduTerminateWorkflowRaisesException
+ async with self.env, self.get_worker(self.task_queue, self.activities):
+ with self.assertRaises(WorkflowFailureError):
+ await self.execute_workflow(self.workflow_input)
+
+ @parameterized.expand(["vnfd-id", "vim-account-id", "namespace"])
+ async def test_vnf_terminate_workflow__no_vnfd_id__raises_error(
+ self, vnfr_required_value
+ ):
+ """Test that a missing required value in the VNFR raises an error"""
+ vnfr = {
+ "instantiationState": "NOT_INSTANTIATED",
+ "vnfd-id": "test-vnfd-id",
+ "vim-account-id": "test-vim-account-id",
+ "namespace": "test-namespace",
+ }
+ del vnfr[vnfr_required_value]
+ self.mock_get_vnf_record.return_value = GetVnfRecord.Output(vnfr=vnfr)
+
+ async with self.env, self.get_worker(self.task_queue, self.activities):
+ with self.assertRaises(WorkflowFailureError):
+ await self.execute_workflow(self.workflow_input)
+
+ @patch("temporalio.workflow.execute_child_workflow")
+ async def test_vnf_terminate_workflow__no_vdu_id__is_ignored(
+ self, mock_execute_child_workflow: CoroutineMock
+ ):
+ """Test that VDUs without an ID are ignored
+
+ This is not a business requirement, but a defensive measure to avoid
+ errors in case the VNFD is malformed."""
+ self.mock_get_vnf_descriptor.return_value = GetVnfDescriptor.Output(
+ vnfd={
+ "id": "test-vnfd-id",
+ "vdu": [
+ {
+ "id": "test-vdu-id",
+ },
+ {
+ "not-id": "test-not-id", # No ID in this VDU, it will be ignored
+ },
+ ],
+ }
+ )
+
+ async with self.env, self.get_worker(self.task_queue, self.activities):
+ await self.execute_workflow(self.workflow_input)
+
+ mock_execute_child_workflow.assert_called_once()
+ self.assertEquals(
+ mock_execute_child_workflow.call_args.kwargs["arg"].application_name,
+ "test-vdu-id",
+ )
+
+
if __name__ == "__main__":
asynctest.main()