From: Daniel Arndt Date: Fri, 14 Jul 2023 12:23:01 +0000 (-0300) Subject: Implement terminate VNF workflow X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F75%2F13675%2F8;p=osm%2FLCM.git Implement terminate VNF workflow Change-Id: I8c6987c214c4f450387dadcbe143957ed3a4a5b7 Signed-off-by: Daniel Arndt Signed-off-by: Mark Beierl --- diff --git a/osm_lcm/nglcm.py b/osm_lcm/nglcm.py index 680ffe6..f48f381 100644 --- a/osm_lcm/nglcm.py +++ b/osm_lcm/nglcm.py @@ -74,6 +74,7 @@ from osm_lcm.temporal.vnf_activities import ( from osm_lcm.temporal.vnf_workflows import ( VnfInstantiateWorkflowImpl, VnfPrepareWorkflowImpl, + VnfTerminateWorkflowImpl, ) from temporalio.client import Client from temporalio.worker import Worker @@ -177,6 +178,7 @@ class NGLcm: VduInstantiateWorkflowImpl, VnfInstantiateWorkflowImpl, VnfPrepareWorkflowImpl, + VnfTerminateWorkflowImpl, ] activities = [ UpdateNsStateImpl(self.db), diff --git a/osm_lcm/temporal/vnf_workflows.py b/osm_lcm/temporal/vnf_workflows.py index 3da7f43..e5957dd 100644 --- a/osm_lcm/temporal/vnf_workflows.py +++ b/osm_lcm/temporal/vnf_workflows.py @@ -34,8 +34,12 @@ from osm_common.temporal.activities.vnf import ( from osm_common.temporal.workflows.vnf import ( VnfInstantiateWorkflow, VnfPrepareWorkflow, + VnfTerminateWorkflow, +) +from osm_common.temporal.workflows.vdu import ( + VduInstantiateWorkflow, + VduTerminateWorkflow, ) -from osm_common.temporal.workflows.vdu import VduInstantiateWorkflow from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE from osm_lcm.temporal.juju_paas_activities import CharmInfoUtils @@ -43,7 +47,12 @@ from osm_lcm.temporal.vnf_activities import VnfSpecifications from temporalio import workflow from temporalio.common import RetryPolicy from temporalio.converter import value_to_type -from temporalio.exceptions import ActivityError, ChildWorkflowError +from temporalio.exceptions import ( + ActivityError, + ChildWorkflowError, + FailureError, + ApplicationError, +) _SANDBOXED = False retry_policy = RetryPolicy(maximum_attempts=3) @@ -347,3 +356,102 @@ class VnfPrepareWorkflowImpl(VnfPrepareWorkflow): f"{VnfPrepareWorkflow.__name__} failed with {err_details}" ) raise e + + +@workflow.defn(name=VnfTerminateWorkflow.__name__, sandboxed=False) +class VnfTerminateWorkflowImpl(VnfTerminateWorkflow): + @workflow.run + async def run(self, workflow_input: VnfTerminateWorkflow.Input) -> None: + try: + vnfr: dict = value_to_type( + GetVnfRecord.Output, + await workflow.execute_activity( + activity=GetVnfRecord.__name__, + arg=GetVnfRecord.Input(vnfr_uuid=workflow_input.vnfr_uuid), + activity_id=f"{GetVnfRecord.__name__}-{workflow_input.vnfr_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ), + ).vnfr + if ( + vnfr.get("instantiationState") + != VnfInstantiationState.INSTANTIATED.name + ): + self.logger.warning( + f"pre-condition not met. VNF(`{workflow_input.vnfr_uuid}`) is not instantiated. Continuing anyway..." + ) + vnfd_uuid = vnfr.get("vnfd-id") + if vnfd_uuid is None: + raise ApplicationError( + "`vnfd-id` is not set in VNFR. Cannot proceed with termination.", + non_retryable=True, + ) + model_name = vnfr.get("namespace") + if model_name is None: + raise ApplicationError( + "`vim-account-id` is not set in VNFR", non_retryable=True + ) + + vnfd: dict = value_to_type( + GetVnfDescriptor.Output, + await workflow.execute_activity( + activity=GetVnfDescriptor.__name__, + arg=GetVnfDescriptor.Input(vnfd_uuid=vnfd_uuid), + activity_id=f"{GetVnfDescriptor.__name__}-{vnfd_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ), + ).vnfd + # Discover relations + # TODO: Implement this later + + # Remove relations + # TODO: Implement this later + + # For each VDU, execute VduTerminateWorkflow + vdus = vnfd.get("vdu", []) + vim_uuid = vnfr.get("vim-account-id") + if vim_uuid is None: + raise ApplicationError( + "`vim-account-id` is not set in VNFD", non_retryable=True + ) + await asyncio.gather( + *[ + workflow.execute_child_workflow( + workflow=VduTerminateWorkflow.__name__, + arg=VduTerminateWorkflow.Input( + vim_uuid=vim_uuid, + application_name=vdu["id"], + model_name=model_name, + ), + id=f"{VduTerminateWorkflow.__name__}-{vdu['id']}", + task_queue=LCM_TASK_QUEUE, + retry_policy=retry_policy, + ) + for vdu in vdus + if "id" in vdu + ] + ) + + # Change VNF instantiation state to NOT_INSTANTIATED + await workflow.execute_activity( + activity=ChangeVnfInstantiationState.__name__, + arg=ChangeVnfInstantiationState.Input( + vnfr_uuid=workflow_input.vnfr_uuid, + state=VnfInstantiationState.NOT_INSTANTIATED, + ), + activity_id=f"{ChangeVnfInstantiationState.__name__}-{workflow_input.vnfr_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ) + except FailureError as e: + if not hasattr(e, "cause") or e.cause is None: + raise e + err_details = str(e.cause.with_traceback(e.__traceback__)) + self.logger.error( + f"{VnfTerminateWorkflow.__name__} failed with {err_details}" + ) + raise e diff --git a/osm_lcm/tests/test_vnf_workflows.py b/osm_lcm/tests/test_vnf_workflows.py index 537e692..4904429 100644 --- a/osm_lcm/tests/test_vnf_workflows.py +++ b/osm_lcm/tests/test_vnf_workflows.py @@ -14,13 +14,25 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio +import asynctest import uuid + +from asynctest import CoroutineMock, Mock, patch from copy import deepcopy from datetime import timedelta +from parameterized import parameterized +from temporalio import activity, workflow +from temporalio.client import WorkflowFailureError +from temporalio.exceptions import ( + ActivityError, + ChildWorkflowError, + RetryState, +) +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker +from typing import Any -import asyncio -import asynctest -from asynctest import CoroutineMock, Mock, patch from osm_common.temporal.activities.vnf import ( ChangeVnfInstantiationState, ChangeVnfState, @@ -31,20 +43,27 @@ from osm_common.temporal.activities.vnf import ( SendNotificationForVnf, SetVnfModel, ) -from osm_common.temporal.dataclasses_common import CharmInfo, VduComputeConstraints -from osm_common.temporal.states import VnfInstantiationState, VnfState -from osm_common.temporal.workflows.vdu import VduInstantiateWorkflow -from osm_common.temporal.workflows.vnf import VnfPrepareWorkflow +from osm_common.temporal.dataclasses_common import ( + CharmInfo, + VduComputeConstraints, +) +from osm_common.temporal.states import ( + VnfInstantiationState, + VnfState, +) from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE -from temporalio import activity, workflow -from temporalio.client import WorkflowFailureError -from temporalio.exceptions import ActivityError, ChildWorkflowError, RetryState -from temporalio.testing import WorkflowEnvironment -from temporalio.worker import Worker - +from osm_common.temporal.workflows.vdu import ( + VduInstantiateWorkflow, + VduTerminateWorkflow, +) +from osm_common.temporal.workflows.vnf import ( + VnfPrepareWorkflow, + VnfTerminateWorkflow, +) from osm_lcm.temporal.vnf_workflows import ( VnfInstantiateWorkflowImpl, VnfPrepareWorkflowImpl, + VnfTerminateWorkflowImpl, ) SANDBOXED = False @@ -169,6 +188,20 @@ class GetComputeConstraintsException(Exception): pass +class MockActivity: + def __init__(self, *args: Any, **kwargs: Any): + self.tracker = Mock(return_value=None) + + async def __call__(self, *args: Any, **kwargs: Any) -> Any: + return self.tracker(*args, **kwargs) + + def __setattr__(self, __name: str, __value: Any) -> None: + if __name in ("return_value", "side_effect"): + self.tracker.__setattr__(__name, __value) + else: + super().__setattr__(__name, __value) + + # Mock Activities @activity.defn(name=SetVnfModel.__name__) async def mock_set_vnf_model(set_vnf_model_input: SetVnfModel.Input) -> None: @@ -204,14 +237,15 @@ async def mock_get_task_queue_raise_exception( @activity.defn(name=GetVnfDescriptor.__name__) -class MockGetVnfDescriptor: - def __init__(self, vnfd=sample_vnfd) -> None: - self.vnfd = vnfd +class MockGetVnfDescriptor(MockActivity): + def __init__(self, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) + self.tracker.return_value = GetVnfDescriptor.Output(vnfd=sample_vnfd) + - async def __call__( - self, get_vnf_descriptor_input: GetVnfDescriptor.Input - ) -> GetVnfDescriptor.Output: - return GetVnfDescriptor.Output(vnfd=self.vnfd) +@activity.defn(name=ChangeVnfInstantiationState.__name__) +class MockChangeVnfInstantiationState(MockActivity): + pass @activity.defn(name=GetVnfDescriptor.__name__) @@ -229,10 +263,10 @@ async def mock_get_vnf_descriptor_raise_exception( @activity.defn(name=GetVnfRecord.__name__) -async def mock_get_vnf_record( - get_vnf_record_input: GetVnfRecord.Input, -) -> GetVnfRecord.Output: - return GetVnfRecord.Output(vnfr=sample_vnfr) +class MockGetVnfRecord(MockActivity): + def __init__(self, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) + self.tracker.return_value = GetVnfRecord.Output(vnfr=sample_vnfr) @activity.defn(name=GetVnfRecord.__name__) @@ -294,6 +328,20 @@ class MockVduInstantiateWorkflowSlow: await asyncio.sleep(0.5) +@workflow.defn(name=VduTerminateWorkflow.__name__, sandboxed=SANDBOXED) +class MockVduTerminateWorkflow: + @workflow.run + async def run(self, workflow_input: VduTerminateWorkflow.Input) -> None: + pass + + +@workflow.defn(name=VduTerminateWorkflow.__name__, sandboxed=SANDBOXED) +class MockVduTerminateWorkflowRaisesException: + @workflow.run + async def run(self, workflow_input: VduTerminateWorkflow.Input) -> None: + raise TestException(f"{self.__class__.__name__} failed.") + + @workflow.defn(name=VduInstantiateWorkflow.__name__, sandboxed=SANDBOXED) class MockVduInstantiateWorkflowFailed: @workflow.run @@ -481,7 +529,7 @@ class TestVnfInstantiateWorkflow(asynctest.TestCase): self.mock_send_notification_for_vnf, mock_set_vnf_model, MockGetVnfDescriptor(), - mock_get_vnf_record, + MockGetVnfRecord(), mock_get_vim_cloud, ] mock_get_vdu_instantiate_input.return_value = ( @@ -514,7 +562,7 @@ class TestVnfInstantiateWorkflow(asynctest.TestCase): self.mock_send_notification_for_vnf, mock_set_vnf_model, MockGetVnfDescriptor(), - mock_get_vnf_record, + MockGetVnfRecord(), mock_get_vim_cloud, ] mock_get_vdu_instantiate_input.return_value = ( @@ -551,7 +599,7 @@ class TestVnfInstantiateWorkflow(asynctest.TestCase): self.mock_send_notification_for_vnf, mock_set_vnf_model, MockGetVnfDescriptor(), - mock_get_vnf_record, + MockGetVnfRecord(), mock_get_vim_cloud, ] mock_get_vdu_instantiate_input.return_value = ( @@ -588,17 +636,20 @@ class TestVnfInstantiateWorkflow(asynctest.TestCase): MockPrepareVnfWorkflow, MockVduInstantiateWorkflowSlow, ] - vnfd = { - "vdu": [vdu] * num_child_workflows, - } + mock_get_vnf_descriptor = MockGetVnfDescriptor() + mock_get_vnf_descriptor.return_value = GetVnfDescriptor.Output( + vnfd={ + "vdu": [vdu] * num_child_workflows, + } + ) activities = [ mock_get_task_queue, self.mock_change_vnf_instantiation_state, self.mock_change_vnf_state, self.mock_send_notification_for_vnf, mock_set_vnf_model, - MockGetVnfDescriptor(vnfd), - mock_get_vnf_record, + mock_get_vnf_descriptor, + MockGetVnfRecord(), mock_get_vim_cloud, ] @@ -634,7 +685,7 @@ class TestVnfInstantiateWorkflow(asynctest.TestCase): self.mock_send_notification_for_vnf, mock_set_vnf_model, MockGetVnfDescriptor(), - mock_get_vnf_record, + MockGetVnfRecord(), mock_get_vim_cloud, ] async with self.env, self.get_worker( @@ -654,7 +705,7 @@ class TestVnfInstantiateWorkflow(asynctest.TestCase): self.mock_send_notification_for_vnf, mock_set_vnf_model, MockGetVnfDescriptor(), - mock_get_vnf_record, + MockGetVnfRecord(), mock_get_vim_cloud, ] async with self.env, self.get_worker( @@ -674,7 +725,7 @@ class TestVnfInstantiateWorkflow(asynctest.TestCase): self.mock_send_notification_for_vnf, mock_set_vnf_model, MockGetVnfDescriptor(), - mock_get_vnf_record, + MockGetVnfRecord(), mock_get_vim_cloud, ] async with self.env, self.get_worker( @@ -694,7 +745,7 @@ class TestVnfInstantiateWorkflow(asynctest.TestCase): self.mock_send_notification_for_vnf, mock_set_vnf_model, MockGetVnfDescriptor(), - mock_get_vnf_record, + MockGetVnfRecord(), mock_get_vim_cloud, ] async with self.env, self.get_worker( @@ -714,7 +765,7 @@ class TestVnfInstantiateWorkflow(asynctest.TestCase): self.mock_send_notification_for_vnf, mock_set_vnf_model, MockGetVnfDescriptor(), - mock_get_vnf_record, + MockGetVnfRecord(), mock_get_vim_cloud, ] async with self.env, self.get_worker( @@ -740,7 +791,7 @@ class TestVnfInstantiateWorkflow(asynctest.TestCase): self.mock_send_notification_for_vnf, mock_set_vnf_model, MockGetVnfDescriptor(), - mock_get_vnf_record, + MockGetVnfRecord(), mock_get_vim_cloud, ] async with self.env, self.get_worker( @@ -766,7 +817,7 @@ class TestVnfInstantiateWorkflow(asynctest.TestCase): self.mock_send_notification_for_vnf, mock_set_vnf_model, mock_get_vnf_descriptor_raise_exception, - mock_get_vnf_record, + MockGetVnfRecord(), mock_get_vim_cloud, ] async with self.env, self.get_worker( @@ -806,7 +857,7 @@ class TestVnfInstantiateWorkflow(asynctest.TestCase): self.mock_send_notification_for_vnf, mock_set_vnf_model, MockGetVnfDescriptor(), - mock_get_vnf_record, + MockGetVnfRecord(), mock_get_vim_cloud_raise_exception, ] async with self.env, self.get_worker( @@ -828,7 +879,7 @@ class TestVnfInstantiateWorkflow(asynctest.TestCase): self.mock_send_notification_for_vnf, mock_set_vnf_model, MockGetVnfDescriptor(), - mock_get_vnf_record, + MockGetVnfRecord(), mock_get_vim_cloud, ] async with self.env, self.get_worker( @@ -1307,5 +1358,167 @@ class TestGetVduInstantiateInfo(asynctest.TestCase): ) +class TestVnfTerminateWorkflow(asynctest.TestCase): + async def setUp(self): + self.env = await WorkflowEnvironment.start_time_skipping() + self.client = self.env.client + + # Maps workflow base classes to implementation classes. + # Can be overridden by individual tests to swap out implementations by + # setting self.workflows[] = + self.workflows = { + VduTerminateWorkflow: MockVduTerminateWorkflow, + VnfTerminateWorkflow: VnfTerminateWorkflowImpl, + } + self.mock_get_vnf_record = MockGetVnfRecord() + self.mock_get_vnf_descriptor = MockGetVnfDescriptor() + self.mock_change_vnf_instantiation_state = MockChangeVnfInstantiationState() + self.activities = [ + self.mock_get_vnf_record, + self.mock_get_vnf_descriptor, + self.mock_change_vnf_instantiation_state, + ] + self.task_queue = LCM_TASK_QUEUE + self.workflow_input = VnfTerminateWorkflow.Input( + vnfr_uuid="my_vnfr_uuid", + ) + + def get_worker(self, task_queue: str, activities: list) -> Worker: + return Worker( + self.client, + task_queue=task_queue, + workflows=list(self.workflows.values()), + activities=activities, + debug_mode=DEBUG_MODE, + ) + + async def execute_workflow(self, wf_input: VnfTerminateWorkflow.Input): + return await self.client.execute_workflow( + VnfTerminateWorkflowImpl.run, + arg=wf_input, + id=uuid.uuid4().hex, + task_queue=self.task_queue, + task_timeout=TASK_TIMEOUT, + execution_timeout=EXECUTION_TIMEOUT, + ) + + @patch("temporalio.workflow.execute_child_workflow") + async def test_vnf_terminate_workflow__all_activities_called( + self, mock_execute_child_workflow: CoroutineMock + ): + """Test all the expected activities and child workflows are called""" + async with self.env, self.get_worker(self.task_queue, self.activities): + await self.execute_workflow(self.workflow_input) + + self.mock_get_vnf_record.tracker.assert_called_once() + self.mock_get_vnf_descriptor.tracker.assert_called_once() + mock_execute_child_workflow.assert_called_once() + self.mock_change_vnf_instantiation_state.tracker.assert_called_once() + + async def test_vnf_terminate_workflow__not_instantiated__logs_warning(self): + """Test warning is logged when VNF is not instantiated""" + self.mock_get_vnf_record.return_value = GetVnfRecord.Output( + vnfr={ + "instantiationState": "NOT_INSTANTIATED", + "vnfd-id": "test-vnfd-id", + "vim-account-id": vim_account_id, + "namespace": model_name, + } + ) + async with self.env, self.get_worker(self.task_queue, self.activities): + with self.assertLogs(level="INFO") as logs: + await self.execute_workflow(self.workflow_input) + + any( + "VNF(`my_vnfr_uuid`) is not instantiated" in record.message + and record.levelname == "WARNING" + for record in logs.records + ) + + @patch("temporalio.workflow.execute_child_workflow") + async def test_vnf_terminate_workflow__no_vdu__successful( + self, mock_execute_child_workflow: CoroutineMock + ): + self.mock_get_vnf_descriptor.return_value = GetVnfDescriptor.Output( + vnfd={ + "id": "test-vnfd-id", + "vdu": [], + } + ) + + async with self.env, self.get_worker(self.task_queue, self.activities): + await self.execute_workflow(self.workflow_input) + + mock_execute_child_workflow.assert_not_called() + + async def test_vnf_terminate_workflow__activity_error__is_raised(self): + """Validate that activity errors bubble up through the workflow""" + self.mock_get_vnf_record.side_effect = TestException("get_vnf_record failed") + + async with self.env, self.get_worker(self.task_queue, self.activities): + with self.assertRaises(WorkflowFailureError): + await self.execute_workflow(self.workflow_input) + + async def test_vnf_terminate_workflow__child_workflow_error__is_raised(self): + """Validates that child workflow errors bubble up to the parent workflow + + This might seem unnecessary, but depending on how the co-routines are + awaited, errors might be silently swallowed. + """ + self.workflows[VduTerminateWorkflow] = MockVduTerminateWorkflowRaisesException + async with self.env, self.get_worker(self.task_queue, self.activities): + with self.assertRaises(WorkflowFailureError): + await self.execute_workflow(self.workflow_input) + + @parameterized.expand(["vnfd-id", "vim-account-id", "namespace"]) + async def test_vnf_terminate_workflow__no_vnfd_id__raises_error( + self, vnfr_required_value + ): + """Test that a missing required value in the VNFR raises an error""" + vnfr = { + "instantiationState": "NOT_INSTANTIATED", + "vnfd-id": "test-vnfd-id", + "vim-account-id": "test-vim-account-id", + "namespace": "test-namespace", + } + del vnfr[vnfr_required_value] + self.mock_get_vnf_record.return_value = GetVnfRecord.Output(vnfr=vnfr) + + async with self.env, self.get_worker(self.task_queue, self.activities): + with self.assertRaises(WorkflowFailureError): + await self.execute_workflow(self.workflow_input) + + @patch("temporalio.workflow.execute_child_workflow") + async def test_vnf_terminate_workflow__no_vdu_id__is_ignored( + self, mock_execute_child_workflow: CoroutineMock + ): + """Test that VDUs without an ID are ignored + + This is not a business requirement, but a defensive measure to avoid + errors in case the VNFD is malformed.""" + self.mock_get_vnf_descriptor.return_value = GetVnfDescriptor.Output( + vnfd={ + "id": "test-vnfd-id", + "vdu": [ + { + "id": "test-vdu-id", + }, + { + "not-id": "test-not-id", # No ID in this VDU, it will be ignored + }, + ], + } + ) + + async with self.env, self.get_worker(self.task_queue, self.activities): + await self.execute_workflow(self.workflow_input) + + mock_execute_child_workflow.assert_called_once() + self.assertEquals( + mock_execute_child_workflow.call_args.kwargs["arg"].application_name, + "test-vdu-id", + ) + + if __name__ == "__main__": asynctest.main()