Fixing workflow passthrough
[osm/common.git] / osm_common / temporal / workflows / lcm.py
1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17
18 from abc import abstractmethod
19 from dataclasses import dataclass
20 from datetime import timedelta
21
22 from osm_common.temporal.activities.lcm import UpdateNsLcmOperationState
23 from osm_common.temporal.states import LcmOperationState
24 from osm_common.temporal.workflows.base import BaseWorkflow
25 from temporalio import workflow
26 from temporalio.common import RetryPolicy
27 from temporalio.exceptions import ActivityError, ChildWorkflowError
28
29
class LcmOperationWorkflow(BaseWorkflow):
    """
    An abstract base class representing a Lifecycle Management Operation. Any
    workflows that need LCM OP control should extend this class and implement
    the workflow method.

    Methods
    -------

    @abstractmethod run(self, workflow_input: Input)
        Method for subclasses to implement the actual workflow that is being
        wrapped in this operation.

    @workflow.run wrap_nslcmop(workflow_input: Input)
        Must be implemented in every subclass exactly as follows:
        @workflow.run
        async def wrap_nslcmop(self, workflow_input: Input) -> None:
            await super().wrap_nslcmop(workflow_input=workflow_input)
    """

    # Policies/timeouts shared by the activities scheduled from this class.
    retry_policy = RetryPolicy(maximum_attempts=3)
    no_retry_policy = RetryPolicy(maximum_attempts=1)
    default_schedule_to_close_timeout = timedelta(minutes=10)

    @dataclass
    class Input:
        """
        Input dataclass for workflows that run as LCM operations.

        Attributes:
        -----------

        nslcmop: dict
            A dictionary representing the nslcmop record from the
            database.
        """

        nslcmop: dict

    def __init__(self):
        super().__init__()
        # Set from the nslcmop record when wrap_nslcmop starts.
        self.op_id = None
        # Last stage reported through update_operation_state.
        self.stage = ""

    async def wrap_nslcmop(self, workflow_input: Input):
        """
        Run the wrapped workflow and track its outcome in the operation state.

        The operation is marked PROCESSING before ``run`` is awaited, then
        COMPLETED when ``run`` returns, or FAILED when it raises. Any
        exception raised by ``run`` is re-raised after the FAILED state has
        been reported.

        Raises
        ------
        KeyError
            If the nslcmop record has no ``"_id"`` entry.
        Exception
            Whatever ``run`` raised, unchanged.
        """
        self.op_id = workflow_input.nslcmop["_id"]
        await self.update_operation_state(LcmOperationState.PROCESSING)
        try:
            await self.run(workflow_input=workflow_input)

        except (ActivityError, ChildWorkflowError) as e:
            # These errors wrap the underlying failure in ``cause``. It may be
            # missing, so fall back to the wrapper itself rather than raising
            # AttributeError here and never reporting the FAILED state.
            cause = e.cause if e.cause is not None else e
            err_details = str(cause)
            self.logger.error(err_details)
            await self.update_operation_state(
                LcmOperationState.FAILED,
                error_message=str(getattr(cause, "message", cause)),
                detailed_status=err_details,
            )
            raise

        except Exception as e:
            self.logger.exception(e)
            await self.update_operation_state(
                LcmOperationState.FAILED,
                error_message=str(e),
                detailed_status=str(e),
            )
            raise

        await self.update_operation_state(LcmOperationState.COMPLETED)

    async def update_operation_state(
        self,
        op_state: LcmOperationState,
        stage: str = None,
        error_message: str = "",
        detailed_status: str = "",
    ) -> None:
        """
        Report the operation state by executing UpdateNsLcmOperationState.

        Parameters
        ----------
        op_state: LcmOperationState
            State to report for the operation identified by ``self.op_id``.
        stage: str, optional
            New stage to remember and report. When ``None`` the previously
            stored ``self.stage`` is reported again.
        error_message: str
            Error summary passed through to the activity input.
        detailed_status: str
            Detailed status text passed through to the activity input.
        """
        if stage is not None:
            self.stage = stage
        update_input = UpdateNsLcmOperationState.Input(
            op_id=self.op_id,
            op_state=op_state,
            stage=self.stage,
            error_message=error_message,
            detailed_status=detailed_status,
        )
        # The activity is addressed by name, not by a callable reference.
        await workflow.execute_activity(
            activity=UpdateNsLcmOperationState.__name__,
            arg=update_input,
            activity_id=f"{UpdateNsLcmOperationState.__name__}-{self.op_id}",
            schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
            retry_policy=LcmOperationWorkflow.retry_policy,
        )

    @abstractmethod
    async def run(self, workflow_input: Input):
        """Actual workflow logic; to be implemented by subclasses."""
        pass
138
139
class NsNoOpWorkflow(LcmOperationWorkflow):
    """
    A simple No Operation workflow, described as calling a No Operation
    activity; it can serve as a placeholder when developing workflows.

    NOTE(review): ``run`` is abstract in this declaration, so the activity
    call is presumably supplied by an implementing subclass - confirm.
    """

    @dataclass
    class Input:
        """
        Payload accepted by the No Operation workflow.

        Attributes:
        -----------

        nslcmop: dict
            The nslcmop record from the database, represented as a
            dictionary.
        """

        nslcmop: dict

    @abstractmethod
    async def run(self, workflow_input: Input) -> None:
        """Workflow body; this declaration performs no work and returns None."""