From: Mark Beierl Date: Wed, 5 Apr 2023 20:01:41 +0000 (+0000) Subject: NS LCM OP Workflow wrapper X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F59%2F13159%2F5;p=osm%2FLCM.git NS LCM OP Workflow wrapper Change-Id: I21ad95463237d5b3f65a7cca58d1dbe5031e2b4a Signed-off-by: Mark Beierl --- diff --git a/osm_lcm/nglcm.py b/osm_lcm/nglcm.py index 6c6f196f..0e8811be 100644 --- a/osm_lcm/nglcm.py +++ b/osm_lcm/nglcm.py @@ -20,6 +20,7 @@ import asyncio import getopt import logging import logging.handlers +import os import sys import yaml @@ -29,6 +30,8 @@ from osm_lcm.data_utils.database.database import Database from osm_lcm.data_utils.lcm_config import LcmCfg from osm_lcm.lcm_utils import LcmException from os import path +from osm_lcm.temporal.lcm_activities import NsLcmActivity +from osm_lcm.temporal.lcm_workflows import NsNoOpWorkflow from osm_lcm.temporal.vim_activities import VimDbActivity, JujuPaasConnector from osm_lcm.temporal.vim_workflows import ( VimCreateWorkflow, @@ -126,20 +129,32 @@ class NGLcm: client = await Client.connect(temporal_api) data_activity_instance = VimDbActivity(self.db) paas_connector_instance = JujuPaasConnector(self.db) + nslcm_activity_instance = NsLcmActivity(self.db) - workflows = [VimCreateWorkflow, VimDeleteWorkflow, VimUpdateWorkflow] + workflows = [ + NsNoOpWorkflow, + VimCreateWorkflow, + VimDeleteWorkflow, + VimUpdateWorkflow, + ] activities = [ data_activity_instance.update_vim_operation_state, data_activity_instance.update_vim_state, data_activity_instance.delete_vim_record, + nslcm_activity_instance.update_ns_lcm_operation_state, + nslcm_activity_instance.no_op, paas_connector_instance.test_vim_connectivity, ] + # Check if we are running under a debugger + debug = os.getenv("VSCODE_IPC_HOOK_CLI") is not None + worker = Worker( client, task_queue=LCM_TASK_QUEUE, workflows=workflows, activities=activities, + debug_mode=debug, ) self.logger.info("Starting LCM temporal worker") diff --git a/osm_lcm/temporal/lcm_activities.py b/osm_lcm/temporal/lcm_activities.py new file mode 100644 index 00000000..6a2e8bc0 --- /dev/null +++ b/osm_lcm/temporal/lcm_activities.py @@ -0,0 +1,94 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +from temporalio import activity +import time +from osm_common.dataclasses.temporal_dataclasses import ( + NsLcmOperationInput, + UpdateLcmOperationStateInput, +) +from osm_common.temporal_constants import ( + ACTIVITY_UPDATE_LCM_OPERATION_STATE, + ACTIVITY_NSLCM_NO_OP, +) + + +class NsLcmActivity: + """ + Handles NS Lifecycle Managment operations. + Args: + db (object): Data Access Object + """ + + def __init__(self, db): + self.db = db + self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}") + self.count = 0 + + @activity.defn(name=ACTIVITY_NSLCM_NO_OP) + async def no_op(self, input: NsLcmOperationInput) -> None: + """ + This is a simple No Operation Activity that simply logs the data + with which it was called. It can be used as a placeholder when + developing workflows, or can be enhanced with logic to throw + exceptions on specific conditions to test exception handling in + a workflow. + """ + self.logger.debug(f"Called with: {input.nslcmop}") + + @activity.defn(name=ACTIVITY_UPDATE_LCM_OPERATION_STATE) + async def update_ns_lcm_operation_state( + self, data: UpdateLcmOperationStateInput + ) -> None: + """ + Changes the state of a LCM operation task. Should be done to + indicate progress, or completion of the task itself. + + Collaborators: + DB Write: nslcmops + + Raises (Retryable): + DbException If the target DB record does not exist or DB is not reachable. + + Activity Lifecycle: + This activity will not report a heartbeat due to its + short-running nature. + As this is a direct DB update, it is not recommended to have + any specific retry policy + """ + now = time.time() + + update_lcm_operation = { + "_admin.modified": now, + } + + if data.op_state is not None: + update_lcm_operation["operationState"] = data.op_state.name + update_lcm_operation["statusEnteredTime"] = now + + if data.stage is not None: + update_lcm_operation["stage"] = data.stage + + if data.error_message is not None: + update_lcm_operation["errorMessage"] = data.error_message + + if data.detailed_status is not None: + update_lcm_operation["detailedStatus"] = data.detailed_status + + self.db.set_one("nslcmops", {"_id": data.op_id}, update_lcm_operation) + self.logger.debug( + f"Updated LCM Operation {data.op_id} to {update_lcm_operation}" + ) diff --git a/osm_lcm/temporal/lcm_workflows.py b/osm_lcm/temporal/lcm_workflows.py new file mode 100644 index 00000000..0af6324b --- /dev/null +++ b/osm_lcm/temporal/lcm_workflows.py @@ -0,0 +1,133 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from datetime import timedelta +import logging +from temporalio import workflow +from temporalio.common import RetryPolicy +from temporalio.exceptions import ActivityError +from osm_common.dataclasses.temporal_dataclasses import ( + NsLcmOperationInput, + UpdateLcmOperationStateInput, + LcmOperationState, +) +from osm_common.temporal_constants import ( + ACTIVITY_UPDATE_LCM_OPERATION_STATE, + WORKFLOW_NSLCM_NO_OP, + ACTIVITY_NSLCM_NO_OP, +) +import traceback + + +class LcmOperationWorkflow(ABC): + """ + An abstract base class representing a Lifecycle Management Operation. Any + workflows that need LCM OP control should extend this class and implement + the workflow method. + + Methods + ------- + + @abstractmethod workflow(input: NsLcmOperationInput) + Method for subclasses to implement the actual workflow that is being + wrapped in this operation. + + @workflow.run wrap_nslcmop(input: NsLcmOperationInput) + Must be implemented in every subclass exactly as follows: + @workflow.run + async def wrap_nslcmop(self, input: NsLcmOperationInput) -> None: + await super().wrap_nslcmop(input=input) + """ + + _SANDBOXED = False + retry_policy = RetryPolicy(maximum_attempts=3) + no_retry_policy = RetryPolicy(maximum_attempts=1) + default_schedule_to_close_timeout = timedelta(minutes=10) + + def __init__(self): + self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}") + self.op_state = self.op_state = UpdateLcmOperationStateInput( + op_id=None, + op_state=None, + detailed_status="", + error_message="", + stage="", + ) + self.op_id = None + + @abstractmethod + async def workflow(self, input: NsLcmOperationInput): + pass + + async def wrap_nslcmop(self, input: NsLcmOperationInput): + self.op_id = input.nslcmop["_id"] + self.op_state.op_id = self.op_id + self.op_state.op_state = LcmOperationState.PROCESSING + await self.update_operation_state() + + try: + await self.workflow(input=input) + self.op_state.op_state = LcmOperationState.COMPLETED + + except ActivityError as e: + self.logger.exception(e) + self.op_state.op_state = LcmOperationState.FAILED + self.op_state.detailed_status = str(e.cause.with_traceback(e.__traceback__)) + self.op_state.error_message = str(e.cause.message) + raise e + + except Exception as e: + self.logger.exception(e) + self.op_state.op_state = LcmOperationState.FAILED + self.op_state.detailed_status = traceback.format_exc() + self.op_state.error_message = str(e) + raise e + + finally: + await self.update_operation_state() + + async def update_operation_state(self) -> None: + await workflow.execute_activity( + activity=ACTIVITY_UPDATE_LCM_OPERATION_STATE, + arg=self.op_state, + activity_id=f"{ACTIVITY_UPDATE_LCM_OPERATION_STATE}-{self.op_id}", + schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout, + retry_policy=LcmOperationWorkflow.retry_policy, + ) + + +@workflow.defn(name=WORKFLOW_NSLCM_NO_OP, sandboxed=LcmOperationWorkflow._SANDBOXED) +class NsNoOpWorkflow(LcmOperationWorkflow): + """ + This is a simple No Operation workflow that simply calls a No Operation + activity. It can be used as a placeholder when developing workflows. + """ + + @workflow.run + async def wrap_nslcmop(self, input: NsLcmOperationInput) -> None: + await super().wrap_nslcmop(input=input) + + async def workflow(self, input: NsLcmOperationInput) -> None: + self.logger.debug(f"Called with: {input.nslcmop}") + + await workflow.execute_activity( + activity=ACTIVITY_NSLCM_NO_OP, + arg=input, + activity_id=f"{ACTIVITY_NSLCM_NO_OP}-{self.op_id}", + schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout, + retry_policy=LcmOperationWorkflow.retry_policy, + ) diff --git a/osm_lcm/temporal/vim_activities.py b/osm_lcm/temporal/vim_activities.py index badeae58..6307704f 100644 --- a/osm_lcm/temporal/vim_activities.py +++ b/osm_lcm/temporal/vim_activities.py @@ -198,7 +198,7 @@ class VimDbActivity: Collaborators: DB Delete: vim_accounts - Raises (Retryable): + Raises (Retryable): DbException If the target DB record does not exist or DB is not reachable. Activity Lifecycle: