OSMENG-1155: Implementation of Constants and Dataclasses
Add the concrete implementations of the Temporal workflows and activities whose interfaces and input/output dataclasses are defined in osm_common
Change-Id: I58226765c41d18821724ac5763a3fe390c371ca6
Signed-off-by: Dario Faccin <dario.faccin@canonical.com>
Signed-off-by: Mark Beierl <mark.beierl@canonical.com>
diff --git a/osm_lcm/temporal/ns_activities.py b/osm_lcm/temporal/ns_activities.py
index 8c7c182..f25f78a 100644
--- a/osm_lcm/temporal/ns_activities.py
+++ b/osm_lcm/temporal/ns_activities.py
@@ -14,117 +14,46 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
+from temporalio import activity
from time import time
-from osm_common.dataclasses.temporal_dataclasses import (
- GetNsRecordInput,
- GetNsRecordOutput,
- GetVnfDetailsInput,
- GetVnfDetailsOutput,
- UpdateNsStateInput,
+from osm_common.temporal.activities.ns import (
+ GetNsRecord,
+ GetVnfDetails,
+ UpdateNsState,
)
-from osm_common.temporal_constants import (
- ACTIVITY_GET_NS_RECORD,
- ACTIVITY_GET_VNF_DETAILS,
- ACTIVITY_UPDATE_NS_STATE,
-)
-from osm_lcm.data_utils.database.database import Database
-from temporalio import activity
-class NsOperations:
- def __init__(self, db: Database):
- self.db: Database = db
- self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
-
- @activity.defn(name=ACTIVITY_GET_VNF_DETAILS)
- async def get_vnf_details(
- self, get_vnf_details_input: GetVnfDetailsInput
- ) -> GetVnfDetailsOutput:
- """
- Gets the list of VNF record IDs, VNF member-index-refs for a given NS record ID.
-
- Collaborators:
- DB Read: vnfrs
-
- Raises (Retryable):
- DbException If the target DB record does not exist or DB is not reachable.
-
- Activity Lifecycle:
- This activity will not report a heartbeat due to its short-running nature.
-
- Since this activity only reads from the DB, it is safe to retry, although
- you may wish to have some back-off policy.
- """
- vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": get_vnf_details_input.ns_uuid})
- return GetVnfDetailsOutput(
+class GetVnfDetailsImpl(GetVnfDetails):
+ @activity.defn(name=GetVnfDetails.__name__)
+ async def __call__(
+ self, activity_input: GetVnfDetails.Input
+ ) -> GetVnfDetails.Output:
+ vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": activity_input.ns_uuid})
+ return GetVnfDetails.Output(
vnf_details=[(vnfr["id"], vnfr["member-vnf-index-ref"]) for vnfr in vnfrs]
)
- @activity.defn(name=ACTIVITY_GET_NS_RECORD)
- async def get_ns_record(
- self, get_ns_record_input: GetNsRecordInput
- ) -> GetNsRecordOutput:
- """Gets the NS record from Database.
- Collaborators:
- DB Read: nsrs
-
- Raises (retryable):
- DbException: If DB read operations fail, the collection or DB record ID does not exist.
-
- Activity Lifecycle:
- This activity should complete relatively quickly (less than 10
- second).
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- This is an idempotent activity.
-
- """
- nsr = self.db.get_one("nsrs", {"_id": get_ns_record_input.nsr_uuid})
+class GetNsRecordImpl(GetNsRecord):
+ @activity.defn(name=GetNsRecord.__name__)
+ async def __call__(self, activity_input: GetNsRecord.Input) -> GetNsRecord.Output:
+ nsr = self.db.get_one("nsrs", {"_id": activity_input.nsr_uuid})
self.logger.debug("Got the nsr from Database for VNF operations.")
- return GetNsRecordOutput(nsr=nsr)
+ return GetNsRecord.Output(nsr=nsr)
-class NsDbActivity:
-
- """Perform Database operations for NS accounts.
-
- Args:
- db (database): Data Access Object
- """
-
- def __init__(self, db: Database):
- self.db: Database = db
- self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
-
- @activity.defn(name=ACTIVITY_UPDATE_NS_STATE)
- async def update_ns_state(self, data: UpdateNsStateInput) -> None:
- """
- Changes the state of the NS itself.
-
- Collaborators:
- DB Write: nsrs
-
- Raises (Retryable):
- DbException If the target DB record does not exist or DB is not reachable.
-
- Activity Lifecycle:
- This activity will not report a heartbeat due to its
- short-running nature.
-
- As this is a direct DB update, it is not recommended to have
- any specific retry policy
- """
+class UpdateNsStateImpl(UpdateNsState):
+ @activity.defn(name=UpdateNsState.__name__)
+ async def __call__(self, activity_input: UpdateNsState.Input) -> None:
update_ns_state = {
- "nsState": data.state.name,
+ "nsState": activity_input.state.name,
# "errorDescription" : data.message,
- "_admin.nsState": data.state.name,
- "_admin.detailed-status": data.message,
+ "_admin.nsState": activity_input.state.name,
+ "_admin.detailed-status": activity_input.message,
"_admin.modified": time(),
}
- self.db.set_one("nsrs", {"_id": data.ns_uuid}, update_ns_state)
- self.logger.debug(f"Updated NS {data.ns_uuid} to {data.state.name}")
+ self.db.set_one("nsrs", {"_id": activity_input.ns_uuid}, update_ns_state)
+ self.logger.debug(
+ f"Updated NS {activity_input.ns_uuid} to {activity_input.state.name}"
+ )