OSMENG-1155: Implementation of Constants and Dataclasses
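Add implementations for workflows and activities. Each VNF activity is now a
dedicated `*Impl` class deriving from its base class in `osm_common.temporal.activities.vnf`.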
Change-Id: I58226765c41d18821724ac5763a3fe390c371ca6
Signed-off-by: Dario Faccin <dario.faccin@canonical.com>
Signed-off-by: Mark Beierl <mark.beierl@canonical.com>
diff --git a/osm_lcm/temporal/vnf_activities.py b/osm_lcm/temporal/vnf_activities.py
index 9581db0..156bc59 100644
--- a/osm_lcm/temporal/vnf_activities.py
+++ b/osm_lcm/temporal/vnf_activities.py
@@ -13,158 +13,65 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
+from temporalio import activity
from typing import List, Any
-from osm_common.temporal_constants import (
- ACTIVITY_CHANGE_VNF_STATE,
- ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE,
- ACTIVITY_GET_TASK_QUEUE,
- ACTIVITY_GET_VIM_CLOUD,
- ACTIVITY_GET_VNF_DESCRIPTOR,
- ACTIVITY_GET_VNF_RECORD,
- ACTIVITY_SEND_NOTIFICATION_FOR_VNF,
- ACTIVITY_SET_VNF_MODEL,
+from osm_common.temporal.activities.vnf import (
+ GetTaskQueue,
+ GetVimCloud,
+ GetVnfDescriptor,
+ GetVnfRecord,
+ ChangeVnfState,
+ ChangeVnfInstantiationState,
+ SetVnfModel,
+ SendNotificationForVnf,
+)
+from osm_common.temporal.dataclasses_common import VduComputeConstraints
+from osm_common.temporal_task_queues.task_queues_mappings import (
VIM_TYPE_TASK_QUEUE_MAPPINGS,
)
-from osm_common.dataclasses.temporal_dataclasses import (
- ChangeVnfInstantiationStateInput,
- ChangeVnfStateInput,
- GetTaskQueueInput,
- GetTaskQueueOutput,
- GetVimCloudInput,
- GetVimCloudOutput,
- GetVnfDescriptorInput,
- GetVnfDescriptorOutput,
- GetVnfRecordInput,
- GetVnfRecordOutput,
- SetVnfModelInput,
- VduComputeConstraints,
-)
-from osm_lcm.data_utils.database.database import Database
-from temporalio import activity
-
CONFIG_IDENTIFIER = "config::"
-class VnfOperations:
- def __init__(self, db: Database):
- self.db: Database = db
- self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
-
- @activity.defn(name=ACTIVITY_GET_TASK_QUEUE)
- async def get_task_queue(
- self, get_task_queue_input: GetTaskQueueInput
- ) -> GetTaskQueueOutput:
- """Finds the appropriate task queue according to VIM type of VNF.
-
- Collaborators:
- DB read: vim_accounts, vnfrs
-
- Raises (retryable):
- DbException: If DB read operations fail, the collection or DB record ID does not exist.
-
- Activity Lifecycle:
- This activity should complete relatively quickly (less than a
- second). However, it would be reasonable to wait up to 10
- seconds.
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- It is not necessary to implement a back-off strategy for this
- activity, the operation is idempotent.
-
- """
- vnfr = self.db.get_one("vnfrs", {"_id": get_task_queue_input.vnfr_uuid})
+class GetTaskQueueImpl(GetTaskQueue):
+ @activity.defn(name=GetTaskQueue.__name__)
+ async def __call__(self, activity_input: GetTaskQueue.Input) -> GetTaskQueue.Output:
+ vnfr = self.db.get_one("vnfrs", {"_id": activity_input.vnfr_uuid})
vim_record = self.db.get_one("vim_accounts", {"_id": vnfr["vim-account-id"]})
task_queue = VIM_TYPE_TASK_QUEUE_MAPPINGS[vim_record["vim_type"]]
self.logger.debug(f"Got the task queue {task_queue} for VNF operations.")
- return GetTaskQueueOutput(task_queue)
+ return GetTaskQueue.Output(task_queue)
- @activity.defn(name=ACTIVITY_GET_VIM_CLOUD)
- async def get_vim_cloud(
- self, get_vim_cloud_input: GetVimCloudInput
- ) -> GetVimCloudOutput:
- """Finds the cloud by checking the VIM account of VNF.
- Collaborators:
- DB Read: vnfrs, vim_accounts
-
- Raises (retryable):
- DbException: If DB read operations fail, the collection or DB record ID does not exist.
-
- Activity Lifecycle:
- This activity should complete relatively quickly (less than a
- second). However, it would be reasonable to wait up to 10
- seconds.
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- It is not necessary to implement a back-off strategy for this
- activity, the operation is idempotent.
-
- """
- vnfr = self.db.get_one("vnfrs", {"_id": get_vim_cloud_input.vnfr_uuid})
+class GetVimCloudImpl(GetVimCloud):
+ @activity.defn(name=GetVimCloud.__name__)
+ async def __call__(self, activity_input: GetVimCloud.Input) -> GetVimCloud.Output:
+ vnfr = self.db.get_one("vnfrs", {"_id": activity_input.vnfr_uuid})
vim_record = self.db.get_one("vim_accounts", {"_id": vnfr["vim-account-id"]})
cloud = vim_record["config"].get("cloud", "")
self.logger.debug(f"Got the cloud type {cloud} for VNF operations.")
- return GetVimCloudOutput(cloud=cloud)
+ return GetVimCloud.Output(cloud=cloud)
- @activity.defn(name=ACTIVITY_GET_VNF_RECORD)
- async def get_vnf_record(
- self, get_vnf_record_input: GetVnfRecordInput
- ) -> GetVnfRecordOutput:
- """Gets the VNF record and VNF descriptor from Database.
- Collaborators:
- DB read: vnfrs
-
- Raises (retryable):
- DbException: If DB read operations fail, the collection or DB record ID does not exist.
-
- Activity Lifecycle:
- This activity should complete relatively quickly (less than 10
- second).
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- This is an idempotent activity.
-
- """
- vnfr = self.db.get_one("vnfrs", {"_id": get_vnf_record_input.vnfr_uuid})
+class GetVnfRecordImpl(GetVnfRecord):
+ @activity.defn(name=GetVnfRecord.__name__)
+ async def __call__(self, activity_input: GetVnfRecord.Input) -> GetVnfRecord.Output:
+ vnfr = self.db.get_one("vnfrs", {"_id": activity_input.vnfr_uuid})
self.logger.debug("Got the vnfr from Database for VNF operations.")
- return GetVnfRecordOutput(vnfr=vnfr)
+ return GetVnfRecord.Output(vnfr=vnfr)
- @activity.defn(name=ACTIVITY_GET_VNF_DESCRIPTOR)
- async def get_vnf_descriptor(
- self, get_vnf_descriptor_input: GetVnfDescriptorInput
- ) -> GetVnfDescriptorOutput:
- """Gets the VNF record and VNF descriptor from Database.
- Collaborators:
- DB read: vnfds
+class GetVnfDescriptorImpl(GetVnfDescriptor):
+ @activity.defn(name=GetVnfDescriptor.__name__)
+ async def __call__(
+ self, activity_input: GetVnfDescriptor.Input
+ ) -> GetVnfDescriptor.Output:
+ vnfd = self.db.get_one("vnfds", {"_id": activity_input.vnfd_uuid})
+ return GetVnfDescriptor.Output(vnfd=vnfd)
- Raises (retryable):
- DbException: If DB read operations fail, the collection or DB record ID does not exist.
- Activity Lifecycle:
- This activity should complete relatively quickly (less than 10
- second).
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- This is an idempotent activity.
-
- """
- vnfd = self.db.get_one("vnfds", {"_id": get_vnf_descriptor_input.vnfd_uuid})
- self.logger.debug("Got the vnfr and vnfd from Database for VNF operations.")
- return GetVnfDescriptorOutput(vnfd=vnfd)
-
+class VnfSpecifications:
@staticmethod
def get_vdu_instantiation_params(
vdu_id: str, vnf_instantiation_config: dict
@@ -179,7 +86,7 @@
compute_desc_id = vdu.get("virtual-compute-desc")
if not compute_desc_id:
return VduComputeConstraints(cores=0, mem=0)
- flavor_details = VnfOperations._get_flavor_details(compute_desc_id, vnfd)
+ flavor_details = VnfSpecifications._get_flavor_details(compute_desc_id, vnfd)
if not flavor_details:
return VduComputeConstraints(cores=0, mem=0)
@@ -198,11 +105,11 @@
def get_application_config(vdu: dict, vdu_instantiation_config: dict) -> dict:
configurable_properties = vdu.get("configurable-properties", [])
- config_from_descriptor = VnfOperations._get_only_config_items(
- VnfOperations._list_to_dict(configurable_properties)
+ config_from_descriptor = VnfSpecifications._get_only_config_items(
+ VnfSpecifications._list_to_dict(configurable_properties)
)
- config_from_instantiation = VnfOperations._get_only_config_items(
+ config_from_instantiation = VnfSpecifications._get_only_config_items(
vdu_instantiation_config
)
return {**config_from_descriptor, **config_from_instantiation}
@@ -224,125 +131,43 @@
}
-class VnfDbActivity:
- """Perform Database operations for VNF accounts.
-
- Args:
- db (Database): Data Access Object
- """
-
- def __init__(self, db: Database):
- self.db: Database = db
- self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
-
- @activity.defn(name=ACTIVITY_CHANGE_VNF_STATE)
- async def change_vnf_state(self, vnf_state_input: ChangeVnfStateInput) -> None:
- """Updates the VNF State in VNFR.
-
- Collaborators:
- DB Write: vnfrs
-
- Raises (retryable):
- DbException: If DB access/update fails, the collection or DB record ID does not exist.
-
- Activity Lifecycle:
- This activity should complete relatively quickly (less than a
- second). However, it would be reasonable to wait up to 10
- seconds.
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- It is not necessary to implement a back-off strategy for this
- activity, the operation is idempotent.
-
- """
- update_vnf_state = {"vnfState": vnf_state_input.state.name}
- self.db.set_one("vnfrs", {"_id": vnf_state_input.vnfr_uuid}, update_vnf_state)
+class ChangeVnfStateImpl(ChangeVnfState):
+ @activity.defn(name=ChangeVnfState.__name__)
+ async def __call__(self, activity_input: ChangeVnfState.Input) -> None:
+ update_vnf_state = {"vnfState": activity_input.state.name}
+ self.db.set_one("vnfrs", {"_id": activity_input.vnfr_uuid}, update_vnf_state)
self.logger.debug(
- f"VNF {vnf_state_input.vnfr_uuid} state is updated to {vnf_state_input.state.name}."
+ f"VNF {activity_input.vnfr_uuid} state is updated to {activity_input.state.name}."
)
- @activity.defn(name=ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE)
- async def change_vnf_instantiation_state(
- self, vnf_instantiation_state_input: ChangeVnfInstantiationStateInput
- ) -> None:
- """Updates the VNF Instantiation State in VNFR.
- Collaborators:
- DB Write: vnfrs
-
- Raises (retryable):
- DbException: If DB access or update fails, the collection or DB record ID does not exist.
-
- Activity Lifecycle:
- This activity should complete relatively quickly (less than a
- second). However, it would be reasonable to wait up to 10
- seconds.
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- It is not necessary to implement a back-off strategy for this
- activity, the operation is idempotent.
-
- """
+class ChangeVnfInstantiationStateImpl(ChangeVnfInstantiationState):
+ @activity.defn(name=ChangeVnfInstantiationState.__name__)
+ async def __call__(self, activity_input: ChangeVnfInstantiationState.Input) -> None:
update_vnf_instantiation_state = {
- "instantiationState": vnf_instantiation_state_input.state.name
+ "instantiationState": activity_input.state.name
}
self.db.set_one(
"vnfrs",
- {"_id": vnf_instantiation_state_input.vnfr_uuid},
+ {"_id": activity_input.vnfr_uuid},
update_vnf_instantiation_state,
)
self.logger.debug(
- f"VNF {vnf_instantiation_state_input.vnfr_uuid} state is updated to {vnf_instantiation_state_input.state.name}."
+ f"VNF {activity_input.vnfr_uuid} state is updated to {activity_input.state.name}."
)
- @activity.defn(name=ACTIVITY_SET_VNF_MODEL)
- async def set_vnf_model(self, set_vnf_model_input: SetVnfModelInput) -> None:
- """Updates the model name of VNF in VNFR.
- Collaborators:
- DB Write: vnfrs
-
- Raises (retryable):
- DbException: If DB access or update fails, the collection or DB record ID does not exist.
-
- Activity Lifecycle:
- This activity should complete relatively quickly (less than a
- second). However, it would be reasonable to wait up to 10
- seconds.
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- It is not necessary to implement a back-off strategy for this
- activity, the operation is idempotent.
-
- """
- update_namespace = {"namespace": set_vnf_model_input.model_name}
- self.db.set_one(
- "vnfrs", {"_id": set_vnf_model_input.vnfr_uuid}, update_namespace
- )
+class SetVnfModelImpl(SetVnfModel):
+ @activity.defn(name=SetVnfModel.__name__)
+ async def __call__(self, activity_input: SetVnfModel.Input) -> None:
+ update_namespace = {"namespace": activity_input.model_name}
+ self.db.set_one("vnfrs", {"_id": activity_input.vnfr_uuid}, update_namespace)
self.logger.debug(
- f"VNF {set_vnf_model_input.vnfr_uuid} model name is updated to {set_vnf_model_input.model_name}."
+ f"VNF {activity_input.vnfr_uuid} model name is updated to {activity_input.model_name}."
)
-class VnfSendNotifications:
- """Perform Notification operations."""
-
- def __init__(self):
- self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
-
- @activity.defn(name=ACTIVITY_SEND_NOTIFICATION_FOR_VNF)
- async def send_notification_for_vnf(
- self, input: ChangeVnfInstantiationStateInput
- ) -> None:
- """If VNF LCM operation state changes, send notification updates.
-
- This activity does nothing.
-
- """
+class SendNotificationForVnfImpl(SendNotificationForVnf):
+ @activity.defn(name=SendNotificationForVnf.__name__)
+ async def __call__(self, activity_input: SendNotificationForVnf.Input) -> None:
self.logger.debug("Send notification for VNF not implemented.")