@workflow.run
async def run(self, input: VnfInstantiateInput) -> None:
self.logger.info(f"Deploying VNF {input.vnfr_uuid}")
- vnf_instantiation_state = ChangeVnfInstantiationStateInput(
- vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.NOT_INSTANTIATED
- )
- vnf_state = ChangeVnfStateInput(
- vnfr_uuid=input.vnfr_uuid, state=VnfState.STOPPED
- )
try:
- await self.update_states(
- vnf_instantiation_state=vnf_instantiation_state,
- vnf_state=vnf_state,
+ await self.update_vnf_instantiation_state(
+ ChangeVnfInstantiationStateInput(
+ vnfr_uuid=input.vnfr_uuid,
+ state=VnfInstantiationState.NOT_INSTANTIATED,
+ ),
+ )
+ await self.send_notification_for_vnf(
+ ChangeVnfInstantiationStateInput(
+ vnfr_uuid=input.vnfr_uuid,
+ state=VnfInstantiationState.NOT_INSTANTIATED,
+ ),
)
vnf_task_queue = value_to_type(
GetTaskQueueOutput,
task_queue=vnf_task_queue.task_queue,
cloud=get_cloud.cloud,
)
- await self.update_states(
- vnf_instantiation_state=ChangeVnfInstantiationStateInput(
+ await self.update_vnf_instantiation_state(
+ ChangeVnfInstantiationStateInput(
vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
),
- vnf_state=ChangeVnfStateInput(
- vnfr_uuid=input.vnfr_uuid, state=VnfState.STARTED
+ )
+ await self.update_vnf_state(
+ ChangeVnfStateInput(vnfr_uuid=input.vnfr_uuid, state=VnfState.STARTED),
+ )
+ await self.send_notification_for_vnf(
+ ChangeVnfInstantiationStateInput(
+ vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
),
)
except ActivityError as e:
err_details = str(e.cause.with_traceback(e.__traceback__))
- await self.update_states(
- vnf_instantiation_state=vnf_instantiation_state,
- vnf_state=vnf_state,
+ await self.update_vnf_instantiation_state(
+ ChangeVnfInstantiationStateInput(
+ vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
+ ),
+ )
+ await self.update_vnf_state(
+ ChangeVnfStateInput(vnfr_uuid=input.vnfr_uuid, state=VnfState.STOPPED),
+ )
+ await self.send_notification_for_vnf(
+ ChangeVnfInstantiationStateInput(
+ vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
+ ),
)
self.logger.error(f"{WORKFLOW_VNF_INSTANTIATE} failed with {err_details}")
raise e
except ChildWorkflowError as e:
err_details = str(e.cause.cause.with_traceback(e.cause.__traceback__))
- await self.update_states(
- vnf_instantiation_state=vnf_instantiation_state,
- vnf_state=vnf_state,
+ await self.update_vnf_instantiation_state(
+ ChangeVnfInstantiationStateInput(
+ vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
+ ),
+ )
+ await self.update_vnf_state(
+ ChangeVnfStateInput(vnfr_uuid=input.vnfr_uuid, state=VnfState.STOPPED),
+ )
+ await self.send_notification_for_vnf(
+ ChangeVnfInstantiationStateInput(
+ vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
+ ),
)
self.logger.error(f"{WORKFLOW_VNF_INSTANTIATE} failed with {err_details}")
raise e
except Exception as e:
err_details = str(traceback.format_exc())
- await self.update_states(
- vnf_instantiation_state=vnf_instantiation_state,
- vnf_state=vnf_state,
+ await self.update_vnf_instantiation_state(
+ ChangeVnfInstantiationStateInput(
+ vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
+ ),
+ )
+ await self.update_vnf_state(
+ ChangeVnfStateInput(vnfr_uuid=input.vnfr_uuid, state=VnfState.STOPPED),
+ )
+ await self.send_notification_for_vnf(
+ ChangeVnfInstantiationStateInput(
+ vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
+ ),
)
self.logger.error(f"{WORKFLOW_VNF_INSTANTIATE} failed with {err_details}")
raise e
)
return vdu_instantiate_input, vdu_instantiate_workflow_id
- async def update_states(
- self,
- vnf_instantiation_state: ChangeVnfInstantiationStateInput,
- vnf_state: ChangeVnfStateInput,
- ):
- await self.update_vnf_instantiation_state(vnf_instantiation_state)
- await self.update_vnf_state(vnf_state)
- await self.send_notification_for_vnf(vnf_instantiation_state)
-
@workflow.defn(name=WORKFLOW_VNF_PREPARE, sandboxed=_SANDBOXED)
class VnfPrepareWorkflow:
def check_state_change_call_counts(self):
self.assertEqual(self.mock_change_vnf_instantiation_state_tracker.call_count, 2)
- self.assertEqual(self.mock_change_vnf_state_tracker.call_count, 2)
+ self.assertEqual(self.mock_change_vnf_state_tracker.call_count, 1)
self.assertEqual(self.mock_send_notification_for_vnf_tracker.call_count, 2)
def workflow_is_succeeded(self):
),
)
self.assertEqual(
- call_mock_change_vnf_state_tracker[1].args[0],
+ call_mock_change_vnf_state_tracker[0].args[0],
ChangeVnfStateInput(vnfr_uuid=vnfr_uuid, state=VnfState.STARTED),
)
self.assertEqual(
call_mock_change_vnf_instantiation_state_tracker[1].args[0],
ChangeVnfInstantiationStateInput(
- vnfr_uuid=vnfr_uuid, state=VnfInstantiationState.NOT_INSTANTIATED
+ vnfr_uuid=vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
),
)
self.assertEqual(
- call_mock_change_vnf_state_tracker[1].args[0],
+ call_mock_change_vnf_state_tracker[0].args[0],
ChangeVnfStateInput(vnfr_uuid=vnfr_uuid, state=VnfState.STOPPED),
)
self.assertEqual(str(error.exception), "Workflow execution failed")
):
with self.assertRaises(WorkflowFailureError) as err:
await self.execute_workflow()
- self.assertEqual(self.mock_change_vnf_instantiation_state_tracker.call_count, 2)
+ self.assertEqual(self.mock_change_vnf_instantiation_state_tracker.call_count, 3)
self.assertEqual(self.mock_change_vnf_state_tracker.call_count, 6)
- self.assertEqual(self.mock_send_notification_for_vnf_tracker.call_count, 0)
- self.workflow_is_failed(err)
- mock_instantiate_vdus.assert_not_called()
+ self.assertEqual(self.mock_send_notification_for_vnf_tracker.call_count, 1)
+ # using workflow_is_succeeded method since vnfInstantationState is being changed to INSTANTIATED and
+ # vnfState to STARTED (not STOPPED)
+ self.workflow_is_succeeded()
self.assertEqual(
str(err.exception.cause.cause.message),
"change-vnf-state failed.",