Implement instantiate NS workflow
Change-Id: I61defdc64865396cd6af4a20ffb67443450bd742
Signed-off-by: Daniel Arndt <daniel.arndt@canonical.com>
Signed-off-by: Mark Beierl <mark.beierl@canonical.com>
diff --git a/osm_lcm/temporal/lcm_workflows.py b/osm_lcm/temporal/lcm_workflows.py
index 837e586..9ab8e5d 100644
--- a/osm_lcm/temporal/lcm_workflows.py
+++ b/osm_lcm/temporal/lcm_workflows.py
@@ -14,23 +14,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import traceback
from abc import ABC, abstractmethod
from datetime import timedelta
-import logging
+
+from osm_common.dataclasses.temporal_dataclasses import (
+ LcmOperationState,
+ NsLcmOperationInput,
+ UpdateLcmOperationStateInput,
+)
+from osm_common.temporal_constants import (
+ ACTIVITY_NSLCM_NO_OP,
+ ACTIVITY_UPDATE_LCM_OPERATION_STATE,
+ WORKFLOW_NSLCM_NO_OP,
+)
from temporalio import workflow
from temporalio.common import RetryPolicy
from temporalio.exceptions import ActivityError
-from osm_common.dataclasses.temporal_dataclasses import (
- NsLcmOperationInput,
- UpdateLcmOperationStateInput,
- LcmOperationState,
-)
-from osm_common.temporal_constants import (
- ACTIVITY_UPDATE_LCM_OPERATION_STATE,
- WORKFLOW_NSLCM_NO_OP,
- ACTIVITY_NSLCM_NO_OP,
-)
-import traceback
class LcmOperationWorkflow(ABC):
@@ -60,14 +61,8 @@
def __init__(self):
self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}")
- self.op_state = self.op_state = UpdateLcmOperationStateInput(
- op_id=None,
- op_state=None,
- detailed_status="",
- error_message="",
- stage="",
- )
self.op_id = None
+ # Last stage label supplied to update_operation_state(); it is re-sent with every
+ # state update until a caller passes a new one.
+ self.stage = ""
@abstractmethod
async def workflow(self, input: NsLcmOperationInput):
@@ -75,36 +70,47 @@
async def wrap_nslcmop(self, input: NsLcmOperationInput):
+ """Run ``self.workflow`` for an NS LCM operation and record its state.
+
+ The operation is marked PROCESSING before the workflow starts, COMPLETED when it
+ returns, and FAILED (with error message and details) when it raises. The
+ original exception is always re-raised.
+ """
self.op_id = input.nslcmop["_id"]
- self.op_state.op_id = self.op_id
- self.op_state.op_state = LcmOperationState.PROCESSING
- await self.update_operation_state()
-
+ await self.update_operation_state(LcmOperationState.PROCESSING)
try:
await self.workflow(input=input)
- self.op_state.op_state = LcmOperationState.COMPLETED
-
except ActivityError as e:
# TODO: Deteremine the best content for Activity Errors OSM-994
self.logger.exception(e)
- self.op_state.op_state = LcmOperationState.FAILED
- self.op_state.detailed_status = str(e.cause.with_traceback(e.__traceback__))
- self.op_state.error_message = str(e.cause.message)
+ await self.update_operation_state(
+ LcmOperationState.FAILED,
+ error_message=str(e.cause.message),
+ detailed_status=str(e.cause.with_traceback(e.__traceback__)),
+ )
raise e
-
except Exception as e:
self.logger.exception(e)
- self.op_state.op_state = LcmOperationState.FAILED
- self.op_state.detailed_status = traceback.format_exc()
- self.op_state.error_message = str(e)
+ # Must be awaited: an un-awaited coroutine never runs, so the FAILED state
+ # would silently not be recorded.
+ await self.update_operation_state(
+ LcmOperationState.FAILED,
+ error_message=str(e),
+ detailed_status=traceback.format_exc(),
+ )
raise e
+ await self.update_operation_state(LcmOperationState.COMPLETED)
- finally:
- await self.update_operation_state()
-
- async def update_operation_state(self) -> None:
+ async def update_operation_state(
+ self,
+ op_state: LcmOperationState,
+ stage: str = None,
+ error_message: str = "",
+ detailed_status: str = "",
+ ) -> None:
+ if stage is not None:
+ self.stage = stage
+ input = UpdateLcmOperationStateInput(
+ op_id=self.op_id,
+ op_state=op_state,
+ stage=self.stage,
+ error_message=error_message,
+ detailed_status=detailed_status,
+ )
await workflow.execute_activity(
activity=ACTIVITY_UPDATE_LCM_OPERATION_STATE,
- arg=self.op_state,
+ arg=input,
activity_id=f"{ACTIVITY_UPDATE_LCM_OPERATION_STATE}-{self.op_id}",
schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
retry_policy=LcmOperationWorkflow.retry_policy,
diff --git a/osm_lcm/temporal/ns_activities.py b/osm_lcm/temporal/ns_activities.py
index eee4325..3ea7dcf 100644
--- a/osm_lcm/temporal/ns_activities.py
+++ b/osm_lcm/temporal/ns_activities.py
@@ -16,20 +16,17 @@
import logging
from time import time
-from temporalio import activity
+
+from osm_common.dataclasses.temporal_dataclasses import (
+ GetVnfRecordIdsInput,
+ GetVnfRecordIdsOutput,
+ UpdateNsStateInput,
+)
from osm_common.temporal_constants import (
- ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED,
- ACTIVITY_DEPLOY_NS,
- ACTIVITY_GET_MODEL_INFO,
+ ACTIVITY_GET_VNF_RECORD_IDS,
ACTIVITY_UPDATE_NS_STATE,
)
-from osm_common.dataclasses.temporal_dataclasses import (
- ModelInfo,
- NsInstantiateInput,
- UpdateNsStateInput,
- VduInstantiateInput,
-)
-from osm_lcm.temporal.juju_paas_activities import CharmInfoUtils
+from temporalio import activity
class NsOperations:
@@ -37,38 +34,12 @@
self.db = db
self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
- # TODO: Change to a workflow OSM-990
- @activity.defn(name=ACTIVITY_DEPLOY_NS)
- async def deploy_ns(self, ns_instantiate_input: NsInstantiateInput) -> None:
- vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": ns_instantiate_input.ns_uuid})
- for vnfr in vnfrs:
- await self.deploy_vnf(vnfr)
-
- async def deploy_vnf(self, vnfr: dict):
- vim_id = vnfr.get("vim-account-id")
- model_name = "model-name"
- vnfd_id = vnfr["vnfd-id"]
- vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
- sw_image_descs = vnfd.get("sw-image-desc")
- for vdu in vnfd.get("vdu"):
- await self.deploy_vdu(vdu, sw_image_descs, vim_id, model_name)
-
- async def deploy_vdu(
- self, vdu: dict, sw_image_descs: str, vim_id: str, model_name: str
- ) -> None:
- vdu_info = CharmInfoUtils.get_charm_info(vdu, sw_image_descs)
- vdu_instantiate_input = VduInstantiateInput(vim_id, model_name, vdu_info)
- workflow_id = (
- vdu_instantiate_input.model_name + vdu_instantiate_input.charm_info.app_name
- )
- self.logger.info(f"TODO: Start VDU Workflow {workflow_id}")
-
- @activity.defn(name=ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED)
- async def check_ns_instantiate_finished(
- self, ns_instantiate: NsInstantiateInput
- ) -> None:
- # TODO: Implement OSM-993
- pass
+ @activity.defn(name=ACTIVITY_GET_VNF_RECORD_IDS)
+ async def get_vnf_records(
+ self, get_vnf_records_input: GetVnfRecordIdsInput
+ ) -> GetVnfRecordIdsOutput:
+ """Placeholder activity: ignores its input and returns one empty VNF record id."""
+ # TODO: [OSMENG-1043] Implement Get VNF Records
+ placeholder_ids = [""]
+ return GetVnfRecordIdsOutput(vnfr_ids=placeholder_ids)
class NsDbActivity:
@@ -83,37 +54,6 @@
self.db = db
self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
- @activity.defn(name=ACTIVITY_GET_MODEL_INFO)
- async def get_model_info(
- self, ns_instantiate_input: NsInstantiateInput
- ) -> ModelInfo:
- """Returns a ModelInfo. Contains VIM ID and model name.
-
- Collaborators:
- DB Read: nsrs
-
- Raises (Retryable):
- DbException If the target DB record does not exist or DB is not reachable.
-
- Activity Lifecycle:
- This activity will not report a heartbeat due to its
- short-running nature.
-
- As this is a direct DB update, it is not recommended to have
- any specific retry policy
-
- """
- ns_uuid = ns_instantiate_input.ns_uuid
- nsr = self.db.get_one("nsrs", {"_id": ns_uuid})
- vim_uuid = nsr.get("datacenter")
- model_name = self._get_namespace(ns_uuid, vim_uuid)
- return ModelInfo(vim_uuid, model_name)
-
- @staticmethod
- def _get_namespace(ns_id: str, vim_id: str) -> str:
- """The NS namespace is the combination if the NS ID and the VIM ID."""
- return ns_id[-12:] + "-" + vim_id[-12:]
-
@activity.defn(name=ACTIVITY_UPDATE_NS_STATE)
async def update_ns_state(self, data: UpdateNsStateInput) -> None:
"""
diff --git a/osm_lcm/temporal/ns_workflows.py b/osm_lcm/temporal/ns_workflows.py
index bef6766..e78392e 100644
--- a/osm_lcm/temporal/ns_workflows.py
+++ b/osm_lcm/temporal/ns_workflows.py
@@ -14,26 +14,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from temporalio import workflow
-from temporalio.converter import value_to_type
-from temporalio.exceptions import ActivityError
+import asyncio
from osm_common.dataclasses.temporal_dataclasses import (
+ GetVnfRecordIdsInput,
+ GetVnfRecordIdsOutput,
ModelInfo,
- NsInstantiateInput,
NsLcmOperationInput,
NsState,
UpdateNsStateInput,
+ VnfInstantiateInput,
)
from osm_common.temporal_constants import (
ACTIVITY_CREATE_MODEL,
- ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED,
- ACTIVITY_DEPLOY_NS,
- ACTIVITY_GET_MODEL_INFO,
+ ACTIVITY_GET_VNF_RECORD_IDS,
ACTIVITY_UPDATE_NS_STATE,
- LCM_TASK_QUEUE,
WORKFLOW_NS_INSTANTIATE,
+ WORKFLOW_VNF_INSTANTIATE,
)
+from temporalio import workflow
+from temporalio.converter import value_to_type
+from temporalio.exceptions import ActivityError
from osm_lcm.temporal.lcm_workflows import LcmOperationWorkflow
@@ -47,72 +48,70 @@
await super().wrap_nslcmop(input=input)
async def workflow(self, input: NsLcmOperationInput) -> None:
+ """Instantiate an NS.
+
+ Creates the model, fetches the VNF record ids and runs one VNF instantiate
+ child workflow per id, waiting for all of them. The NS state is set to
+ INSTANTIATED on success and to ERROR (with the failure message) on any
+ exception, which is then re-raised.
+ """
- # Temporary until we align on what is really needed where. For now, this
- # reduced set looks to be all that is needed.
- input = NsInstantiateInput(
- ns_uuid=input.nslcmop["nsInstanceId"], op_id=input.nslcmop["_id"]
- )
+ self.logger.info(f"Executing {WORKFLOW_NS_INSTANTIATE} with {input}")
- ns_state = UpdateNsStateInput(input.ns_uuid, NsState.INSTANTIATED, "Done")
+ # TODO: Can we clean up the input? Perhaps this workflow could receive NsInstantiateInput
+ # directly.
+ ns_uuid = input.nslcmop["nsInstanceId"]
+ vim_uuid = input.nslcmop["operationParams"]["vimAccountId"]
+ model_name = self._get_namespace(ns_uuid, vim_uuid)
try:
- model_info = value_to_type(
- ModelInfo,
+ await workflow.execute_activity(
+ activity=ACTIVITY_CREATE_MODEL,
+ arg=ModelInfo(vim_uuid=vim_uuid, model_name=model_name),
+ activity_id=f"{ACTIVITY_CREATE_MODEL}-{ns_uuid}",
+ schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
+ retry_policy=LcmOperationWorkflow.no_retry_policy,
+ )
+
+ # TODO: Provide the correct VNFR ids to the VNF instantiate workflow
+ vnf_record_ids_output: GetVnfRecordIdsOutput = value_to_type(
+ GetVnfRecordIdsOutput,
await workflow.execute_activity(
- activity=ACTIVITY_GET_MODEL_INFO,
- arg=input,
- activity_id=f"{ACTIVITY_GET_MODEL_INFO}-{input.ns_uuid}",
- task_queue=LCM_TASK_QUEUE,
+ activity=ACTIVITY_GET_VNF_RECORD_IDS,
+ arg=GetVnfRecordIdsInput(ns_uuid=ns_uuid),
+ activity_id=f"{ACTIVITY_GET_VNF_RECORD_IDS}-{ns_uuid}",
schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
retry_policy=LcmOperationWorkflow.no_retry_policy,
),
)
-
- await workflow.execute_activity(
- activity=ACTIVITY_CREATE_MODEL,
- arg=model_info,
- activity_id=f"{ACTIVITY_CREATE_MODEL}-{input.ns_uuid}",
- task_queue=LCM_TASK_QUEUE,
- schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
- retry_policy=LcmOperationWorkflow.no_retry_policy,
- )
-
- # TODO: Change this to a workflow OSM-990
- await workflow.execute_activity(
- activity=ACTIVITY_DEPLOY_NS,
- arg=input,
- activity_id=f"{ACTIVITY_DEPLOY_NS}-{input.ns_uuid}",
- task_queue=LCM_TASK_QUEUE,
- schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
- retry_policy=LcmOperationWorkflow.no_retry_policy,
- )
-
- await workflow.execute_activity(
- activity=ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED,
- arg=input,
- activity_id=f"{ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED}-{input.ns_uuid}",
- task_queue=LCM_TASK_QUEUE,
- schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
- retry_policy=LcmOperationWorkflow.retry_policy,
+ # gather() takes awaitables as separate positional arguments, so the generator
+ # is unpacked; awaiting it keeps the NS from being reported as instantiated
+ # before every VNF child workflow has finished.
+ await asyncio.gather(
+ *(
+ workflow.execute_child_workflow(
+ workflow=WORKFLOW_VNF_INSTANTIATE,
+ arg=VnfInstantiateInput(vnfr_uuid=vnfr_uuid, model_name=model_name),
+ id=f"{WORKFLOW_VNF_INSTANTIATE}-{vnfr_uuid}",
+ )
+ for vnfr_uuid in vnf_record_ids_output.vnfr_ids
+ )
)
except ActivityError as e:
- ns_state = UpdateNsStateInput(input.ns_uuid, NsState.ERROR, e.cause.message)
+ # A failed instantiation must be recorded as ERROR, not INSTANTIATED.
+ await self.update_ns_state(ns_uuid, NsState.ERROR, e.cause.message)
self.logger.error(
f"{WORKFLOW_NS_INSTANTIATE} failed with {str(e.cause.message)}"
)
raise e
except Exception as e:
- ns_state = UpdateNsStateInput(input.ns_uuid, NsState.ERROR, str(e))
+ await self.update_ns_state(ns_uuid, NsState.ERROR, str(e))
self.logger.error(f"{WORKFLOW_NS_INSTANTIATE} failed with {str(e)}")
raise e
+ await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, "Done")
- finally:
- await workflow.execute_activity(
- activity=ACTIVITY_UPDATE_NS_STATE,
- arg=ns_state,
- activity_id=f"{ACTIVITY_UPDATE_NS_STATE}-{input.ns_uuid}",
- task_queue=LCM_TASK_QUEUE,
- schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
- retry_policy=LcmOperationWorkflow.retry_policy,
- )
+ @staticmethod
+ async def update_ns_state(ns_uuid: str, state: NsState, message: str) -> None:
+ """Run the update-NS-state activity for ``ns_uuid`` with the given state and message."""
+ timeout = LcmOperationWorkflow.default_schedule_to_close_timeout
+ await workflow.execute_activity(
+ activity=ACTIVITY_UPDATE_NS_STATE,
+ arg=UpdateNsStateInput(ns_uuid, state, message),
+ activity_id=f"{ACTIVITY_UPDATE_NS_STATE}-{ns_uuid}",
+ schedule_to_close_timeout=timeout,
+ retry_policy=LcmOperationWorkflow.retry_policy,
+ )
+
+ def _get_namespace(self, ns_id: str, vim_id: str) -> str:
+ """Build the NS namespace: the last 12 characters of the NS ID and of the VIM ID, hyphen-joined."""
+ return "-".join((ns_id[-12:], vim_id[-12:]))