# limitations under the License.
import asyncio
-from osm_common.dataclasses.temporal_dataclasses import (
- NsLcmOperationInput,
- VimOperationInput,
-)
-from osm_common.temporal_constants import (
- LCM_TASK_QUEUE,
- WORKFLOW_VIM_CREATE,
- WORKFLOW_VIM_DELETE,
- WORKFLOW_VIM_UPDATE,
- WORKFLOW_NSLCM_NO_OP,
- WORKFLOW_NS_INSTANTIATE,
+from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
+from osm_common.temporal.workflows.vim import (
+ VimCreateWorkflow,
+ VimDeleteWorkflow,
+ VimUpdateWorkflow,
)
+from osm_common.temporal.workflows.lcm import NsNoOpWorkflow
+from osm_common.temporal.workflows.ns import NsInstantiateWorkflow
from osm_common.wftemporal import WFTemporal
class NbiTemporal:
workflow_mappings = {
- "created": WORKFLOW_VIM_CREATE,
- "edited": WORKFLOW_VIM_UPDATE,
- "delete": WORKFLOW_VIM_DELETE,
+ "created": VimCreateWorkflow,
+ "edited": VimUpdateWorkflow,
+ "delete": VimDeleteWorkflow,
}
def start_vim_workflow(self, action: str, content: dict) -> None:
vim_uuid = content["_id"]
# Derive the operation id (although for a create it will always be 0)
op_id = content["op_id"].split(":")[1]
- workflow = NbiTemporal.workflow_mappings[action]
- workflow_data = VimOperationInput(vim_uuid, op_id)
+ workflow_object = NbiTemporal.workflow_mappings[action]
+ workflow_data = workflow_object.Input(vim_uuid, op_id)
asyncio.run(
WFTemporal(logger_name="nbi.vim_workflow").start_workflow(
task_queue=LCM_TASK_QUEUE,
- workflow_name=workflow,
+ workflow_name=workflow_object.__name__,
workflow_data=workflow_data,
id=vim_uuid,
)
)
lcm_workflow_mappings = {
- "instantiate": WORKFLOW_NS_INSTANTIATE,
- "terminate": WORKFLOW_NSLCM_NO_OP,
- "vca_status_refresh": WORKFLOW_NSLCM_NO_OP,
- "action": WORKFLOW_NSLCM_NO_OP,
- "update": WORKFLOW_NSLCM_NO_OP,
- "scale": WORKFLOW_NSLCM_NO_OP,
- "heal": WORKFLOW_NSLCM_NO_OP,
- "migrate": WORKFLOW_NSLCM_NO_OP,
- "verticalscale": WORKFLOW_NSLCM_NO_OP,
- "deleted": WORKFLOW_NSLCM_NO_OP,
- "vnf_terminated": WORKFLOW_NSLCM_NO_OP,
- "policy_updated": WORKFLOW_NSLCM_NO_OP,
- "terminated": WORKFLOW_NSLCM_NO_OP,
- "instantiated": WORKFLOW_NSLCM_NO_OP,
- "scaled": WORKFLOW_NSLCM_NO_OP,
- "healed": WORKFLOW_NSLCM_NO_OP,
- "actioned": WORKFLOW_NSLCM_NO_OP,
- "updated": WORKFLOW_NSLCM_NO_OP,
- "migrated": WORKFLOW_NSLCM_NO_OP,
- "verticalscaled": WORKFLOW_NSLCM_NO_OP,
+ "instantiate": NsInstantiateWorkflow,
+ "terminate": NsNoOpWorkflow,
+ "vca_status_refresh": NsNoOpWorkflow,
+ "action": NsNoOpWorkflow,
+ "update": NsNoOpWorkflow,
+ "scale": NsNoOpWorkflow,
+ "heal": NsNoOpWorkflow,
+ "migrate": NsNoOpWorkflow,
+ "verticalscale": NsNoOpWorkflow,
+ "deleted": NsNoOpWorkflow,
+ "vnf_terminated": NsNoOpWorkflow,
+ "policy_updated": NsNoOpWorkflow,
+ "terminated": NsNoOpWorkflow,
+ "instantiated": NsNoOpWorkflow,
+ "scaled": NsNoOpWorkflow,
+ "healed": NsNoOpWorkflow,
+ "actioned": NsNoOpWorkflow,
+ "updated": NsNoOpWorkflow,
+ "migrated": NsNoOpWorkflow,
+ "verticalscaled": NsNoOpWorkflow,
}
def start_ns_workflow(self, nslcmop: dict) -> None:
+ workflow_object = NbiTemporal.lcm_workflow_mappings[nslcmop["lcmOperationType"]]
asyncio.run(
WFTemporal(logger_name="nbi.lcm_workflow").start_workflow(
task_queue=LCM_TASK_QUEUE,
- workflow_name=NbiTemporal.lcm_workflow_mappings[
- nslcmop["lcmOperationType"]
- ],
- workflow_data=NsLcmOperationInput(nslcmop=nslcmop),
+ workflow_name=workflow_object.__name__,
+ workflow_data=workflow_object.Input(nslcmop=nslcmop),
id=nslcmop["nsInstanceId"],
)
)
import asynctest
-from osm_common.dataclasses.temporal_dataclasses import (
- NsLcmOperationInput,
- VimOperationInput,
-)
-from osm_common.temporal_constants import (
- LCM_TASK_QUEUE,
- WORKFLOW_VIM_CREATE,
- WORKFLOW_NS_INSTANTIATE,
+from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
+from osm_common.temporal.workflows.vim import (
+ VimCreateWorkflow,
)
+from osm_common.temporal.workflows.ns import NsInstantiateWorkflow
from osm_nbi.temporal.nbi_temporal import NbiTemporal
self.temporal.start_vim_workflow(action, self.vim_operation_content)
mock_start_workflow.assert_called_once_with(
task_queue=LCM_TASK_QUEUE,
- workflow_name=WORKFLOW_VIM_CREATE,
- workflow_data=VimOperationInput(self.vim_id, self.op_id),
+ workflow_name=VimCreateWorkflow.__name__,
+ workflow_data=VimCreateWorkflow.Input(self.vim_id, self.op_id),
id=self.vim_id,
)
self.temporal.start_ns_workflow(self.nslcmop)
mock_start_workflow.assert_called_once_with(
task_queue=LCM_TASK_QUEUE,
- workflow_name=WORKFLOW_NS_INSTANTIATE,
- workflow_data=NsLcmOperationInput(nslcmop=self.nslcmop),
+ workflow_name=NsInstantiateWorkflow.__name__,
+ workflow_data=NsInstantiateWorkflow.Input(nslcmop=self.nslcmop),
id=self.nslcmop["nsInstanceId"],
)