OSMENG-1155: Implementation of Constants and Dataclasses 76/13576/4
authorDario Faccin <dario.faccin@canonical.com>
Fri, 23 Jun 2023 13:39:31 +0000 (15:39 +0200)
committerDario Faccin <dario.faccin@canonical.com>
Tue, 27 Jun 2023 12:42:52 +0000 (14:42 +0200)
Add implementation for workflows and activities

Change-Id: I6c29d8e8939f0eb19e9fc0c0ea6b98bbd618ad7c
Signed-off-by: Dario Faccin <dario.faccin@canonical.com>
osm_nbi/temporal/nbi_temporal.py
osm_nbi/tests/test_nbi_temporal.py

index a9051c1..9ef43dc 100644 (file)
 # limitations under the License.
 
 import asyncio
-from osm_common.dataclasses.temporal_dataclasses import (
-    NsLcmOperationInput,
-    VimOperationInput,
-)
-from osm_common.temporal_constants import (
-    LCM_TASK_QUEUE,
-    WORKFLOW_VIM_CREATE,
-    WORKFLOW_VIM_DELETE,
-    WORKFLOW_VIM_UPDATE,
-    WORKFLOW_NSLCM_NO_OP,
-    WORKFLOW_NS_INSTANTIATE,
+from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
+from osm_common.temporal.workflows.vim import (
+    VimCreateWorkflow,
+    VimDeleteWorkflow,
+    VimUpdateWorkflow,
 )
+from osm_common.temporal.workflows.lcm import NsNoOpWorkflow
+from osm_common.temporal.workflows.ns import NsInstantiateWorkflow
 from osm_common.wftemporal import WFTemporal
 
 
 class NbiTemporal:
     workflow_mappings = {
-        "created": WORKFLOW_VIM_CREATE,
-        "edited": WORKFLOW_VIM_UPDATE,
-        "delete": WORKFLOW_VIM_DELETE,
+        "created": VimCreateWorkflow,
+        "edited": VimUpdateWorkflow,
+        "delete": VimDeleteWorkflow,
     }
 
     def start_vim_workflow(self, action: str, content: dict) -> None:
         vim_uuid = content["_id"]
         # Derive the operation id (although for a create it will always be 0)
         op_id = content["op_id"].split(":")[1]
-        workflow = NbiTemporal.workflow_mappings[action]
-        workflow_data = VimOperationInput(vim_uuid, op_id)
+        workflow_object = NbiTemporal.workflow_mappings[action]
+        workflow_data = workflow_object.Input(vim_uuid, op_id)
         asyncio.run(
             WFTemporal(logger_name="nbi.vim_workflow").start_workflow(
                 task_queue=LCM_TASK_QUEUE,
-                workflow_name=workflow,
+                workflow_name=workflow_object.__name__,
                 workflow_data=workflow_data,
                 id=vim_uuid,
             )
         )
 
     lcm_workflow_mappings = {
-        "instantiate": WORKFLOW_NS_INSTANTIATE,
-        "terminate": WORKFLOW_NSLCM_NO_OP,
-        "vca_status_refresh": WORKFLOW_NSLCM_NO_OP,
-        "action": WORKFLOW_NSLCM_NO_OP,
-        "update": WORKFLOW_NSLCM_NO_OP,
-        "scale": WORKFLOW_NSLCM_NO_OP,
-        "heal": WORKFLOW_NSLCM_NO_OP,
-        "migrate": WORKFLOW_NSLCM_NO_OP,
-        "verticalscale": WORKFLOW_NSLCM_NO_OP,
-        "deleted": WORKFLOW_NSLCM_NO_OP,
-        "vnf_terminated": WORKFLOW_NSLCM_NO_OP,
-        "policy_updated": WORKFLOW_NSLCM_NO_OP,
-        "terminated": WORKFLOW_NSLCM_NO_OP,
-        "instantiated": WORKFLOW_NSLCM_NO_OP,
-        "scaled": WORKFLOW_NSLCM_NO_OP,
-        "healed": WORKFLOW_NSLCM_NO_OP,
-        "actioned": WORKFLOW_NSLCM_NO_OP,
-        "updated": WORKFLOW_NSLCM_NO_OP,
-        "migrated": WORKFLOW_NSLCM_NO_OP,
-        "verticalscaled": WORKFLOW_NSLCM_NO_OP,
+        "instantiate": NsInstantiateWorkflow,
+        "terminate": NsNoOpWorkflow,
+        "vca_status_refresh": NsNoOpWorkflow,
+        "action": NsNoOpWorkflow,
+        "update": NsNoOpWorkflow,
+        "scale": NsNoOpWorkflow,
+        "heal": NsNoOpWorkflow,
+        "migrate": NsNoOpWorkflow,
+        "verticalscale": NsNoOpWorkflow,
+        "deleted": NsNoOpWorkflow,
+        "vnf_terminated": NsNoOpWorkflow,
+        "policy_updated": NsNoOpWorkflow,
+        "terminated": NsNoOpWorkflow,
+        "instantiated": NsNoOpWorkflow,
+        "scaled": NsNoOpWorkflow,
+        "healed": NsNoOpWorkflow,
+        "actioned": NsNoOpWorkflow,
+        "updated": NsNoOpWorkflow,
+        "migrated": NsNoOpWorkflow,
+        "verticalscaled": NsNoOpWorkflow,
     }
 
     def start_ns_workflow(self, nslcmop: dict) -> None:
+        workflow_object = NbiTemporal.lcm_workflow_mappings[nslcmop["lcmOperationType"]]
         asyncio.run(
             WFTemporal(logger_name="nbi.lcm_workflow").start_workflow(
                 task_queue=LCM_TASK_QUEUE,
-                workflow_name=NbiTemporal.lcm_workflow_mappings[
-                    nslcmop["lcmOperationType"]
-                ],
-                workflow_data=NsLcmOperationInput(nslcmop=nslcmop),
+                workflow_name=workflow_object.__name__,
+                workflow_data=workflow_object.Input(nslcmop=nslcmop),
                 id=nslcmop["nsInstanceId"],
             )
         )
index 8fe6775..ba75e9d 100644 (file)
 
 
 import asynctest
-from osm_common.dataclasses.temporal_dataclasses import (
-    NsLcmOperationInput,
-    VimOperationInput,
-)
-from osm_common.temporal_constants import (
-    LCM_TASK_QUEUE,
-    WORKFLOW_VIM_CREATE,
-    WORKFLOW_NS_INSTANTIATE,
+from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
+from osm_common.temporal.workflows.vim import (
+    VimCreateWorkflow,
 )
+from osm_common.temporal.workflows.ns import NsInstantiateWorkflow
 from osm_nbi.temporal.nbi_temporal import NbiTemporal
 
 
@@ -84,8 +80,8 @@ class TestNbiTemporal(asynctest.TestCase):
         self.temporal.start_vim_workflow(action, self.vim_operation_content)
         mock_start_workflow.assert_called_once_with(
             task_queue=LCM_TASK_QUEUE,
-            workflow_name=WORKFLOW_VIM_CREATE,
-            workflow_data=VimOperationInput(self.vim_id, self.op_id),
+            workflow_name=VimCreateWorkflow.__name__,
+            workflow_data=VimCreateWorkflow.Input(self.vim_id, self.op_id),
             id=self.vim_id,
         )
 
@@ -99,8 +95,8 @@ class TestNbiTemporal(asynctest.TestCase):
         self.temporal.start_ns_workflow(self.nslcmop)
         mock_start_workflow.assert_called_once_with(
             task_queue=LCM_TASK_QUEUE,
-            workflow_name=WORKFLOW_NS_INSTANTIATE,
-            workflow_data=NsLcmOperationInput(nslcmop=self.nslcmop),
+            workflow_name=NsInstantiateWorkflow.__name__,
+            workflow_data=NsInstantiateWorkflow.Input(nslcmop=self.nslcmop),
             id=self.nslcmop["nsInstanceId"],
         )