VnfPrepareWorkflowImpl,
]
activities = [
- UpdateNsStateImpl(self.db).__call__,
- GetVnfDetailsImpl(self.db).__call__,
- GetNsRecordImpl(self.db).__call__,
- UpdateNsLcmOperationStateImpl(self.db).__call__,
- NsLcmNoOpImpl().__call__,
- CreateModelImpl(self.db, paas_connector_instance).__call__,
- DeployCharmImpl(paas_connector_instance).__call__,
- CheckCharmStatusImpl(paas_connector_instance).__call__,
- TestVimConnectivityImpl(paas_connector_instance).__call__,
- UpdateVimOperationStateImpl(self.db).__call__,
- UpdateVimStateImpl(self.db).__call__,
- DeleteVimRecordImpl(self.db).__call__,
- ChangeVnfStateImpl(self.db).__call__,
- ChangeVnfInstantiationStateImpl(self.db).__call__,
- GetTaskQueueImpl(self.db).__call__,
- GetVimCloudImpl(self.db).__call__,
- GetVnfDescriptorImpl(self.db).__call__,
- GetVnfRecordImpl(self.db).__call__,
- SendNotificationForVnfImpl().__call__,
- SetVnfModelImpl(self.db).__call__,
+ UpdateNsStateImpl(self.db),
+ GetVnfDetailsImpl(self.db),
+ GetNsRecordImpl(self.db),
+ UpdateNsLcmOperationStateImpl(self.db),
+ NsLcmNoOpImpl(),
+ CreateModelImpl(self.db, paas_connector_instance),
+ DeployCharmImpl(paas_connector_instance),
+ CheckCharmStatusImpl(paas_connector_instance),
+ TestVimConnectivityImpl(paas_connector_instance),
+ UpdateVimOperationStateImpl(self.db),
+ UpdateVimStateImpl(self.db),
+ DeleteVimRecordImpl(self.db),
+ ChangeVnfStateImpl(self.db),
+ ChangeVnfInstantiationStateImpl(self.db),
+ GetTaskQueueImpl(self.db),
+ GetVimCloudImpl(self.db),
+ GetVnfDescriptorImpl(self.db),
+ GetVnfRecordImpl(self.db),
+ SendNotificationForVnfImpl(),
+ SetVnfModelImpl(self.db),
]
# Check if we are running under a debugger
return True
+@activity.defn(name=TestVimConnectivity.__name__)
class TestVimConnectivityImpl(TestVimConnectivity):
- @activity.defn(name=TestVimConnectivity.__name__)
async def __call__(self, activity_input: TestVimConnectivity.Input) -> None:
vim_id = activity_input.vim_uuid
await self.juju_controller._get_controller(vim_id)
self.logger.info(message)
+@activity.defn(name=CreateModel.__name__)
class CreateModelImpl(CreateModel):
- @activity.defn(name=CreateModel.__name__)
async def __call__(self, activity_input: CreateModel.Input) -> None:
controller = await self.juju_controller._get_controller(activity_input.vim_uuid)
if activity_input.model_name in await controller.list_models():
self.logger.debug(f"Model {activity_input.model_name} created")
+@activity.defn(name=DeployCharm.__name__)
class DeployCharmImpl(DeployCharm):
- @activity.defn(name=DeployCharm.__name__)
async def __call__(self, activity_input: DeployCharm.Input) -> None:
model_name = activity_input.model_name
charm_info = activity_input.charm_info
)
+@activity.defn(name=CheckCharmStatus.__name__)
class CheckCharmStatusImpl(CheckCharmStatus):
- @activity.defn(name=CheckCharmStatus.__name__)
async def __call__(self, activity_input: CheckCharmStatus.Input) -> None:
controller = await self.juju_controller._get_controller(activity_input.vim_uuid)
model = await controller.get_model(activity_input.model_name)
)
+@activity.defn(name=NsLcmNoOp.__name__)
class NsLcmNoOpImpl(NsLcmNoOp):
- @activity.defn(name=NsLcmNoOp.__name__)
async def __call__(self, activity_input: NsLcmNoOp.Input) -> None:
self.logger.debug(f"Called with: {activity_input.nslcmop}")
+@activity.defn(name=UpdateNsLcmOperationState.__name__)
class UpdateNsLcmOperationStateImpl(UpdateNsLcmOperationState):
- @activity.defn(name=UpdateNsLcmOperationState.__name__)
async def __call__(self, activity_input: UpdateNsLcmOperationState.Input):
now = time.time()
)
+@activity.defn(name=GetVnfDetails.__name__)
class GetVnfDetailsImpl(GetVnfDetails):
- @activity.defn(name=GetVnfDetails.__name__)
async def __call__(
self, activity_input: GetVnfDetails.Input
) -> GetVnfDetails.Output:
)
+@activity.defn(name=GetNsRecord.__name__)
class GetNsRecordImpl(GetNsRecord):
- @activity.defn(name=GetNsRecord.__name__)
async def __call__(self, activity_input: GetNsRecord.Input) -> GetNsRecord.Output:
nsr = self.db.get_one("nsrs", {"_id": activity_input.nsr_uuid})
self.logger.debug("Got the nsr from Database for VNF operations.")
return GetNsRecord.Output(nsr=nsr)
+@activity.defn(name=UpdateNsState.__name__)
class UpdateNsStateImpl(UpdateNsState):
- @activity.defn(name=UpdateNsState.__name__)
async def __call__(self, activity_input: UpdateNsState.Input) -> None:
update_ns_state = {
"nsState": activity_input.state.name,
await super().wrap_nslcmop(workflow_input=workflow_input)
async def run(self, workflow_input: NsInstantiateWorkflow.Input) -> None:
- self.logger.info(f"Executing {NsInstantiateWorkflow.__name__} with {input}")
+ self.logger.info(
+ f"Executing {NsInstantiateWorkflow.__name__} with {workflow_input}"
+ )
# TODO: Can we clean up the input? Perhaps this workflow could receive NsInstantiateInput directly.
ns_uuid = workflow_input.nslcmop["nsInstanceId"]
)
+@activity.defn(name=UpdateVimState.__name__)
class UpdateVimStateImpl(UpdateVimState):
- @activity.defn(name=UpdateVimState.__name__)
async def __call__(self, activity_input: UpdateVimState.Input) -> None:
update_vim_state = {
"_admin.operationalState": activity_input.operational_state.name,
)
+@activity.defn(name=UpdateVimOperationState.__name__)
class UpdateVimOperationStateImpl(UpdateVimOperationState):
- @activity.defn(name=UpdateVimOperationState.__name__)
async def __call__(self, activity_input: UpdateVimOperationState.Input) -> None:
update_operation_state = {
f"_admin.operations.{format(activity_input.op_id)}.operationState": activity_input.op_state.name,
)
+@activity.defn(name=DeleteVimRecord.__name__)
class DeleteVimRecordImpl(DeleteVimRecord):
- @activity.defn(name=DeleteVimRecord.__name__)
async def __call__(self, activity_input: DeleteVimRecord.Input) -> None:
self.db.del_one("vim_accounts", {"_id": activity_input.vim_uuid})
self.logger.debug(f"Removed VIM {activity_input.vim_uuid}")
CONFIG_IDENTIFIER = "config::"
+@activity.defn(name=GetTaskQueue.__name__)
class GetTaskQueueImpl(GetTaskQueue):
- @activity.defn(name=GetTaskQueue.__name__)
async def __call__(self, activity_input: GetTaskQueue.Input) -> GetTaskQueue.Output:
vnfr = self.db.get_one("vnfrs", {"_id": activity_input.vnfr_uuid})
vim_record = self.db.get_one("vim_accounts", {"_id": vnfr["vim-account-id"]})
return GetTaskQueue.Output(task_queue)
+@activity.defn(name=GetVimCloud.__name__)
class GetVimCloudImpl(GetVimCloud):
- @activity.defn(name=GetVimCloud.__name__)
async def __call__(self, activity_input: GetVimCloud.Input) -> GetVimCloud.Output:
vnfr = self.db.get_one("vnfrs", {"_id": activity_input.vnfr_uuid})
vim_record = self.db.get_one("vim_accounts", {"_id": vnfr["vim-account-id"]})
return GetVimCloud.Output(cloud=cloud)
+@activity.defn(name=GetVnfRecord.__name__)
class GetVnfRecordImpl(GetVnfRecord):
- @activity.defn(name=GetVnfRecord.__name__)
async def __call__(self, activity_input: GetVnfRecord.Input) -> GetVnfRecord.Output:
vnfr = self.db.get_one("vnfrs", {"_id": activity_input.vnfr_uuid})
self.logger.debug("Got the vnfr from Database for VNF operations.")
return GetVnfRecord.Output(vnfr=vnfr)
+@activity.defn(name=GetVnfDescriptor.__name__)
class GetVnfDescriptorImpl(GetVnfDescriptor):
- @activity.defn(name=GetVnfDescriptor.__name__)
async def __call__(
self, activity_input: GetVnfDescriptor.Input
) -> GetVnfDescriptor.Output:
}
+@activity.defn(name=ChangeVnfState.__name__)
class ChangeVnfStateImpl(ChangeVnfState):
- @activity.defn(name=ChangeVnfState.__name__)
async def __call__(self, activity_input: ChangeVnfState.Input) -> None:
update_vnf_state = {"vnfState": activity_input.state.name}
self.db.set_one("vnfrs", {"_id": activity_input.vnfr_uuid}, update_vnf_state)
)
+@activity.defn(name=ChangeVnfInstantiationState.__name__)
class ChangeVnfInstantiationStateImpl(ChangeVnfInstantiationState):
- @activity.defn(name=ChangeVnfInstantiationState.__name__)
async def __call__(self, activity_input: ChangeVnfInstantiationState.Input) -> None:
update_vnf_instantiation_state = {
"instantiationState": activity_input.state.name
)
+@activity.defn(name=SetVnfModel.__name__)
class SetVnfModelImpl(SetVnfModel):
- @activity.defn(name=SetVnfModel.__name__)
async def __call__(self, activity_input: SetVnfModel.Input) -> None:
update_namespace = {"namespace": activity_input.model_name}
self.db.set_one("vnfrs", {"_id": activity_input.vnfr_uuid}, update_namespace)
)
+@activity.defn(name=SendNotificationForVnf.__name__)
class SendNotificationForVnfImpl(SendNotificationForVnf):
- @activity.defn(name=SendNotificationForVnf.__name__)
async def __call__(self, activity_input: SendNotificationForVnf.Input) -> None:
self.logger.debug("Send notification for VNF not implemented.")
self.mock_update_vim_state_raises,
self.mock_update_vim_operation_state,
]
- retry_policy = 3
+ retry_policy = 1 # TODO: Figure out what this should be anchored to
expected_vim_state = [VimState.ENABLED] * retry_policy
expected_vim_op_state = [VimOperationState.COMPLETED]
async with self.env, self.get_worker(activities):
self.mock_update_vim_operation_state_raises,
]
expected_vim_state = [VimState.ENABLED]
- retry_policy = 3
+ retry_policy = 1 # TODO: Figure out what this should be anchored to
expected_vim_op_state = [VimOperationState.COMPLETED] * retry_policy
async with self.env, self.get_worker(activities):
with self.assertRaises(WorkflowFailureError):