OSMENG-1155: Implementation of Constants and Dataclasses
Add implementation for workflows and activities
Change-Id: I58226765c41d18821724ac5763a3fe390c371ca6
Signed-off-by: Dario Faccin <dario.faccin@canonical.com>
Signed-off-by: Mark Beierl <mark.beierl@canonical.com>
diff --git a/osm_lcm/temporal/juju_paas_activities.py b/osm_lcm/temporal/juju_paas_activities.py
index 5082254..2915ee9 100644
--- a/osm_lcm/temporal/juju_paas_activities.py
+++ b/osm_lcm/temporal/juju_paas_activities.py
@@ -21,19 +21,15 @@
from juju.application import Application
from juju.controller import Controller
from n2vc.config import EnvironConfig
-from osm_common.dataclasses.temporal_dataclasses import (
- CharmInfo,
- CheckCharmStatusInput,
- ModelInfo,
- TestVimConnectivityInput,
- VduComputeConstraints,
- VduInstantiateInput,
+from osm_common.temporal.activities.paas import (
+ TestVimConnectivity,
+ CheckCharmStatus,
+ CreateModel,
+ DeployCharm,
)
-from osm_common.temporal_constants import (
- ACTIVITY_TEST_VIM_CONNECTIVITY,
- ACTIVITY_CHECK_CHARM_STATUS,
- ACTIVITY_CREATE_MODEL,
- ACTIVITY_DEPLOY_CHARM,
+from osm_common.temporal.dataclasses_common import (
+ CharmInfo,
+ VduComputeConstraints,
)
from osm_lcm.data_utils.database.database import Database
from temporalio import activity
@@ -123,187 +119,6 @@
application_constraints["cores"] = constraints.cores
return application_constraints
- @activity.defn(name=ACTIVITY_TEST_VIM_CONNECTIVITY)
- async def test_vim_connectivity(
- self, test_connectivity_input: TestVimConnectivityInput
- ) -> None:
- """Validates the credentials by attempting to connect to the given Juju Controller.
-
- Collaborators:
- DB Read: vim_accounts
- Juju Controller: Connect only
-
- Raises (Retryable):
- ApplicationError If any of password, cacert, cloud_credentials is invalid
- or Juju controller is not reachable
-
- Activity Lifecycle:
- This activity should complete relatively quickly (in a few seconds).
- However, it would be reasonable to wait more than 72 seconds (network timeout)
- incase there are network issues.
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- It is recommended, although not necessary to implement a
- back-off strategy for this activity, as it will naturally block
- and wait on each connection attempt.
- """
- vim_id = test_connectivity_input.vim_uuid
- await self._get_controller(vim_id)
- message = f"Connection to juju controller succeeded for {vim_id}"
- self.logger.info(message)
-
- @activity.defn(name=ACTIVITY_CREATE_MODEL)
- async def create_model(self, create_model_input: ModelInfo) -> None:
- """Connects to Juju Controller. Creates a new model.
-
- Collaborators:
- DB Read: vim_accounts
- Juju Controller: Connect and create model.
-
- Raises (Retryable):
- ApplicationError If Juju controller is not reachable.
- If the model already exists.
-
- Activity Lifecycle:
- This activity should complete relatively quickly (in a few seconds).
- However, it would be reasonable to wait more than 72 seconds (network timeout)
- incase there are network issues.
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- It is recommended, although not necessary to implement a
- back-off strategy for this activity, as it will naturally block
- and wait on each connection attempt.
- """
- controller = await self._get_controller(create_model_input.vim_uuid)
- if create_model_input.model_name in await controller.list_models():
- self.logger.debug(f"Model {create_model_input.model_name} already created")
- return
-
- vim_content = self.db.get_one(
- "vim_accounts", {"_id": create_model_input.vim_uuid}
- )
- vim_config = vim_content["config"]
-
- config = {
- "endpoints": ",".join(await controller.api_endpoints),
- "user": vim_content["vim_user"],
- "secret": self._decrypt_password(vim_content),
- "cacert": base64.b64encode(
- vim_config["ca_cert_content"].encode("utf-8")
- ).decode("utf-8"),
- "authorized-keys": vim_config["authorized_keys"],
- }
-
- self.logger.debug(f"Creating model {create_model_input.model_name}")
- await controller.add_model(
- create_model_input.model_name,
- config=config,
- cloud_name=vim_config["cloud"],
- credential_name=vim_config["cloud_credentials"],
- )
- self.logger.debug(f"Model {create_model_input.model_name} created")
-
- @activity.defn(name=ACTIVITY_DEPLOY_CHARM)
- async def deploy_charm(self, deploy_charm_input: VduInstantiateInput) -> None:
- """Deploys a charm.
-
- Collaborators:
- DB Read: vim_accounts
- Juju Controller: Connect and deploy charm
-
- Raises (Retryable):
- ApplicationError If Juju controller is not reachable
- If application already exists
-
- Activity Lifecycle:
- This activity should complete relatively quickly (in a few seconds).
- However, it would be reasonable to wait more than 72 seconds (network timeout)
- incase there are network issues.
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- It is recommended, although not necessary to implement a
- back-off strategy for this activity, as it will naturally block
- and wait on each connection attempt.
- """
- model_name = deploy_charm_input.model_name
- charm_info = deploy_charm_input.charm_info
- application_name = charm_info.app_name
- constraints = JujuPaasConnector._get_application_constraints(
- deploy_charm_input.constraints, deploy_charm_input.cloud
- )
- controller = await self._get_controller(deploy_charm_input.vim_uuid)
- model = await controller.get_model(model_name)
- if application_name in model.applications:
- raise Exception("Application {} already exists".format(application_name))
- await model.deploy(
- entity_url=charm_info.entity_url,
- application_name=application_name,
- channel=charm_info.channel,
- constraints=constraints if constraints else None,
- config=deploy_charm_input.config,
- )
-
- @activity.defn(name=ACTIVITY_CHECK_CHARM_STATUS)
- async def check_charm_status(
- self, check_charm_status: CheckCharmStatusInput
- ) -> None:
- """Checks the ready status of the charm. This activity will block until the status of
- the application is either "active" or "blocked". Additionally, it also blocks until
- the workload status of each of its units is also either "active" or "blocked".
-
- Collaborators:
- DB Read: vim_accounts
- Juju Controller: Connect to controller and check charm status.
-
- Raises (Retryable):
- ApplicationError If any of password, cacert, cloud_credentials is invalid
- or Juju controller is not reachable
-
- Activity Lifecycle:
- This activity will continue indefinitely until the specified charm deployment
- has reached a ready state. Heartbeats are performed to ensure this activity
- does not timeout.
-
- A start-to-close of something reasonable (such as 5 minutes) should be implemented
- at the workflow level and such a timeout shall trigger workflow failure logic.
- """
-
- controller = await self._get_controller(check_charm_status.vim_uuid)
- model = await controller.get_model(check_charm_status.model_name)
- application = model.applications[check_charm_status.application_name]
-
- ready = False
- last_status = None
- application_status = None
- last_unit_status = {}
-
- while not ready:
- activity.heartbeat()
- await asyncio.sleep(check_charm_status.poll_interval)
- # Perform the fetch of the status only once and keep it locally
- application_status = application.status
- if application_status != last_status:
- last_status = application_status
- self.logger.debug(
- f"Application `{check_charm_status.application_name}` is {application_status}"
- )
-
- if application_status in ["active", "blocked"]:
- # Check each unit to see if they are also ready
- if not self._check_units_ready(
- application=application, last_unit_status=last_unit_status
- ):
- continue
- else:
- continue
- ready = True
-
def _check_units_ready(
self, application: Application, last_unit_status: dict
) -> bool:
@@ -319,6 +134,102 @@
return True
+class TestVimConnectivityImpl(TestVimConnectivity):
+ @activity.defn(name=TestVimConnectivity.__name__)
+ async def __call__(self, activity_input: TestVimConnectivity.Input) -> None:
+ vim_id = activity_input.vim_uuid
+ await self.juju_controller._get_controller(vim_id)
+ message = f"Connection to juju controller succeeded for {vim_id}"
+ self.logger.info(message)
+
+
+class CreateModelImpl(CreateModel):
+ @activity.defn(name=CreateModel.__name__)
+ async def __call__(self, activity_input: CreateModel.Input) -> None:
+ controller = await self.juju_controller._get_controller(activity_input.vim_uuid)
+ if activity_input.model_name in await controller.list_models():
+ self.logger.debug(f"Model {activity_input.model_name} already created")
+ return
+
+ vim_content = self.db.get_one("vim_accounts", {"_id": activity_input.vim_uuid})
+ vim_config = vim_content["config"]
+
+ config = {
+ "endpoints": ",".join(await controller.api_endpoints),
+ "user": vim_content["vim_user"],
+ "secret": self.juju_controller._decrypt_password(vim_content),
+ "cacert": base64.b64encode(
+ vim_config["ca_cert_content"].encode("utf-8")
+ ).decode("utf-8"),
+ "authorized-keys": vim_config["authorized_keys"],
+ }
+
+ self.logger.debug(f"Creating model {activity_input.model_name}")
+ await controller.add_model(
+ activity_input.model_name,
+ config=config,
+ cloud_name=vim_config["cloud"],
+ credential_name=vim_config["cloud_credentials"],
+ )
+ self.logger.debug(f"Model {activity_input.model_name} created")
+
+
+class DeployCharmImpl(DeployCharm):
+ @activity.defn(name=DeployCharm.__name__)
+ async def __call__(self, activity_input: DeployCharm.Input) -> None:
+ model_name = activity_input.model_name
+ charm_info = activity_input.charm_info
+ application_name = charm_info.app_name
+ constraints = self.juju_controller._get_application_constraints(
+ activity_input.constraints, activity_input.cloud
+ )
+ controller = await self.juju_controller._get_controller(activity_input.vim_uuid)
+ model = await controller.get_model(model_name)
+ if application_name in model.applications:
+ raise Exception("Application {} already exists".format(application_name))
+ await model.deploy(
+ entity_url=charm_info.entity_url,
+ application_name=application_name,
+ channel=charm_info.channel,
+ constraints=constraints if constraints else None,
+ config=activity_input.config,
+ )
+
+
+class CheckCharmStatusImpl(CheckCharmStatus):
+ @activity.defn(name=CheckCharmStatus.__name__)
+ async def __call__(self, activity_input: CheckCharmStatus.Input) -> None:
+ controller = await self.juju_controller._get_controller(activity_input.vim_uuid)
+ model = await controller.get_model(activity_input.model_name)
+ application = model.applications[activity_input.application_name]
+
+ ready = False
+ last_status = None
+ application_status = None
+ last_unit_status = {}
+
+ while not ready:
+ activity.heartbeat()
+ await asyncio.sleep(activity_input.poll_interval)
+ # Perform the fetch of the status only once and keep it locally
+ application_status = application.status
+ if application_status != last_status:
+ last_status = application_status
+ self.logger.debug(
+ f"Application `{activity_input.application_name}` is {application_status}"
+ )
+
+ if application_status in ["active", "blocked"]:
+ # Check each unit to see if they are also ready
+ if not self.juju_controller._check_units_ready(
+ application=application, last_unit_status=last_unit_status
+ ):
+ continue
+ else:
+ continue
+ ready = True
+
+
class CharmInfoUtils:
@staticmethod
def get_charm_info(vdu: dict, sw_image_descs: list) -> CharmInfo:
diff --git a/osm_lcm/temporal/lcm_activities.py b/osm_lcm/temporal/lcm_activities.py
index 6ddb0f3..3400e94 100644
--- a/osm_lcm/temporal/lcm_activities.py
+++ b/osm_lcm/temporal/lcm_activities.py
@@ -13,83 +13,45 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
+
+from temporalio import activity
import time
-from osm_common.dataclasses.temporal_dataclasses import (
- NsLcmOperationInput,
- UpdateLcmOperationStateInput,
+from osm_common.temporal.activities.lcm import (
+ NsLcmNoOp,
+ UpdateNsLcmOperationState,
)
-from osm_common.temporal_constants import (
- ACTIVITY_UPDATE_LCM_OPERATION_STATE,
- ACTIVITY_NSLCM_NO_OP,
-)
-from osm_lcm.data_utils.database.database import Database
-from temporalio import activity
-class NsLcmActivity:
- """
- Handles NS Lifecycle Managment operations.
- Args:
- db (Database): Data Access Object
- """
+class NsLcmNoOpImpl(NsLcmNoOp):
+ @activity.defn(name=NsLcmNoOp.__name__)
+ async def __call__(self, activity_input: NsLcmNoOp.Input) -> None:
+ self.logger.debug(f"Called with: {activity_input.nslcmop}")
- def __init__(self, db: Database):
- self.db: Database = db
- self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
- @activity.defn(name=ACTIVITY_NSLCM_NO_OP)
- async def no_op(self, input: NsLcmOperationInput) -> None:
- """
- This is a simple No Operation Activity that simply logs the data
- with which it was called. It can be used as a placeholder when
- developing workflows, or can be enhanced with logic to throw
- exceptions on specific conditions to test exception handling in
- a workflow.
- """
- self.logger.debug(f"Called with: {input.nslcmop}")
-
- @activity.defn(name=ACTIVITY_UPDATE_LCM_OPERATION_STATE)
- async def update_ns_lcm_operation_state(
- self, data: UpdateLcmOperationStateInput
- ) -> None:
- """
- Changes the state of a LCM operation task. Should be done to
- indicate progress, or completion of the task itself.
-
- Collaborators:
- DB Write: nslcmops
-
- Raises (Retryable):
- DbException If the target DB record does not exist or DB is not reachable.
-
- Activity Lifecycle:
- This activity will not report a heartbeat due to its
- short-running nature.
- As this is a direct DB update, it is not recommended to have
- any specific retry policy
- """
+class UpdateNsLcmOperationStateImpl(UpdateNsLcmOperationState):
+ @activity.defn(name=UpdateNsLcmOperationState.__name__)
+ async def __call__(self, activity_input: UpdateNsLcmOperationState.Input):
now = time.time()
update_lcm_operation = {
"_admin.modified": now,
}
- if data.op_state is not None:
- update_lcm_operation["operationState"] = data.op_state.name
+ if activity_input.op_state is not None:
+ update_lcm_operation["operationState"] = activity_input.op_state.name
update_lcm_operation["statusEnteredTime"] = now
- if data.stage is not None:
- update_lcm_operation["stage"] = data.stage
+ if activity_input.stage is not None:
+ update_lcm_operation["stage"] = activity_input.stage
- if data.error_message is not None:
- update_lcm_operation["errorMessage"] = data.error_message
+ if activity_input.error_message is not None:
+ update_lcm_operation["errorMessage"] = activity_input.error_message
- if data.detailed_status is not None:
- update_lcm_operation["detailedStatus"] = data.detailed_status
+ if activity_input.detailed_status is not None:
+ update_lcm_operation["detailedStatus"] = activity_input.detailed_status
- self.db.set_one("nslcmops", {"_id": data.op_id}, update_lcm_operation)
+ self.db.set_one("nslcmops", {"_id": activity_input.op_id}, update_lcm_operation)
self.logger.debug(
- f"Updated LCM Operation {data.op_id} to {update_lcm_operation}"
+ f"Updated LCM Operation {activity_input.op_id} to {update_lcm_operation}"
)
diff --git a/osm_lcm/temporal/lcm_workflows.py b/osm_lcm/temporal/lcm_workflows.py
index 6c8c28f..aaa9870 100644
--- a/osm_lcm/temporal/lcm_workflows.py
+++ b/osm_lcm/temporal/lcm_workflows.py
@@ -14,139 +14,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
-from abc import ABC, abstractmethod
-from datetime import timedelta
-
-from osm_common.dataclasses.temporal_dataclasses import (
- LcmOperationState,
- NsLcmOperationInput,
- UpdateLcmOperationStateInput,
-)
-from osm_common.temporal_constants import (
- ACTIVITY_NSLCM_NO_OP,
- ACTIVITY_UPDATE_LCM_OPERATION_STATE,
- WORKFLOW_NSLCM_NO_OP,
-)
from temporalio import workflow
-from temporalio.common import RetryPolicy
-from temporalio.exceptions import ActivityError, ChildWorkflowError
+
+from osm_common.temporal.workflows.lcm import NsNoOpWorkflow
+from osm_common.temporal.activities.lcm import NsLcmNoOp
-class LcmOperationWorkflow(ABC):
- """
- An abstract base class representing a Lifecycle Management Operation. Any
- workflows that need LCM OP control should extend this class and implement
- the workflow method.
-
- Methods
- -------
-
- @abstractmethod workflow(input: NsLcmOperationInput)
- Method for subclasses to implement the actual workflow that is being
- wrapped in this operation.
-
- @workflow.run wrap_nslcmop(input: NsLcmOperationInput)
- Must be implemented in every subclass exactly as follows:
- @workflow.run
- async def wrap_nslcmop(self, input: NsLcmOperationInput) -> None:
- await super().wrap_nslcmop(input=input)
- """
-
- _SANDBOXED = False
- retry_policy = RetryPolicy(maximum_attempts=3)
- no_retry_policy = RetryPolicy(maximum_attempts=1)
- default_schedule_to_close_timeout = timedelta(minutes=10)
-
- def __init__(self):
- self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}")
- self.op_id = None
- self.stage = ""
-
- @abstractmethod
- async def workflow(self, input: NsLcmOperationInput):
- pass
-
- async def wrap_nslcmop(self, input: NsLcmOperationInput):
- self.op_id = input.nslcmop["_id"]
- await self.update_operation_state(LcmOperationState.PROCESSING)
- try:
- await self.workflow(input=input)
-
- except ActivityError as e:
- err_details = str(e.cause.with_traceback(e.__traceback__))
- self.logger.error(err_details)
- await self.update_operation_state(
- LcmOperationState.FAILED,
- error_message=str(e.cause.message),
- detailed_status=err_details,
- )
- raise e
-
- except ChildWorkflowError as e:
- err_details = str(e.cause.with_traceback(e.cause.__traceback__))
- self.logger.error(err_details)
- await self.update_operation_state(
- LcmOperationState.FAILED,
- error_message=str(e.cause.message),
- detailed_status=err_details,
- )
- raise e
-
- except Exception as e:
- self.logger.exception(e)
- await self.update_operation_state(
- LcmOperationState.FAILED,
- error_message=str(e),
- detailed_status=str(e),
- )
- raise e
-
- await self.update_operation_state(LcmOperationState.COMPLETED)
-
- async def update_operation_state(
- self,
- op_state: LcmOperationState,
- stage: str = None,
- error_message: str = "",
- detailed_status: str = "",
- ) -> None:
- if stage is not None:
- self.stage = stage
- input = UpdateLcmOperationStateInput(
- op_id=self.op_id,
- op_state=op_state,
- stage=self.stage,
- error_message=error_message,
- detailed_status=detailed_status,
- )
- await workflow.execute_activity(
- activity=ACTIVITY_UPDATE_LCM_OPERATION_STATE,
- arg=input,
- activity_id=f"{ACTIVITY_UPDATE_LCM_OPERATION_STATE}-{self.op_id}",
- schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
- retry_policy=LcmOperationWorkflow.retry_policy,
- )
-
-
-@workflow.defn(name=WORKFLOW_NSLCM_NO_OP, sandboxed=LcmOperationWorkflow._SANDBOXED)
-class NsNoOpWorkflow(LcmOperationWorkflow):
+@workflow.defn(name=NsNoOpWorkflow.__name__, sandboxed=False)
+class NsNoOpWorkflowImpl(NsNoOpWorkflow):
"""
This is a simple No Operation workflow that simply calls a No Operation
activity. It can be used as a placeholder when developing workflows.
"""
@workflow.run
- async def wrap_nslcmop(self, input: NsLcmOperationInput) -> None:
- await super().wrap_nslcmop(input=input)
+ async def wrap_nslcmop(self, workflow_input: NsNoOpWorkflow.Input) -> None:
+ await super().wrap_nslcmop(workflow_input=workflow_input)
- async def workflow(self, input: NsLcmOperationInput) -> None:
- self.logger.debug(f"Called with: {input.nslcmop}")
+ async def run(self, workflow_input: NsNoOpWorkflow.Input) -> None:
+ self.logger.debug(f"Called with: {workflow_input.nslcmop}")
await workflow.execute_activity(
- activity=ACTIVITY_NSLCM_NO_OP,
- arg=input,
- activity_id=f"{ACTIVITY_NSLCM_NO_OP}-{self.op_id}",
- schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
- retry_policy=LcmOperationWorkflow.retry_policy,
+ activity=NsLcmNoOp.__name__,
+ arg=workflow_input,
+ activity_id=f"{NsLcmNoOp.__name__}-{self.op_id}",
+ schedule_to_close_timeout=NsNoOpWorkflow.default_schedule_to_close_timeout,
+ retry_policy=NsNoOpWorkflow.retry_policy,
)
diff --git a/osm_lcm/temporal/ns_activities.py b/osm_lcm/temporal/ns_activities.py
index 8c7c182..f25f78a 100644
--- a/osm_lcm/temporal/ns_activities.py
+++ b/osm_lcm/temporal/ns_activities.py
@@ -14,117 +14,46 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
+from temporalio import activity
from time import time
-from osm_common.dataclasses.temporal_dataclasses import (
- GetNsRecordInput,
- GetNsRecordOutput,
- GetVnfDetailsInput,
- GetVnfDetailsOutput,
- UpdateNsStateInput,
+from osm_common.temporal.activities.ns import (
+ GetNsRecord,
+ GetVnfDetails,
+ UpdateNsState,
)
-from osm_common.temporal_constants import (
- ACTIVITY_GET_NS_RECORD,
- ACTIVITY_GET_VNF_DETAILS,
- ACTIVITY_UPDATE_NS_STATE,
-)
-from osm_lcm.data_utils.database.database import Database
-from temporalio import activity
-class NsOperations:
- def __init__(self, db: Database):
- self.db: Database = db
- self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
-
- @activity.defn(name=ACTIVITY_GET_VNF_DETAILS)
- async def get_vnf_details(
- self, get_vnf_details_input: GetVnfDetailsInput
- ) -> GetVnfDetailsOutput:
- """
- Gets the list of VNF record IDs, VNF member-index-refs for a given NS record ID.
-
- Collaborators:
- DB Read: vnfrs
-
- Raises (Retryable):
- DbException If the target DB record does not exist or DB is not reachable.
-
- Activity Lifecycle:
- This activity will not report a heartbeat due to its short-running nature.
-
- Since this activity only reads from the DB, it is safe to retry, although
- you may wish to have some back-off policy.
- """
- vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": get_vnf_details_input.ns_uuid})
- return GetVnfDetailsOutput(
+class GetVnfDetailsImpl(GetVnfDetails):
+ @activity.defn(name=GetVnfDetails.__name__)
+ async def __call__(
+ self, activity_input: GetVnfDetails.Input
+ ) -> GetVnfDetails.Output:
+ vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": activity_input.ns_uuid})
+ return GetVnfDetails.Output(
vnf_details=[(vnfr["id"], vnfr["member-vnf-index-ref"]) for vnfr in vnfrs]
)
- @activity.defn(name=ACTIVITY_GET_NS_RECORD)
- async def get_ns_record(
- self, get_ns_record_input: GetNsRecordInput
- ) -> GetNsRecordOutput:
- """Gets the NS record from Database.
- Collaborators:
- DB Read: nsrs
-
- Raises (retryable):
- DbException: If DB read operations fail, the collection or DB record ID does not exist.
-
- Activity Lifecycle:
- This activity should complete relatively quickly (less than 10
- second).
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- This is an idempotent activity.
-
- """
- nsr = self.db.get_one("nsrs", {"_id": get_ns_record_input.nsr_uuid})
+class GetNsRecordImpl(GetNsRecord):
+ @activity.defn(name=GetNsRecord.__name__)
+ async def __call__(self, activity_input: GetNsRecord.Input) -> GetNsRecord.Output:
+ nsr = self.db.get_one("nsrs", {"_id": activity_input.nsr_uuid})
self.logger.debug("Got the nsr from Database for VNF operations.")
- return GetNsRecordOutput(nsr=nsr)
+ return GetNsRecord.Output(nsr=nsr)
-class NsDbActivity:
-
- """Perform Database operations for NS accounts.
-
- Args:
- db (database): Data Access Object
- """
-
- def __init__(self, db: Database):
- self.db: Database = db
- self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
-
- @activity.defn(name=ACTIVITY_UPDATE_NS_STATE)
- async def update_ns_state(self, data: UpdateNsStateInput) -> None:
- """
- Changes the state of the NS itself.
-
- Collaborators:
- DB Write: nsrs
-
- Raises (Retryable):
- DbException If the target DB record does not exist or DB is not reachable.
-
- Activity Lifecycle:
- This activity will not report a heartbeat due to its
- short-running nature.
-
- As this is a direct DB update, it is not recommended to have
- any specific retry policy
- """
+class UpdateNsStateImpl(UpdateNsState):
+ @activity.defn(name=UpdateNsState.__name__)
+ async def __call__(self, activity_input: UpdateNsState.Input) -> None:
update_ns_state = {
- "nsState": data.state.name,
+ "nsState": activity_input.state.name,
# "errorDescription" : data.message,
- "_admin.nsState": data.state.name,
- "_admin.detailed-status": data.message,
+ "_admin.nsState": activity_input.state.name,
+ "_admin.detailed-status": activity_input.message,
"_admin.modified": time(),
}
- self.db.set_one("nsrs", {"_id": data.ns_uuid}, update_ns_state)
- self.logger.debug(f"Updated NS {data.ns_uuid} to {data.state.name}")
+ self.db.set_one("nsrs", {"_id": activity_input.ns_uuid}, update_ns_state)
+ self.logger.debug(
+ f"Updated NS {activity_input.ns_uuid} to {activity_input.state.name}"
+ )
diff --git a/osm_lcm/temporal/ns_workflows.py b/osm_lcm/temporal/ns_workflows.py
index 6281ee6..58bb40e 100644
--- a/osm_lcm/temporal/ns_workflows.py
+++ b/osm_lcm/temporal/ns_workflows.py
@@ -17,86 +17,74 @@
import asyncio
import traceback
-from osm_common.dataclasses.temporal_dataclasses import (
- GetNsRecordInput,
- GetNsRecordOutput,
- GetVnfDetailsInput,
- GetVnfDetailsOutput,
- ModelInfo,
- NsLcmOperationInput,
- NsState,
- UpdateNsStateInput,
- VnfInstantiateInput,
+from osm_common.temporal.activities.paas import CreateModel
+from osm_common.temporal.activities.ns import (
+ GetVnfDetails,
+ GetNsRecord,
+ UpdateNsState,
)
-from osm_common.temporal_constants import (
- ACTIVITY_CREATE_MODEL,
- ACTIVITY_GET_NS_RECORD,
- ACTIVITY_GET_VNF_DETAILS,
- ACTIVITY_UPDATE_NS_STATE,
- WORKFLOW_NS_INSTANTIATE,
- WORKFLOW_VNF_INSTANTIATE,
-)
-from osm_lcm.temporal.lcm_workflows import LcmOperationWorkflow
+from osm_common.temporal.workflows.lcm import LcmOperationWorkflow
+from osm_common.temporal.workflows.ns import NsInstantiateWorkflow
+from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow
+from osm_common.temporal.states import NsState
from temporalio import workflow
from temporalio.converter import value_to_type
from temporalio.exceptions import ActivityError, ChildWorkflowError
-@workflow.defn(name=WORKFLOW_NS_INSTANTIATE, sandboxed=LcmOperationWorkflow._SANDBOXED)
-class NsInstantiateWorkflow(LcmOperationWorkflow):
- """Instantiate a NS"""
-
+@workflow.defn(name=NsInstantiateWorkflow.__name__, sandboxed=False)
+class NsInstantiateWorkflowImpl(LcmOperationWorkflow):
@workflow.run
- async def wrap_nslcmop(self, input: NsLcmOperationInput) -> None:
- await super().wrap_nslcmop(input=input)
+ async def wrap_nslcmop(self, workflow_input: NsInstantiateWorkflow.Input) -> None:
+ await super().wrap_nslcmop(workflow_input=workflow_input)
- async def workflow(self, input: NsLcmOperationInput) -> None:
- self.logger.info(f"Executing {WORKFLOW_NS_INSTANTIATE} with {input}")
+ async def run(self, workflow_input: NsInstantiateWorkflow.Input) -> None:
+ self.logger.info(f"Executing {NsInstantiateWorkflow.__name__} with {input}")
# TODO: Can we clean up the input? Perhaps this workflow could receive NsInstantiateInput directly.
- ns_uuid = input.nslcmop["nsInstanceId"]
- vim_uuid = input.nslcmop["operationParams"]["vimAccountId"]
+ ns_uuid = workflow_input.nslcmop["nsInstanceId"]
+ vim_uuid = workflow_input.nslcmop["operationParams"]["vimAccountId"]
model_name = self._get_namespace(ns_uuid, vim_uuid)
try:
await workflow.execute_activity(
- activity=ACTIVITY_CREATE_MODEL,
- arg=ModelInfo(vim_uuid=vim_uuid, model_name=model_name),
- activity_id=f"{ACTIVITY_CREATE_MODEL}-{ns_uuid}",
- schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
- retry_policy=LcmOperationWorkflow.no_retry_policy,
+ activity=CreateModel.__name__,
+ arg=CreateModel.Input(vim_uuid=vim_uuid, model_name=model_name),
+ activity_id=f"{CreateModel.__name__}-{ns_uuid}",
+ schedule_to_close_timeout=NsInstantiateWorkflow.default_schedule_to_close_timeout,
+ retry_policy=NsInstantiateWorkflow.no_retry_policy,
)
activities_results = await asyncio.gather(
workflow.execute_activity(
- activity=ACTIVITY_GET_VNF_DETAILS,
- arg=GetVnfDetailsInput(ns_uuid=ns_uuid),
- activity_id=f"{ACTIVITY_GET_VNF_DETAILS}-{ns_uuid}",
- schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
- retry_policy=LcmOperationWorkflow.no_retry_policy,
+ activity=GetVnfDetails.__name__,
+ arg=GetVnfDetails.Input(ns_uuid=ns_uuid),
+ activity_id=f"{GetVnfDetails.__name__}-{ns_uuid}",
+ schedule_to_close_timeout=NsInstantiateWorkflow.default_schedule_to_close_timeout,
+ retry_policy=NsInstantiateWorkflow.no_retry_policy,
),
workflow.execute_activity(
- activity=ACTIVITY_GET_NS_RECORD,
- arg=GetNsRecordInput(nsr_uuid=ns_uuid),
- activity_id=f"{ACTIVITY_GET_NS_RECORD}-{ns_uuid}",
- schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
- retry_policy=LcmOperationWorkflow.no_retry_policy,
+ activity=GetNsRecord.__name__,
+ arg=GetNsRecord.Input(nsr_uuid=ns_uuid),
+ activity_id=f"{GetNsRecord.__name__}-{ns_uuid}",
+ schedule_to_close_timeout=NsInstantiateWorkflow.default_schedule_to_close_timeout,
+ retry_policy=NsInstantiateWorkflow.no_retry_policy,
),
)
get_vnf_details, get_ns_record = value_to_type(
- GetVnfDetailsOutput, activities_results[0]
- ), value_to_type(GetNsRecordOutput, activities_results[1])
+ GetVnfDetails.Output, activities_results[0]
+ ), value_to_type(GetNsRecord.Output, activities_results[1])
await asyncio.gather(
*(
workflow.execute_child_workflow(
- workflow=WORKFLOW_VNF_INSTANTIATE,
- arg=VnfInstantiateInput(
+ workflow=VnfInstantiateWorkflow.__name__,
+ arg=VnfInstantiateWorkflow.Input(
vnfr_uuid=vnfr_uuid,
model_name=model_name,
- instantiation_config=NsInstantiateWorkflow.get_vnf_config(
+ instantiation_config=NsInstantiateWorkflowImpl.get_vnf_config(
vnf_member_index_ref, get_ns_record.nsr
),
),
- id=f"{WORKFLOW_VNF_INSTANTIATE}-{vnfr_uuid}",
+ id=f"{VnfInstantiateWorkflow.__name__}-{vnfr_uuid}",
)
for vnfr_uuid, vnf_member_index_ref in get_vnf_details.vnf_details
)
@@ -105,19 +93,25 @@
except ActivityError as e:
err_details = str(e.cause.with_traceback(e.__traceback__))
await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, err_details)
- self.logger.error(f"{WORKFLOW_NS_INSTANTIATE} failed with {err_details}")
+ self.logger.error(
+ f"{NsInstantiateWorkflow.__name__} failed with {err_details}"
+ )
raise e
except ChildWorkflowError as e:
err_details = str(e.cause.with_traceback(e.cause.__traceback__))
await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, err_details)
- self.logger.error(f"{WORKFLOW_NS_INSTANTIATE} failed with {err_details}")
+ self.logger.error(
+ f"{NsInstantiateWorkflow.__name__} failed with {err_details}"
+ )
raise e
except Exception as e:
err_details = str(traceback.format_exc())
await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, err_details)
- self.logger.error(f"{WORKFLOW_NS_INSTANTIATE} failed with {err_details}")
+ self.logger.error(
+ f"{NsInstantiateWorkflow.__name__} failed with {err_details}"
+ )
raise e
await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, "Done")
@@ -128,13 +122,13 @@
state: NsState,
message: str,
) -> None:
- input = UpdateNsStateInput(ns_uuid, state, message)
+ activity_input = UpdateNsState.Input(ns_uuid, state, message)
await workflow.execute_activity(
- activity=ACTIVITY_UPDATE_NS_STATE,
- arg=input,
- activity_id=f"{ACTIVITY_UPDATE_NS_STATE}-{ns_uuid}",
- schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
- retry_policy=LcmOperationWorkflow.retry_policy,
+ activity=UpdateNsState.__name__,
+ arg=activity_input,
+ activity_id=f"{UpdateNsState.__name__}-{ns_uuid}",
+ schedule_to_close_timeout=NsInstantiateWorkflow.default_schedule_to_close_timeout,
+ retry_policy=NsInstantiateWorkflow.retry_policy,
)
def _get_namespace(self, ns_id: str, vim_id: str) -> str:
diff --git a/osm_lcm/temporal/vdu_workflows.py b/osm_lcm/temporal/vdu_workflows.py
index 7349535..8dc5ba9 100644
--- a/osm_lcm/temporal/vdu_workflows.py
+++ b/osm_lcm/temporal/vdu_workflows.py
@@ -15,20 +15,15 @@
# limitations under the License.
from datetime import timedelta
-import logging
import traceback
-from osm_common.dataclasses.temporal_dataclasses import (
- VduInstantiateInput,
- CheckCharmStatusInput,
+from osm_common.temporal.activities.paas import (
+ DeployCharm,
+ CheckCharmStatus,
)
+from osm_common.temporal.workflows.vdu import VduInstantiateWorkflow
+from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
-from osm_common.temporal_constants import (
- ACTIVITY_DEPLOY_CHARM,
- ACTIVITY_CHECK_CHARM_STATUS,
- LCM_TASK_QUEUE,
- WORKFLOW_VDU_INSTANTIATE,
-)
from temporalio import workflow
from temporalio.common import RetryPolicy
from temporalio.exceptions import ActivityError
@@ -38,51 +33,58 @@
default_schedule_to_close_timeout = timedelta(minutes=10)
-@workflow.defn(name=WORKFLOW_VDU_INSTANTIATE, sandboxed=_SANDBOXED)
-class VduInstantiateWorkflow:
- """Instantiate a VDU"""
-
- def __init__(self):
- self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}")
-
+@workflow.defn(name=VduInstantiateWorkflow.__name__, sandboxed=False)
+class VduInstantiateWorkflowImpl(VduInstantiateWorkflow):
@workflow.run
- async def run(self, input: VduInstantiateInput) -> None:
+ async def run(self, workflow_input: VduInstantiateWorkflow.Input) -> None:
+ deploy_charm_input = DeployCharm.Input(
+ vim_uuid=workflow_input.vim_uuid,
+ model_name=workflow_input.model_name,
+ charm_info=workflow_input.charm_info,
+ constraints=workflow_input.constraints,
+ cloud=workflow_input.cloud,
+ config=workflow_input.config,
+ )
try:
- self.logger.info(f"Deploying VDU `{input.charm_info.app_name}`")
+ self.logger.info(f"Deploying VDU `{workflow_input.charm_info.app_name}`")
await workflow.execute_activity(
- activity=ACTIVITY_DEPLOY_CHARM,
- arg=input,
- activity_id=f"{ACTIVITY_DEPLOY_CHARM}-{input.vim_uuid}",
+ activity=DeployCharm.__name__,
+ arg=deploy_charm_input,
+ activity_id=f"{DeployCharm.__name__}-{workflow_input.vim_uuid}",
task_queue=LCM_TASK_QUEUE,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
)
self.logger.info(
- f"Waiting for VDU `{input.charm_info.app_name}` to become ready"
+ f"Waiting for VDU `{workflow_input.charm_info.app_name}` to become ready"
)
await workflow.execute_activity(
- activity=ACTIVITY_CHECK_CHARM_STATUS,
- arg=CheckCharmStatusInput(
- vim_uuid=input.vim_uuid,
- model_name=input.model_name,
- application_name=input.charm_info.app_name,
+ activity=CheckCharmStatus.__name__,
+ arg=CheckCharmStatus.Input(
+ vim_uuid=workflow_input.vim_uuid,
+ model_name=workflow_input.model_name,
+ application_name=workflow_input.charm_info.app_name,
),
- activity_id=f"{ACTIVITY_CHECK_CHARM_STATUS}-{input.vim_uuid}",
+ activity_id=f"{CheckCharmStatus.__name__}-{workflow_input.vim_uuid}",
task_queue=LCM_TASK_QUEUE,
start_to_close_timeout=timedelta(minutes=5),
heartbeat_timeout=timedelta(seconds=30),
retry_policy=retry_policy,
)
- self.logger.info(f"VDU `{input.charm_info.app_name}` is ready")
+ self.logger.info(f"VDU `{workflow_input.charm_info.app_name}` is ready")
except ActivityError as e:
err_details = str(e.cause.with_traceback(e.__traceback__))
- self.logger.error(f"{WORKFLOW_VDU_INSTANTIATE} failed with {err_details}")
+ self.logger.error(
+ f"{VduInstantiateWorkflow.__name__} failed with {err_details}"
+ )
raise e
except Exception as e:
err_details = str(traceback.format_exc())
- self.logger.error(f"{WORKFLOW_VDU_INSTANTIATE} failed with {err_details}")
+ self.logger.error(
+ f"{VduInstantiateWorkflow.__name__} failed with {err_details}"
+ )
raise e
diff --git a/osm_lcm/temporal/vim_activities.py b/osm_lcm/temporal/vim_activities.py
index e9db668..708c423 100644
--- a/osm_lcm/temporal/vim_activities.py
+++ b/osm_lcm/temporal/vim_activities.py
@@ -14,115 +14,51 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
-from time import time
-
-from osm_common.temporal_constants import (
- ACTIVITY_DELETE_VIM,
- ACTIVITY_UPDATE_VIM_OPERATION_STATE,
- ACTIVITY_UPDATE_VIM_STATE,
-)
-from osm_common.dataclasses.temporal_dataclasses import (
- DeleteVimInput,
- UpdateVimOperationStateInput,
- UpdateVimStateInput,
-)
-from osm_lcm.data_utils.database.database import Database
from temporalio import activity
+from time import time
+from osm_common.temporal.activities.vim import (
+ UpdateVimState,
+ UpdateVimOperationState,
+ DeleteVimRecord,
+)
-class VimDbActivity:
- """Perform Database operations for VIM accounts.
-
- Args:
- db (Database): Data Access Object
- """
-
- def __init__(self, db: Database):
- self.db: Database = db
- self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
-
- @activity.defn(name=ACTIVITY_UPDATE_VIM_STATE)
- async def update_vim_state(self, data: UpdateVimStateInput) -> None:
- """
- Changes the state of the VIM itself. Should be either
- ENABLED or ERROR, however this activity does not validate
- the state as no validation was done in OSM previously.
-
- Collaborators:
- DB Write: vim_accounts
-
- Raises (Retryable):
- DbException If the target DB record does not exist or DB is not reachable.
-
- Activity Lifecycle:
- This activity will not report a heartbeat due to its
- short-running nature.
-
- As this is a direct DB update, it is not recommended to have
- any specific retry policy
- """
+class UpdateVimStateImpl(UpdateVimState):
+ @activity.defn(name=UpdateVimState.__name__)
+ async def __call__(self, activity_input: UpdateVimState.Input) -> None:
update_vim_state = {
- "_admin.operationalState": data.operational_state.name,
- "_admin.detailed-status": data.message,
+ "_admin.operationalState": activity_input.operational_state.name,
+ "_admin.detailed-status": activity_input.message,
"_admin.modified": time(),
}
- self.db.set_one("vim_accounts", {"_id": data.vim_uuid}, update_vim_state)
+ self.db.set_one(
+ "vim_accounts", {"_id": activity_input.vim_uuid}, update_vim_state
+ )
self.logger.debug(
- f"Updated VIM {data.vim_uuid} to {data.operational_state.name}"
+ f"Updated VIM {activity_input.vim_uuid} to {activity_input.operational_state.name}"
)
- @activity.defn(name=ACTIVITY_UPDATE_VIM_OPERATION_STATE)
- async def update_vim_operation_state(
- self, data: UpdateVimOperationStateInput
- ) -> None:
- """
- Changes the state of a VIM operation task. Should be done to
- indicate progress, or completion of the task itself.
- Collaborators:
- DB Write: vim_accounts
-
- Raises (Retryable):
- DbException If the target DB record does not exist or DB is not reachable.
-
- Activity Lifecycle:
- This activity will not report a heartbeat due to its
- short-running nature.
-
- As this is a direct DB update, it is not recommended to have
- any specific retry policy
- """
+class UpdateVimOperationStateImpl(UpdateVimOperationState):
+ @activity.defn(name=UpdateVimOperationState.__name__)
+ async def __call__(self, activity_input: UpdateVimOperationState.Input) -> None:
update_operation_state = {
- f"_admin.operations.{format(data.op_id)}.operationState": data.op_state.name,
- f"_admin.operations.{format(data.op_id)}.detailed-status": data.message,
+ f"_admin.operations.{format(activity_input.op_id)}.operationState": activity_input.op_state.name,
+ f"_admin.operations.{format(activity_input.op_id)}.detailed-status": activity_input.message,
"_admin.current_operation": None,
}
- self.db.set_one("vim_accounts", {"_id": data.vim_uuid}, update_operation_state)
+ self.db.set_one(
+ "vim_accounts", {"_id": activity_input.vim_uuid}, update_operation_state
+ )
self.logger.debug(
- f"Updated VIM {data.vim_uuid} OP ID {data.op_id} to {data.op_state.name}"
+ f"Updated VIM {activity_input.vim_uuid} OP ID {activity_input.op_id} to {activity_input.op_state.name}"
)
- @activity.defn(name=ACTIVITY_DELETE_VIM)
- async def delete_vim_record(self, data: DeleteVimInput) -> None:
- """
- Deletes the VIM record from the database.
- Collaborators:
- DB Delete: vim_accounts
-
- Raises (Retryable):
- DbException If the target DB record does not exist or DB is not reachable.
-
- Activity Lifecycle:
- This activity will not report a heartbeat due to its
- short-running nature.
-
- As this is a direct DB update, it is not recommended to have
- any specific retry policy
- """
-
- self.db.del_one("vim_accounts", {"_id": data.vim_uuid})
- self.logger.debug(f"Removed VIM {data.vim_uuid}")
+class DeleteVimRecordImpl(DeleteVimRecord):
+ @activity.defn(name=DeleteVimRecord.__name__)
+ async def __call__(self, activity_input: DeleteVimRecord.Input) -> None:
+ self.db.del_one("vim_accounts", {"_id": activity_input.vim_uuid})
+ self.logger.debug(f"Removed VIM {activity_input.vim_uuid}")
diff --git a/osm_lcm/temporal/vim_workflows.py b/osm_lcm/temporal/vim_workflows.py
index 5d6b08c..5123132 100644
--- a/osm_lcm/temporal/vim_workflows.py
+++ b/osm_lcm/temporal/vim_workflows.py
@@ -15,28 +15,21 @@
# limitations under the License.
from datetime import timedelta
-import logging
import traceback
-from osm_common.dataclasses.temporal_dataclasses import (
- DeleteVimInput,
- TestVimConnectivityInput,
- UpdateVimOperationStateInput,
- UpdateVimStateInput,
- VimOperationInput,
- VimState,
- VimOperationState,
+from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
+from osm_common.temporal.states import VimState, VimOperationState
+from osm_common.temporal.workflows.vim import (
+ VimCreateWorkflow,
+ VimUpdateWorkflow,
+ VimDeleteWorkflow,
)
-from osm_common.temporal_constants import (
- ACTIVITY_TEST_VIM_CONNECTIVITY,
- ACTIVITY_UPDATE_VIM_OPERATION_STATE,
- ACTIVITY_UPDATE_VIM_STATE,
- ACTIVITY_DELETE_VIM,
- LCM_TASK_QUEUE,
- WORKFLOW_VIM_CREATE,
- WORKFLOW_VIM_UPDATE,
- WORKFLOW_VIM_DELETE,
+from osm_common.temporal.activities.vim import (
+ DeleteVimRecord,
+ UpdateVimState,
+ UpdateVimOperationState,
)
+from osm_common.temporal.activities.paas import TestVimConnectivity
from temporalio import workflow
from temporalio.common import RetryPolicy
from temporalio.exceptions import ActivityError
@@ -46,24 +39,24 @@
default_schedule_to_close_timeout = timedelta(minutes=10)
-@workflow.defn(name=WORKFLOW_VIM_CREATE, sandboxed=_SANDBOXED)
-class VimCreateWorkflow:
- """Updates VIM account state by validating the VIM connectivity."""
-
- def __init__(self):
- self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}")
-
+@workflow.defn(name=VimCreateWorkflow.__name__, sandboxed=False)
+class VimCreateWorkflowImpl(VimCreateWorkflow):
@workflow.run
- async def run(self, input: VimOperationInput) -> None:
- vim_state = UpdateVimStateInput(input.vim_uuid, VimState.ENABLED, "Done")
- op_state = UpdateVimOperationStateInput(
- input.vim_uuid, input.op_id, VimOperationState.COMPLETED, "Done"
+ async def run(self, workflow_input: VimCreateWorkflow.Input) -> None:
+ vim_state = UpdateVimState.Input(
+ workflow_input.vim_uuid, VimState.ENABLED, "Done"
+ )
+ op_state = UpdateVimOperationState.Input(
+ workflow_input.vim_uuid,
+ workflow_input.op_id,
+ VimOperationState.COMPLETED,
+ "Done",
)
try:
await workflow.execute_activity(
- activity=ACTIVITY_TEST_VIM_CONNECTIVITY,
- arg=TestVimConnectivityInput(input.vim_uuid),
- activity_id=f"{ACTIVITY_TEST_VIM_CONNECTIVITY}-{input.vim_uuid}",
+ activity=TestVimConnectivity.__name__,
+ arg=TestVimConnectivity.Input(workflow_input.vim_uuid),
+ activity_id=f"{TestVimConnectivity.__name__}-{workflow_input.vim_uuid}",
task_queue=LCM_TASK_QUEUE,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
@@ -71,64 +64,72 @@
except ActivityError as e:
error_details = str(e.cause.with_traceback(e.__traceback__))
- vim_state = UpdateVimStateInput(
- input.vim_uuid, VimState.ERROR, str(e.cause.message)
+ vim_state = UpdateVimState.Input(
+ workflow_input.vim_uuid, VimState.ERROR, str(e.cause.message)
)
- op_state = UpdateVimOperationStateInput(
- input.vim_uuid,
- input.op_id,
+ op_state = UpdateVimOperationState.Input(
+ workflow_input.vim_uuid,
+ workflow_input.op_id,
VimOperationState.FAILED,
error_details,
)
await self.update_states(op_state, vim_state)
- self.logger.error(f"{WORKFLOW_VIM_CREATE} failed with {error_details}")
+ self.logger.error(
+ f"{VimCreateWorkflow.__name__} failed with {error_details}"
+ )
raise e
except Exception as e:
error_details = str(traceback.format_exc())
- vim_state = UpdateVimStateInput(input.vim_uuid, VimState.ERROR, str(e))
- op_state = UpdateVimOperationStateInput(
- input.vim_uuid,
- input.op_id,
+ vim_state = UpdateVimState.Input(
+ workflow_input.vim_uuid, VimState.ERROR, str(e)
+ )
+ op_state = UpdateVimOperationState.Input(
+ workflow_input.vim_uuid,
+ workflow_input.op_id,
VimOperationState.FAILED,
error_details,
)
await self.update_states(op_state, vim_state)
- self.logger.error(f"{WORKFLOW_VIM_CREATE} failed with {error_details}")
+ self.logger.error(
+ f"{VimCreateWorkflow.__name__} failed with {error_details}"
+ )
raise e
await self.update_states(op_state, vim_state)
async def update_states(
self,
- op_state: UpdateVimOperationStateInput,
- vim_state: UpdateVimStateInput,
+ op_state: UpdateVimOperationState.Input,
+ vim_state: UpdateVimState.Input,
):
raised_exceptions = []
try:
await workflow.execute_activity(
- activity=ACTIVITY_UPDATE_VIM_STATE,
+ activity=UpdateVimState.__name__,
arg=vim_state,
- activity_id=f"{ACTIVITY_UPDATE_VIM_STATE}-{vim_state.vim_uuid}",
+ activity_id=f"{UpdateVimState.__name__}-{vim_state.vim_uuid}",
task_queue=LCM_TASK_QUEUE,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
)
except ActivityError as e:
raised_exceptions.append(e)
- self.logger.error(f"{WORKFLOW_VIM_CREATE} failed to update VIM state.")
+ self.logger.error(
+ f"{VimCreateWorkflow.__name__} failed to update VIM state."
+ )
try:
await workflow.execute_activity(
- activity=ACTIVITY_UPDATE_VIM_OPERATION_STATE,
+ activity=UpdateVimOperationState.__name__,
arg=op_state,
- activity_id=f"{ACTIVITY_UPDATE_VIM_OPERATION_STATE}-{op_state.vim_uuid}",
+ activity_id=f"{UpdateVimOperationState.__name__}-{op_state.vim_uuid}",
task_queue=LCM_TASK_QUEUE,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
)
except ActivityError as e:
self.logger.error(
- f"{WORKFLOW_VIM_CREATE} failed to update VIM operation state."
+ f"{VimCreateWorkflow.__name__} failed to update VIM operation state."
)
raised_exceptions.append(e)
@@ -138,25 +139,21 @@
raise raised_exceptions[0]
-@workflow.defn(name=WORKFLOW_VIM_UPDATE, sandboxed=_SANDBOXED)
-class VimUpdateWorkflow(VimCreateWorkflow):
- """Updates VIM account state by validating the VIM connectivity."""
-
+@workflow.defn(name=VimUpdateWorkflow.__name__, sandboxed=False)
+class VimUpdateWorkflowImpl(VimCreateWorkflowImpl):
@workflow.run
- async def run(self, input: VimOperationInput) -> None:
- await super().run(input)
+ async def run(self, workflow_input: VimUpdateWorkflow.Input) -> None:
+ await super().run(workflow_input)
-@workflow.defn(name=WORKFLOW_VIM_DELETE, sandboxed=_SANDBOXED)
-class VimDeleteWorkflow:
- """Deletes VIM accounts."""
-
+@workflow.defn(name=VimDeleteWorkflow.__name__, sandboxed=False)
+class VimDeleteWorkflowImpl(VimDeleteWorkflow):
@workflow.run
- async def run(self, input: VimOperationInput) -> None:
+ async def run(self, workflow_input: VimDeleteWorkflow.Input) -> None:
await workflow.execute_activity(
- activity=ACTIVITY_DELETE_VIM,
- arg=DeleteVimInput(input.vim_uuid),
- activity_id=f"{ACTIVITY_UPDATE_VIM_STATE}-{input.vim_uuid}",
+ activity=DeleteVimRecord.__name__,
+ arg=DeleteVimRecord.Input(workflow_input.vim_uuid),
+ activity_id=f"{DeleteVimRecord.__name__}-{workflow_input.vim_uuid}",
task_queue=LCM_TASK_QUEUE,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
diff --git a/osm_lcm/temporal/vnf_activities.py b/osm_lcm/temporal/vnf_activities.py
index 9581db0..156bc59 100644
--- a/osm_lcm/temporal/vnf_activities.py
+++ b/osm_lcm/temporal/vnf_activities.py
@@ -13,158 +13,65 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
+from temporalio import activity
from typing import List, Any
-from osm_common.temporal_constants import (
- ACTIVITY_CHANGE_VNF_STATE,
- ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE,
- ACTIVITY_GET_TASK_QUEUE,
- ACTIVITY_GET_VIM_CLOUD,
- ACTIVITY_GET_VNF_DESCRIPTOR,
- ACTIVITY_GET_VNF_RECORD,
- ACTIVITY_SEND_NOTIFICATION_FOR_VNF,
- ACTIVITY_SET_VNF_MODEL,
+from osm_common.temporal.activities.vnf import (
+ GetTaskQueue,
+ GetVimCloud,
+ GetVnfDescriptor,
+ GetVnfRecord,
+ ChangeVnfState,
+ ChangeVnfInstantiationState,
+ SetVnfModel,
+ SendNotificationForVnf,
+)
+from osm_common.temporal.dataclasses_common import VduComputeConstraints
+from osm_common.temporal_task_queues.task_queues_mappings import (
VIM_TYPE_TASK_QUEUE_MAPPINGS,
)
-from osm_common.dataclasses.temporal_dataclasses import (
- ChangeVnfInstantiationStateInput,
- ChangeVnfStateInput,
- GetTaskQueueInput,
- GetTaskQueueOutput,
- GetVimCloudInput,
- GetVimCloudOutput,
- GetVnfDescriptorInput,
- GetVnfDescriptorOutput,
- GetVnfRecordInput,
- GetVnfRecordOutput,
- SetVnfModelInput,
- VduComputeConstraints,
-)
-from osm_lcm.data_utils.database.database import Database
-from temporalio import activity
-
CONFIG_IDENTIFIER = "config::"
-class VnfOperations:
- def __init__(self, db: Database):
- self.db: Database = db
- self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
-
- @activity.defn(name=ACTIVITY_GET_TASK_QUEUE)
- async def get_task_queue(
- self, get_task_queue_input: GetTaskQueueInput
- ) -> GetTaskQueueOutput:
- """Finds the appropriate task queue according to VIM type of VNF.
-
- Collaborators:
- DB read: vim_accounts, vnfrs
-
- Raises (retryable):
- DbException: If DB read operations fail, the collection or DB record ID does not exist.
-
- Activity Lifecycle:
- This activity should complete relatively quickly (less than a
- second). However, it would be reasonable to wait up to 10
- seconds.
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- It is not necessary to implement a back-off strategy for this
- activity, the operation is idempotent.
-
- """
- vnfr = self.db.get_one("vnfrs", {"_id": get_task_queue_input.vnfr_uuid})
+class GetTaskQueueImpl(GetTaskQueue):
+ @activity.defn(name=GetTaskQueue.__name__)
+ async def __call__(self, activity_input: GetTaskQueue.Input) -> GetTaskQueue.Output:
+ vnfr = self.db.get_one("vnfrs", {"_id": activity_input.vnfr_uuid})
vim_record = self.db.get_one("vim_accounts", {"_id": vnfr["vim-account-id"]})
task_queue = VIM_TYPE_TASK_QUEUE_MAPPINGS[vim_record["vim_type"]]
self.logger.debug(f"Got the task queue {task_queue} for VNF operations.")
- return GetTaskQueueOutput(task_queue)
+ return GetTaskQueue.Output(task_queue)
- @activity.defn(name=ACTIVITY_GET_VIM_CLOUD)
- async def get_vim_cloud(
- self, get_vim_cloud_input: GetVimCloudInput
- ) -> GetVimCloudOutput:
- """Finds the cloud by checking the VIM account of VNF.
- Collaborators:
- DB Read: vnfrs, vim_accounts
-
- Raises (retryable):
- DbException: If DB read operations fail, the collection or DB record ID does not exist.
-
- Activity Lifecycle:
- This activity should complete relatively quickly (less than a
- second). However, it would be reasonable to wait up to 10
- seconds.
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- It is not necessary to implement a back-off strategy for this
- activity, the operation is idempotent.
-
- """
- vnfr = self.db.get_one("vnfrs", {"_id": get_vim_cloud_input.vnfr_uuid})
+class GetVimCloudImpl(GetVimCloud):
+ @activity.defn(name=GetVimCloud.__name__)
+ async def __call__(self, activity_input: GetVimCloud.Input) -> GetVimCloud.Output:
+ vnfr = self.db.get_one("vnfrs", {"_id": activity_input.vnfr_uuid})
vim_record = self.db.get_one("vim_accounts", {"_id": vnfr["vim-account-id"]})
cloud = vim_record["config"].get("cloud", "")
self.logger.debug(f"Got the cloud type {cloud} for VNF operations.")
- return GetVimCloudOutput(cloud=cloud)
+ return GetVimCloud.Output(cloud=cloud)
- @activity.defn(name=ACTIVITY_GET_VNF_RECORD)
- async def get_vnf_record(
- self, get_vnf_record_input: GetVnfRecordInput
- ) -> GetVnfRecordOutput:
- """Gets the VNF record and VNF descriptor from Database.
- Collaborators:
- DB read: vnfrs
-
- Raises (retryable):
- DbException: If DB read operations fail, the collection or DB record ID does not exist.
-
- Activity Lifecycle:
- This activity should complete relatively quickly (less than 10
- second).
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- This is an idempotent activity.
-
- """
- vnfr = self.db.get_one("vnfrs", {"_id": get_vnf_record_input.vnfr_uuid})
+class GetVnfRecordImpl(GetVnfRecord):
+ @activity.defn(name=GetVnfRecord.__name__)
+ async def __call__(self, activity_input: GetVnfRecord.Input) -> GetVnfRecord.Output:
+ vnfr = self.db.get_one("vnfrs", {"_id": activity_input.vnfr_uuid})
self.logger.debug("Got the vnfr from Database for VNF operations.")
- return GetVnfRecordOutput(vnfr=vnfr)
+ return GetVnfRecord.Output(vnfr=vnfr)
- @activity.defn(name=ACTIVITY_GET_VNF_DESCRIPTOR)
- async def get_vnf_descriptor(
- self, get_vnf_descriptor_input: GetVnfDescriptorInput
- ) -> GetVnfDescriptorOutput:
- """Gets the VNF record and VNF descriptor from Database.
- Collaborators:
- DB read: vnfds
+class GetVnfDescriptorImpl(GetVnfDescriptor):
+ @activity.defn(name=GetVnfDescriptor.__name__)
+ async def __call__(
+ self, activity_input: GetVnfDescriptor.Input
+ ) -> GetVnfDescriptor.Output:
+ vnfd = self.db.get_one("vnfds", {"_id": activity_input.vnfd_uuid})
+ return GetVnfDescriptor.Output(vnfd=vnfd)
- Raises (retryable):
- DbException: If DB read operations fail, the collection or DB record ID does not exist.
- Activity Lifecycle:
- This activity should complete relatively quickly (less than 10
- second).
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- This is an idempotent activity.
-
- """
- vnfd = self.db.get_one("vnfds", {"_id": get_vnf_descriptor_input.vnfd_uuid})
- self.logger.debug("Got the vnfr and vnfd from Database for VNF operations.")
- return GetVnfDescriptorOutput(vnfd=vnfd)
-
+class VnfSpecifications:
@staticmethod
def get_vdu_instantiation_params(
vdu_id: str, vnf_instantiation_config: dict
@@ -179,7 +86,7 @@
compute_desc_id = vdu.get("virtual-compute-desc")
if not compute_desc_id:
return VduComputeConstraints(cores=0, mem=0)
- flavor_details = VnfOperations._get_flavor_details(compute_desc_id, vnfd)
+ flavor_details = VnfSpecifications._get_flavor_details(compute_desc_id, vnfd)
if not flavor_details:
return VduComputeConstraints(cores=0, mem=0)
@@ -198,11 +105,11 @@
def get_application_config(vdu: dict, vdu_instantiation_config: dict) -> dict:
configurable_properties = vdu.get("configurable-properties", [])
- config_from_descriptor = VnfOperations._get_only_config_items(
- VnfOperations._list_to_dict(configurable_properties)
+ config_from_descriptor = VnfSpecifications._get_only_config_items(
+ VnfSpecifications._list_to_dict(configurable_properties)
)
- config_from_instantiation = VnfOperations._get_only_config_items(
+ config_from_instantiation = VnfSpecifications._get_only_config_items(
vdu_instantiation_config
)
return {**config_from_descriptor, **config_from_instantiation}
@@ -224,125 +131,43 @@
}
-class VnfDbActivity:
- """Perform Database operations for VNF accounts.
-
- Args:
- db (Database): Data Access Object
- """
-
- def __init__(self, db: Database):
- self.db: Database = db
- self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
-
- @activity.defn(name=ACTIVITY_CHANGE_VNF_STATE)
- async def change_vnf_state(self, vnf_state_input: ChangeVnfStateInput) -> None:
- """Updates the VNF State in VNFR.
-
- Collaborators:
- DB Write: vnfrs
-
- Raises (retryable):
- DbException: If DB access/update fails, the collection or DB record ID does not exist.
-
- Activity Lifecycle:
- This activity should complete relatively quickly (less than a
- second). However, it would be reasonable to wait up to 10
- seconds.
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- It is not necessary to implement a back-off strategy for this
- activity, the operation is idempotent.
-
- """
- update_vnf_state = {"vnfState": vnf_state_input.state.name}
- self.db.set_one("vnfrs", {"_id": vnf_state_input.vnfr_uuid}, update_vnf_state)
+class ChangeVnfStateImpl(ChangeVnfState):
+ @activity.defn(name=ChangeVnfState.__name__)
+ async def __call__(self, activity_input: ChangeVnfState.Input) -> None:
+ update_vnf_state = {"vnfState": activity_input.state.name}
+ self.db.set_one("vnfrs", {"_id": activity_input.vnfr_uuid}, update_vnf_state)
self.logger.debug(
- f"VNF {vnf_state_input.vnfr_uuid} state is updated to {vnf_state_input.state.name}."
+ f"VNF {activity_input.vnfr_uuid} state is updated to {activity_input.state.name}."
)
- @activity.defn(name=ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE)
- async def change_vnf_instantiation_state(
- self, vnf_instantiation_state_input: ChangeVnfInstantiationStateInput
- ) -> None:
- """Updates the VNF Instantiation State in VNFR.
- Collaborators:
- DB Write: vnfrs
-
- Raises (retryable):
- DbException: If DB access or update fails, the collection or DB record ID does not exist.
-
- Activity Lifecycle:
- This activity should complete relatively quickly (less than a
- second). However, it would be reasonable to wait up to 10
- seconds.
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- It is not necessary to implement a back-off strategy for this
- activity, the operation is idempotent.
-
- """
+class ChangeVnfInstantiationStateImpl(ChangeVnfInstantiationState):
+ @activity.defn(name=ChangeVnfInstantiationState.__name__)
+ async def __call__(self, activity_input: ChangeVnfInstantiationState.Input) -> None:
update_vnf_instantiation_state = {
- "instantiationState": vnf_instantiation_state_input.state.name
+ "instantiationState": activity_input.state.name
}
self.db.set_one(
"vnfrs",
- {"_id": vnf_instantiation_state_input.vnfr_uuid},
+ {"_id": activity_input.vnfr_uuid},
update_vnf_instantiation_state,
)
self.logger.debug(
- f"VNF {vnf_instantiation_state_input.vnfr_uuid} state is updated to {vnf_instantiation_state_input.state.name}."
+ f"VNF {activity_input.vnfr_uuid} state is updated to {activity_input.state.name}."
)
- @activity.defn(name=ACTIVITY_SET_VNF_MODEL)
- async def set_vnf_model(self, set_vnf_model_input: SetVnfModelInput) -> None:
- """Updates the model name of VNF in VNFR.
- Collaborators:
- DB Write: vnfrs
-
- Raises (retryable):
- DbException: If DB access or update fails, the collection or DB record ID does not exist.
-
- Activity Lifecycle:
- This activity should complete relatively quickly (less than a
- second). However, it would be reasonable to wait up to 10
- seconds.
-
- This activity will not report a heartbeat due to its
- short-running nature.
-
- It is not necessary to implement a back-off strategy for this
- activity, the operation is idempotent.
-
- """
- update_namespace = {"namespace": set_vnf_model_input.model_name}
- self.db.set_one(
- "vnfrs", {"_id": set_vnf_model_input.vnfr_uuid}, update_namespace
- )
+class SetVnfModelImpl(SetVnfModel):
+ @activity.defn(name=SetVnfModel.__name__)
+ async def __call__(self, activity_input: SetVnfModel.Input) -> None:
+ update_namespace = {"namespace": activity_input.model_name}
+ self.db.set_one("vnfrs", {"_id": activity_input.vnfr_uuid}, update_namespace)
self.logger.debug(
- f"VNF {set_vnf_model_input.vnfr_uuid} model name is updated to {set_vnf_model_input.model_name}."
+ f"VNF {activity_input.vnfr_uuid} model name is updated to {activity_input.model_name}."
)
-class VnfSendNotifications:
- """Perform Notification operations."""
-
- def __init__(self):
- self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
-
- @activity.defn(name=ACTIVITY_SEND_NOTIFICATION_FOR_VNF)
- async def send_notification_for_vnf(
- self, input: ChangeVnfInstantiationStateInput
- ) -> None:
- """If VNF LCM operation state changes, send notification updates.
-
- This activity does nothing.
-
- """
+class SendNotificationForVnfImpl(SendNotificationForVnf):
+ @activity.defn(name=SendNotificationForVnf.__name__)
+ async def __call__(self, activity_input: SendNotificationForVnf.Input) -> None:
self.logger.debug("Send notification for VNF not implemented.")
diff --git a/osm_lcm/temporal/vnf_workflows.py b/osm_lcm/temporal/vnf_workflows.py
index c121b05..940eeb9 100644
--- a/osm_lcm/temporal/vnf_workflows.py
+++ b/osm_lcm/temporal/vnf_workflows.py
@@ -13,49 +13,36 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
import asyncio
from datetime import timedelta
-import logging
import traceback
from typing import Tuple
-from osm_common.dataclasses.temporal_dataclasses import (
- ChangeVnfInstantiationStateInput,
- ChangeVnfStateInput,
- GetTaskQueueInput,
- GetTaskQueueOutput,
- GetVimCloudInput,
- GetVimCloudOutput,
- GetVnfDescriptorInput,
- GetVnfDescriptorOutput,
- GetVnfRecordInput,
- GetVnfRecordOutput,
- SetVnfModelInput,
- VduInstantiateInput,
- VnfInstantiateInput,
- VnfInstantiationState,
- VnfPrepareInput,
- VnfState,
+from osm_common.temporal.states import VnfInstantiationState, VnfState
+from osm_common.temporal.activities.vnf import (
+ ChangeVnfInstantiationState,
+ ChangeVnfState,
+ GetTaskQueue,
+ GetVimCloud,
+ SetVnfModel,
+ SendNotificationForVnf,
+ GetVnfDescriptor,
+ GetVnfRecord,
)
-from osm_common.temporal_constants import (
- ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE,
- ACTIVITY_SEND_NOTIFICATION_FOR_VNF,
- ACTIVITY_CHANGE_VNF_STATE,
- ACTIVITY_GET_TASK_QUEUE,
- ACTIVITY_GET_VIM_CLOUD,
- ACTIVITY_GET_VNF_RECORD,
- ACTIVITY_GET_VNF_DESCRIPTOR,
- ACTIVITY_SET_VNF_MODEL,
- LCM_TASK_QUEUE,
- WORKFLOW_VDU_INSTANTIATE,
- WORKFLOW_VNF_INSTANTIATE,
- WORKFLOW_VNF_PREPARE,
+
+from osm_common.temporal.workflows.vnf import (
+ VnfInstantiateWorkflow,
+ VnfPrepareWorkflow,
)
+from osm_common.temporal.workflows.vdu import VduInstantiateWorkflow
+from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
+
from osm_lcm.temporal.juju_paas_activities import CharmInfoUtils
-from osm_lcm.temporal.vnf_activities import VnfOperations
+from osm_lcm.temporal.vnf_activities import VnfSpecifications
from temporalio import workflow
-from temporalio.converter import value_to_type
from temporalio.common import RetryPolicy
+from temporalio.converter import value_to_type
from temporalio.exceptions import ActivityError, ChildWorkflowError
_SANDBOXED = False
@@ -63,40 +50,30 @@
default_schedule_to_close_timeout = timedelta(minutes=10)
-@workflow.defn(name=WORKFLOW_VNF_INSTANTIATE, sandboxed=_SANDBOXED)
-class VnfInstantiateWorkflow:
- """Instantiate a VNF.
-
- Workflow Identifier:
- It is recommended that the ID for the VNF is referred as a workflow
- ID when invoking this workflow.
- """
-
- def __init__(self):
- self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}")
-
+@workflow.defn(name=VnfInstantiateWorkflow.__name__, sandboxed=False)
+class VnfInstantiateWorkflowImpl(VnfInstantiateWorkflow):
@workflow.run
- async def run(self, input: VnfInstantiateInput) -> None:
- self.logger.info(f"Deploying VNF {input.vnfr_uuid}")
+ async def run(self, workflow_input: VnfInstantiateWorkflow.Input) -> None:
+ self.logger.info(f"Deploying VNF {workflow_input.vnfr_uuid}")
try:
await self.update_vnf_instantiation_state(
- ChangeVnfInstantiationStateInput(
- vnfr_uuid=input.vnfr_uuid,
+ ChangeVnfInstantiationState.Input(
+ vnfr_uuid=workflow_input.vnfr_uuid,
state=VnfInstantiationState.NOT_INSTANTIATED,
),
)
await self.send_notification_for_vnf(
- ChangeVnfInstantiationStateInput(
- vnfr_uuid=input.vnfr_uuid,
+ ChangeVnfInstantiationState.Input(
+ vnfr_uuid=workflow_input.vnfr_uuid,
state=VnfInstantiationState.NOT_INSTANTIATED,
),
)
vnf_task_queue = value_to_type(
- GetTaskQueueOutput,
+ GetTaskQueue.Output,
await workflow.execute_activity(
- activity=ACTIVITY_GET_TASK_QUEUE,
- arg=GetTaskQueueInput(input.vnfr_uuid),
- activity_id=f"{ACTIVITY_GET_TASK_QUEUE}-{input.vnfr_uuid}",
+ activity=GetTaskQueue.__name__,
+ arg=GetTaskQueue.Input(workflow_input.vnfr_uuid),
+ activity_id=f"{GetTaskQueue.__name__}-{workflow_input.vnfr_uuid}",
task_queue=LCM_TASK_QUEUE,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
@@ -106,18 +83,18 @@
self.logger.debug(f"Dependent task queue is {vnf_task_queue.task_queue}")
await workflow.execute_child_workflow(
- workflow=WORKFLOW_VNF_PREPARE,
- arg=input,
+ workflow=VnfPrepareWorkflow.__name__,
+ arg=workflow_input,
task_queue=vnf_task_queue.task_queue,
- id=f"{WORKFLOW_VNF_PREPARE}-{input.vnfr_uuid}",
+ id=f"{VnfPrepareWorkflow.__name__}-{workflow_input.vnfr_uuid}",
)
get_vnf_record = value_to_type(
- GetVnfRecordOutput,
+ GetVnfRecord.Output,
await workflow.execute_activity(
- activity=ACTIVITY_GET_VNF_RECORD,
- arg=GetVnfRecordInput(input.vnfr_uuid),
- activity_id=f"{ACTIVITY_GET_VNF_RECORD}-{input.vnfr_uuid}",
+ activity=GetVnfRecord.__name__,
+ arg=GetVnfRecord.Input(workflow_input.vnfr_uuid),
+ activity_id=f"{GetVnfRecord.__name__}-{workflow_input.vnfr_uuid}",
task_queue=vnf_task_queue.task_queue,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
@@ -125,118 +102,142 @@
)
activities_results = await asyncio.gather(
workflow.execute_activity(
- activity=ACTIVITY_GET_VNF_DESCRIPTOR,
- arg=GetVnfDescriptorInput(get_vnf_record.vnfr["vnfd-id"]),
- activity_id=f"{ACTIVITY_GET_VNF_DESCRIPTOR}-{get_vnf_record.vnfr['vnfd-id']}",
+ activity=GetVnfDescriptor.__name__,
+ arg=GetVnfDescriptor.Input(get_vnf_record.vnfr["vnfd-id"]),
+ activity_id=f"{GetVnfDescriptor.__name__}-{get_vnf_record.vnfr['vnfd-id']}",
task_queue=vnf_task_queue.task_queue,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
),
workflow.execute_activity(
- activity=ACTIVITY_GET_VIM_CLOUD,
- arg=GetVimCloudInput(input.vnfr_uuid),
- activity_id=f"{ACTIVITY_GET_VIM_CLOUD}-{input.vnfr_uuid}",
+ activity=GetVimCloud.__name__,
+ arg=GetVimCloud.Input(workflow_input.vnfr_uuid),
+ activity_id=f"{GetVimCloud.__name__}-{workflow_input.vnfr_uuid}",
task_queue=vnf_task_queue.task_queue,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
),
)
get_vnf_descriptor, get_cloud = value_to_type(
- GetVnfDescriptorOutput, activities_results[0]
- ), value_to_type(GetVimCloudOutput, activities_results[1])
+ GetVnfDescriptor.Output, activities_results[0]
+ ), value_to_type(GetVimCloud.Output, activities_results[1])
await self.instantiate_vdus(
vnfr=get_vnf_record.vnfr,
vnfd=get_vnf_descriptor.vnfd,
task_queue=vnf_task_queue.task_queue,
cloud=get_cloud.cloud,
- vnf_instantiation_config=input.instantiation_config,
+ vnf_instantiation_config=workflow_input.instantiation_config,
)
await self.update_vnf_instantiation_state(
- ChangeVnfInstantiationStateInput(
- vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
+ ChangeVnfInstantiationState.Input(
+ vnfr_uuid=workflow_input.vnfr_uuid,
+ state=VnfInstantiationState.INSTANTIATED,
),
)
await self.update_vnf_state(
- ChangeVnfStateInput(vnfr_uuid=input.vnfr_uuid, state=VnfState.STARTED),
+ ChangeVnfState.Input(
+ vnfr_uuid=workflow_input.vnfr_uuid, state=VnfState.STARTED
+ ),
)
await self.send_notification_for_vnf(
- ChangeVnfInstantiationStateInput(
- vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
+ ChangeVnfInstantiationState.Input(
+ vnfr_uuid=workflow_input.vnfr_uuid,
+ state=VnfInstantiationState.INSTANTIATED,
),
)
except ActivityError as e:
err_details = str(e.cause.with_traceback(e.__traceback__))
await self.update_vnf_instantiation_state(
- ChangeVnfInstantiationStateInput(
- vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
+ ChangeVnfInstantiationState.Input(
+ vnfr_uuid=workflow_input.vnfr_uuid,
+ state=VnfInstantiationState.INSTANTIATED,
),
)
await self.update_vnf_state(
- ChangeVnfStateInput(vnfr_uuid=input.vnfr_uuid, state=VnfState.STOPPED),
- )
- await self.send_notification_for_vnf(
- ChangeVnfInstantiationStateInput(
- vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
+ ChangeVnfState.Input(
+ vnfr_uuid=workflow_input.vnfr_uuid, state=VnfState.STOPPED
),
)
- self.logger.error(f"{WORKFLOW_VNF_INSTANTIATE} failed with {err_details}")
+ await self.send_notification_for_vnf(
+ ChangeVnfInstantiationState.Input(
+ vnfr_uuid=workflow_input.vnfr_uuid,
+ state=VnfInstantiationState.INSTANTIATED,
+ ),
+ )
+ self.logger.error(
+ f"{VnfInstantiateWorkflow.__name__} failed with {err_details}"
+ )
raise e
except ChildWorkflowError as e:
err_details = str(e.cause.with_traceback(e.cause.__traceback__))
await self.update_vnf_instantiation_state(
- ChangeVnfInstantiationStateInput(
- vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
+ ChangeVnfInstantiationState.Input(
+ vnfr_uuid=workflow_input.vnfr_uuid,
+ state=VnfInstantiationState.INSTANTIATED,
),
)
await self.update_vnf_state(
- ChangeVnfStateInput(vnfr_uuid=input.vnfr_uuid, state=VnfState.STOPPED),
- )
- await self.send_notification_for_vnf(
- ChangeVnfInstantiationStateInput(
- vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
+ ChangeVnfState.Input(
+ vnfr_uuid=workflow_input.vnfr_uuid, state=VnfState.STOPPED
),
)
- self.logger.error(f"{WORKFLOW_VNF_INSTANTIATE} failed with {err_details}")
+ await self.send_notification_for_vnf(
+ ChangeVnfInstantiationState.Input(
+ vnfr_uuid=workflow_input.vnfr_uuid,
+ state=VnfInstantiationState.INSTANTIATED,
+ ),
+ )
+ self.logger.error(
+ f"{VnfInstantiateWorkflow.__name__} failed with {err_details}"
+ )
raise e
except Exception as e:
err_details = str(traceback.format_exc())
await self.update_vnf_instantiation_state(
- ChangeVnfInstantiationStateInput(
- vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
+ ChangeVnfInstantiationState.Input(
+ vnfr_uuid=workflow_input.vnfr_uuid,
+ state=VnfInstantiationState.INSTANTIATED,
),
)
await self.update_vnf_state(
- ChangeVnfStateInput(vnfr_uuid=input.vnfr_uuid, state=VnfState.STOPPED),
- )
- await self.send_notification_for_vnf(
- ChangeVnfInstantiationStateInput(
- vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
+ ChangeVnfState.Input(
+ vnfr_uuid=workflow_input.vnfr_uuid, state=VnfState.STOPPED
),
)
- self.logger.error(f"{WORKFLOW_VNF_INSTANTIATE} failed with {err_details}")
+ await self.send_notification_for_vnf(
+ ChangeVnfInstantiationState.Input(
+ vnfr_uuid=workflow_input.vnfr_uuid,
+ state=VnfInstantiationState.INSTANTIATED,
+ ),
+ )
+ self.logger.error(
+ f"{VnfInstantiateWorkflow.__name__} failed with {err_details}"
+ )
raise e
@staticmethod
- async def update_vnf_state(vnf_state):
+ async def update_vnf_state(vnf_state: ChangeVnfState.Input):
await workflow.execute_activity(
- activity=ACTIVITY_CHANGE_VNF_STATE,
+ activity=ChangeVnfState.__name__,
arg=vnf_state,
- activity_id=f"{ACTIVITY_CHANGE_VNF_STATE}-{vnf_state.vnfr_uuid}",
+ activity_id=f"{ChangeVnfState.__name__}-{vnf_state.vnfr_uuid}",
task_queue=LCM_TASK_QUEUE,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
)
@staticmethod
- async def update_vnf_instantiation_state(vnf_instantiation_state):
+ async def update_vnf_instantiation_state(
+ vnf_instantiation_state: ChangeVnfInstantiationState.Input,
+ ):
await workflow.execute_activity(
- activity=ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE,
+ activity=ChangeVnfInstantiationState.__name__,
arg=vnf_instantiation_state,
- activity_id=f"{ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE}-{vnf_instantiation_state.vnfr_uuid}",
+ activity_id=f"{ChangeVnfInstantiationState.__name__}-{vnf_instantiation_state.vnfr_uuid}",
task_queue=LCM_TASK_QUEUE,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
@@ -244,12 +245,12 @@
@staticmethod
async def send_notification_for_vnf(
- vnf_instantiation_state: ChangeVnfInstantiationStateInput,
+ vnf_instantiation_state: ChangeVnfInstantiationState.Input,
):
await workflow.execute_activity(
- activity=ACTIVITY_SEND_NOTIFICATION_FOR_VNF,
+ activity=SendNotificationForVnf.__name__,
arg=vnf_instantiation_state,
- activity_id=f"{ACTIVITY_SEND_NOTIFICATION_FOR_VNF}-{vnf_instantiation_state.vnfr_uuid}",
+ activity_id=f"{SendNotificationForVnf.__name__}-{vnf_instantiation_state.vnfr_uuid}",
task_queue=LCM_TASK_QUEUE,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
@@ -267,7 +268,7 @@
(
vdu_instantiate_input,
vdu_instantiate_workflow_id,
- ) = VnfInstantiateWorkflow._get_vdu_instantiate_info(
+ ) = VnfInstantiateWorkflowImpl._get_vdu_instantiate_input(
vnfr=vnfr,
vnfd=vnfd,
vdu=vdu,
@@ -275,27 +276,27 @@
vnf_instantiation_config=vnf_instantiation_config,
)
await workflow.execute_child_workflow(
- workflow=WORKFLOW_VDU_INSTANTIATE,
+ workflow=VduInstantiateWorkflow.__name__,
arg=vdu_instantiate_input,
task_queue=task_queue,
id=vdu_instantiate_workflow_id,
)
@staticmethod
- def _get_vdu_instantiate_info(
+ def _get_vdu_instantiate_input(
vnfr, vnfd, vdu, cloud, vnf_instantiation_config
- ) -> Tuple[VduInstantiateInput, str]:
+ ) -> Tuple[VduInstantiateWorkflow.Input, str]:
"""Calculates the VDU instantiate input data without reaching Database."""
model_name = vnfr.get("namespace")
vim_id = vnfr.get("vim-account-id")
sw_image_descs = vnfd.get("sw-image-desc")
vdu_info = CharmInfoUtils.get_charm_info(vdu, sw_image_descs)
- vdu_instantiation_config = VnfOperations.get_vdu_instantiation_params(
+ vdu_instantiation_config = VnfSpecifications.get_vdu_instantiation_params(
vdu["id"], vnf_instantiation_config
)
- compute_constraints = VnfOperations.get_compute_constraints(vdu, vnfd)
- config = VnfOperations.get_application_config(vdu, vdu_instantiation_config)
- vdu_instantiate_input = VduInstantiateInput(
+ compute_constraints = VnfSpecifications.get_compute_constraints(vdu, vnfd)
+ config = VnfSpecifications.get_application_config(vdu, vdu_instantiation_config)
+ vdu_instantiate_input = VduInstantiateWorkflow.Input(
vim_uuid=vim_id,
model_name=model_name,
charm_info=vdu_info,
@@ -311,35 +312,31 @@
return vdu_instantiate_input, vdu_instantiate_workflow_id
-@workflow.defn(name=WORKFLOW_VNF_PREPARE, sandboxed=_SANDBOXED)
-class VnfPrepareWorkflow:
- """Prepare a VNF.
-
- Workflow Identifier:
- It is recommended that the ID for the VNF is referred as a workflow
- ID when invoking this workflow.
- """
-
- def __init__(self):
- self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}")
-
+@workflow.defn(name=VnfPrepareWorkflow.__name__, sandboxed=False)
+class VnfPrepareWorkflowImpl(VnfPrepareWorkflow):
@workflow.run
- async def run(self, wf_input: VnfPrepareInput) -> None:
+ async def run(self, workflow_input: VnfPrepareWorkflow.Input) -> None:
try:
await workflow.execute_activity(
- activity=ACTIVITY_SET_VNF_MODEL,
- arg=SetVnfModelInput(wf_input.vnfr_uuid, wf_input.model_name),
- activity_id=f"{ACTIVITY_SET_VNF_MODEL}-{wf_input.vnfr_uuid}",
+ activity=SetVnfModel.__name__,
+ arg=SetVnfModel.Input(
+ workflow_input.vnfr_uuid, workflow_input.model_name
+ ),
+ activity_id=f"{SetVnfModel.__name__}-{workflow_input.vnfr_uuid}",
task_queue=LCM_TASK_QUEUE,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
)
except ActivityError as e:
err_details = str(e.cause.with_traceback(e.__traceback__))
- self.logger.error(f"{WORKFLOW_VNF_PREPARE} failed with {err_details}")
+ self.logger.error(
+ f"{VnfPrepareWorkflow.__name__} failed with {err_details}"
+ )
raise e
except Exception as e:
err_details = str(traceback.format_exc())
- self.logger.error(f"{WORKFLOW_VNF_PREPARE} failed with {err_details}")
+ self.logger.error(
+ f"{VnfPrepareWorkflow.__name__} failed with {err_details}"
+ )
raise e