OSMENG-1155: Implementation of Constants and Dataclasses
Add implementation for workflows and activities
Change-Id: I58226765c41d18821724ac5763a3fe390c371ca6
Signed-off-by: Dario Faccin <dario.faccin@canonical.com>
Signed-off-by: Mark Beierl <mark.beierl@canonical.com>
diff --git a/osm_lcm/temporal/lcm_activities.py b/osm_lcm/temporal/lcm_activities.py
index 6ddb0f3..3400e94 100644
--- a/osm_lcm/temporal/lcm_activities.py
+++ b/osm_lcm/temporal/lcm_activities.py
@@ -13,83 +13,45 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
+
+from temporalio import activity
import time
-from osm_common.dataclasses.temporal_dataclasses import (
- NsLcmOperationInput,
- UpdateLcmOperationStateInput,
+from osm_common.temporal.activities.lcm import (
+ NsLcmNoOp,
+ UpdateNsLcmOperationState,
)
-from osm_common.temporal_constants import (
- ACTIVITY_UPDATE_LCM_OPERATION_STATE,
- ACTIVITY_NSLCM_NO_OP,
-)
-from osm_lcm.data_utils.database.database import Database
-from temporalio import activity
-class NsLcmActivity:
- """
- Handles NS Lifecycle Managment operations.
- Args:
- db (Database): Data Access Object
- """
+class NsLcmNoOpImpl(NsLcmNoOp):
+ @activity.defn(name=NsLcmNoOp.__name__)
+ async def __call__(self, activity_input: NsLcmNoOp.Input) -> None:
+ self.logger.debug(f"Called with: {activity_input.nslcmop}")
- def __init__(self, db: Database):
- self.db: Database = db
- self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
- @activity.defn(name=ACTIVITY_NSLCM_NO_OP)
- async def no_op(self, input: NsLcmOperationInput) -> None:
- """
- This is a simple No Operation Activity that simply logs the data
- with which it was called. It can be used as a placeholder when
- developing workflows, or can be enhanced with logic to throw
- exceptions on specific conditions to test exception handling in
- a workflow.
- """
- self.logger.debug(f"Called with: {input.nslcmop}")
-
- @activity.defn(name=ACTIVITY_UPDATE_LCM_OPERATION_STATE)
- async def update_ns_lcm_operation_state(
- self, data: UpdateLcmOperationStateInput
- ) -> None:
- """
- Changes the state of a LCM operation task. Should be done to
- indicate progress, or completion of the task itself.
-
- Collaborators:
- DB Write: nslcmops
-
- Raises (Retryable):
- DbException If the target DB record does not exist or DB is not reachable.
-
- Activity Lifecycle:
- This activity will not report a heartbeat due to its
- short-running nature.
- As this is a direct DB update, it is not recommended to have
- any specific retry policy
- """
+class UpdateNsLcmOperationStateImpl(UpdateNsLcmOperationState):
+ @activity.defn(name=UpdateNsLcmOperationState.__name__)
+ async def __call__(self, activity_input: UpdateNsLcmOperationState.Input):
now = time.time()
update_lcm_operation = {
"_admin.modified": now,
}
- if data.op_state is not None:
- update_lcm_operation["operationState"] = data.op_state.name
+ if activity_input.op_state is not None:
+ update_lcm_operation["operationState"] = activity_input.op_state.name
update_lcm_operation["statusEnteredTime"] = now
- if data.stage is not None:
- update_lcm_operation["stage"] = data.stage
+ if activity_input.stage is not None:
+ update_lcm_operation["stage"] = activity_input.stage
- if data.error_message is not None:
- update_lcm_operation["errorMessage"] = data.error_message
+ if activity_input.error_message is not None:
+ update_lcm_operation["errorMessage"] = activity_input.error_message
- if data.detailed_status is not None:
- update_lcm_operation["detailedStatus"] = data.detailed_status
+ if activity_input.detailed_status is not None:
+ update_lcm_operation["detailedStatus"] = activity_input.detailed_status
- self.db.set_one("nslcmops", {"_id": data.op_id}, update_lcm_operation)
+ self.db.set_one("nslcmops", {"_id": activity_input.op_id}, update_lcm_operation)
self.logger.debug(
- f"Updated LCM Operation {data.op_id} to {update_lcm_operation}"
+ f"Updated LCM Operation {activity_input.op_id} to {update_lcm_operation}"
)