1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
18 from abc
import abstractmethod
19 from dataclasses
import dataclass
20 from datetime
import timedelta
22 from osm_common
.temporal
.activities
.lcm
import UpdateNsLcmOperationState
23 from osm_common
.temporal
.states
import LcmOperationState
24 from osm_common
.temporal
.workflows
.base
import BaseWorkflow
25 from temporalio
import workflow
26 from temporalio
.common
import RetryPolicy
27 from temporalio
.exceptions
import ActivityError
, ChildWorkflowError
30 class LcmOperationWorkflow(BaseWorkflow
):
32 An abstract base class representing a Lifecycle Management Operation. Any
33 workflows that need LCM OP control should extend this class and implement
39 @abstractmethod run(self, workflow_input: Input)
40 Method for subclasses to implement the actual workflow that is being
41 wrapped in this operation.
43 @workflow.run wrap_nslcmop(workflow_input: Input)
44 Must be implemented in every subclass exactly as follows:
46 async def wrap_nslcmop(self, workflow_input: Input) -> None:
47 await super().wrap_nslcmop(workflow_input=workflow_input)
50 retry_policy
= RetryPolicy(maximum_attempts
=3)
51 no_retry_policy
= RetryPolicy(maximum_attempts
=1)
52 default_schedule_to_close_timeout
= timedelta(minutes
=10)
57 Input dataclass for workflows that run as LCM operations.
63 A dictionary representing the nslcmop record from the
74 async def wrap_nslcmop(self
, workflow_input
: Input
):
75 self
.op_id
= workflow_input
.nslcmop
["_id"]
76 await self
.update_operation_state(LcmOperationState
.PROCESSING
)
78 await self
.run(workflow_input
=workflow_input
)
80 except ActivityError
as e
:
81 err_details
= str(e
.cause
.with_traceback(e
.__traceback
__))
82 self
.logger
.error(err_details
)
83 await self
.update_operation_state(
84 LcmOperationState
.FAILED
,
85 error_message
=str(e
.cause
.message
),
86 detailed_status
=err_details
,
90 except ChildWorkflowError
as e
:
91 err_details
= str(e
.cause
.with_traceback(e
.cause
.__traceback
__))
92 self
.logger
.error(err_details
)
93 await self
.update_operation_state(
94 LcmOperationState
.FAILED
,
95 error_message
=str(e
.cause
.message
),
96 detailed_status
=err_details
,
100 except Exception as e
:
101 self
.logger
.exception(e
)
102 await self
.update_operation_state(
103 LcmOperationState
.FAILED
,
104 error_message
=str(e
),
105 detailed_status
=str(e
),
109 await self
.update_operation_state(LcmOperationState
.COMPLETED
)
111 async def update_operation_state(
113 op_state
: LcmOperationState
,
115 error_message
: str = "",
116 detailed_status
: str = "",
118 if stage
is not None:
120 update_input
= UpdateNsLcmOperationState
.Input(
124 error_message
=error_message
,
125 detailed_status
=detailed_status
,
127 await workflow
.execute_activity(
128 activity
=UpdateNsLcmOperationState
.__name
__,
130 activity_id
=f
"{UpdateNsLcmOperationState.__name__}-{self.op_id}",
131 schedule_to_close_timeout
=LcmOperationWorkflow
.default_schedule_to_close_timeout
,
132 retry_policy
=LcmOperationWorkflow
.retry_policy
,
136 async def run(self
, workflow_input
: Input
):
140 class NsNoOpWorkflow(LcmOperationWorkflow
):
142 This is a simple No Operation workflow that simply calls a No Operation
143 activity. It can be used as a placeholder when developing workflows.
149 Input dataclass for workflows that run as LCM operations.
155 A dictionary representing the nslcmop record from the
162 async def run(self
, workflow_input
: Input
) -> None: