NS LCM OP Workflow wrapper 59/13159/5
authorMark Beierl <mark.beierl@canonical.com>
Wed, 5 Apr 2023 20:01:41 +0000 (20:01 +0000)
committerMark Beierl <mark.beierl@canonical.com>
Thu, 6 Apr 2023 01:35:15 +0000 (01:35 +0000)
Change-Id: I21ad95463237d5b3f65a7cca58d1dbe5031e2b4a
Signed-off-by: Mark Beierl <mark.beierl@canonical.com>
osm_lcm/nglcm.py
osm_lcm/temporal/lcm_activities.py [new file with mode: 0644]
osm_lcm/temporal/lcm_workflows.py [new file with mode: 0644]
osm_lcm/temporal/vim_activities.py

index 6c6f196..0e8811b 100644 (file)
@@ -20,6 +20,7 @@ import asyncio
 import getopt
 import logging
 import logging.handlers
+import os
 import sys
 import yaml
 
@@ -29,6 +30,8 @@ from osm_lcm.data_utils.database.database import Database
 from osm_lcm.data_utils.lcm_config import LcmCfg
 from osm_lcm.lcm_utils import LcmException
 from os import path
+from osm_lcm.temporal.lcm_activities import NsLcmActivity
+from osm_lcm.temporal.lcm_workflows import NsNoOpWorkflow
 from osm_lcm.temporal.vim_activities import VimDbActivity, JujuPaasConnector
 from osm_lcm.temporal.vim_workflows import (
     VimCreateWorkflow,
@@ -126,20 +129,32 @@ class NGLcm:
         client = await Client.connect(temporal_api)
         data_activity_instance = VimDbActivity(self.db)
         paas_connector_instance = JujuPaasConnector(self.db)
+        nslcm_activity_instance = NsLcmActivity(self.db)
 
-        workflows = [VimCreateWorkflow, VimDeleteWorkflow, VimUpdateWorkflow]
+        workflows = [
+            NsNoOpWorkflow,
+            VimCreateWorkflow,
+            VimDeleteWorkflow,
+            VimUpdateWorkflow,
+        ]
         activities = [
             data_activity_instance.update_vim_operation_state,
             data_activity_instance.update_vim_state,
             data_activity_instance.delete_vim_record,
+            nslcm_activity_instance.update_ns_lcm_operation_state,
+            nslcm_activity_instance.no_op,
             paas_connector_instance.test_vim_connectivity,
         ]
 
+        # Check if we are running under a debugger
+        debug = os.getenv("VSCODE_IPC_HOOK_CLI") is not None
+
         worker = Worker(
             client,
             task_queue=LCM_TASK_QUEUE,
             workflows=workflows,
             activities=activities,
+            debug_mode=debug,
         )
 
         self.logger.info("Starting LCM temporal worker")
diff --git a/osm_lcm/temporal/lcm_activities.py b/osm_lcm/temporal/lcm_activities.py
new file mode 100644 (file)
index 0000000..6a2e8bc
--- /dev/null
@@ -0,0 +1,94 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from temporalio import activity
+import time
+from osm_common.dataclasses.temporal_dataclasses import (
+    NsLcmOperationInput,
+    UpdateLcmOperationStateInput,
+)
+from osm_common.temporal_constants import (
+    ACTIVITY_UPDATE_LCM_OPERATION_STATE,
+    ACTIVITY_NSLCM_NO_OP,
+)
+
+
+class NsLcmActivity:
+    """
+    Handles NS Lifecycle Managment operations.
+    Args:
+        db  (object):       Data Access Object
+    """
+
+    def __init__(self, db):
+        self.db = db
+        self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
+        self.count = 0
+
+    @activity.defn(name=ACTIVITY_NSLCM_NO_OP)
+    async def no_op(self, input: NsLcmOperationInput) -> None:
+        """
+        This is a simple No Operation Activity that simply logs the data
+        with which it was called.  It can be used as a placeholder when
+        developing workflows, or can be enhanced with logic to throw
+        exceptions on specific conditions to test exception handling in
+        a workflow.
+        """
+        self.logger.debug(f"Called with: {input.nslcmop}")
+
+    @activity.defn(name=ACTIVITY_UPDATE_LCM_OPERATION_STATE)
+    async def update_ns_lcm_operation_state(
+        self, data: UpdateLcmOperationStateInput
+    ) -> None:
+        """
+        Changes the state of a LCM operation task.  Should be done to
+        indicate progress, or completion of the task itself.
+
+        Collaborators:
+            DB Write:           nslcmops
+
+        Raises  (Retryable):
+            DbException         If the target DB record does not exist or DB is not reachable.
+
+        Activity Lifecycle:
+            This activity will not report a heartbeat due to its
+            short-running nature.
+            As this is a direct DB update, it is not recommended to have
+            any specific retry policy
+        """
+        now = time.time()
+
+        update_lcm_operation = {
+            "_admin.modified": now,
+        }
+
+        if data.op_state is not None:
+            update_lcm_operation["operationState"] = data.op_state.name
+            update_lcm_operation["statusEnteredTime"] = now
+
+        if data.stage is not None:
+            update_lcm_operation["stage"] = data.stage
+
+        if data.error_message is not None:
+            update_lcm_operation["errorMessage"] = data.error_message
+
+        if data.detailed_status is not None:
+            update_lcm_operation["detailedStatus"] = data.detailed_status
+
+        self.db.set_one("nslcmops", {"_id": data.op_id}, update_lcm_operation)
+        self.logger.debug(
+            f"Updated LCM Operation {data.op_id} to {update_lcm_operation}"
+        )
diff --git a/osm_lcm/temporal/lcm_workflows.py b/osm_lcm/temporal/lcm_workflows.py
new file mode 100644 (file)
index 0000000..0af6324
--- /dev/null
@@ -0,0 +1,133 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+from datetime import timedelta
+import logging
+from temporalio import workflow
+from temporalio.common import RetryPolicy
+from temporalio.exceptions import ActivityError
+from osm_common.dataclasses.temporal_dataclasses import (
+    NsLcmOperationInput,
+    UpdateLcmOperationStateInput,
+    LcmOperationState,
+)
+from osm_common.temporal_constants import (
+    ACTIVITY_UPDATE_LCM_OPERATION_STATE,
+    WORKFLOW_NSLCM_NO_OP,
+    ACTIVITY_NSLCM_NO_OP,
+)
+import traceback
+
+
+class LcmOperationWorkflow(ABC):
+    """
+    An abstract base class representing a Lifecycle Management Operation.  Any
+    workflows that need LCM OP control should extend this class and implement
+    the workflow method.
+
+    Methods
+    -------
+
+    @abstractmethod workflow(input: NsLcmOperationInput)
+        Method for subclasses to implement the actual workflow that is being
+        wrapped in this operation.
+
+    @workflow.run wrap_nslcmop(input: NsLcmOperationInput)
+        Must be implemented in every subclass exactly as follows:
+        @workflow.run
+        async def wrap_nslcmop(self, input: NsLcmOperationInput) -> None:
+            await super().wrap_nslcmop(input=input)
+    """
+
+    _SANDBOXED = False
+    retry_policy = RetryPolicy(maximum_attempts=3)
+    no_retry_policy = RetryPolicy(maximum_attempts=1)
+    default_schedule_to_close_timeout = timedelta(minutes=10)
+
+    def __init__(self):
+        self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}")
+        self.op_state = self.op_state = UpdateLcmOperationStateInput(
+            op_id=None,
+            op_state=None,
+            detailed_status="",
+            error_message="",
+            stage="",
+        )
+        self.op_id = None
+
+    @abstractmethod
+    async def workflow(self, input: NsLcmOperationInput):
+        pass
+
+    async def wrap_nslcmop(self, input: NsLcmOperationInput):
+        self.op_id = input.nslcmop["_id"]
+        self.op_state.op_id = self.op_id
+        self.op_state.op_state = LcmOperationState.PROCESSING
+        await self.update_operation_state()
+
+        try:
+            await self.workflow(input=input)
+            self.op_state.op_state = LcmOperationState.COMPLETED
+
+        except ActivityError as e:
+            self.logger.exception(e)
+            self.op_state.op_state = LcmOperationState.FAILED
+            self.op_state.detailed_status = str(e.cause.with_traceback(e.__traceback__))
+            self.op_state.error_message = str(e.cause.message)
+            raise e
+
+        except Exception as e:
+            self.logger.exception(e)
+            self.op_state.op_state = LcmOperationState.FAILED
+            self.op_state.detailed_status = traceback.format_exc()
+            self.op_state.error_message = str(e)
+            raise e
+
+        finally:
+            await self.update_operation_state()
+
+    async def update_operation_state(self) -> None:
+        await workflow.execute_activity(
+            activity=ACTIVITY_UPDATE_LCM_OPERATION_STATE,
+            arg=self.op_state,
+            activity_id=f"{ACTIVITY_UPDATE_LCM_OPERATION_STATE}-{self.op_id}",
+            schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
+            retry_policy=LcmOperationWorkflow.retry_policy,
+        )
+
+
+@workflow.defn(name=WORKFLOW_NSLCM_NO_OP, sandboxed=LcmOperationWorkflow._SANDBOXED)
+class NsNoOpWorkflow(LcmOperationWorkflow):
+    """
+    This is a simple No Operation workflow that simply calls a No Operation
+    activity.  It can be used as a placeholder when developing workflows.
+    """
+
+    @workflow.run
+    async def wrap_nslcmop(self, input: NsLcmOperationInput) -> None:
+        await super().wrap_nslcmop(input=input)
+
+    async def workflow(self, input: NsLcmOperationInput) -> None:
+        self.logger.debug(f"Called with: {input.nslcmop}")
+
+        await workflow.execute_activity(
+            activity=ACTIVITY_NSLCM_NO_OP,
+            arg=input,
+            activity_id=f"{ACTIVITY_NSLCM_NO_OP}-{self.op_id}",
+            schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
+            retry_policy=LcmOperationWorkflow.retry_policy,
+        )
index badeae5..6307704 100644 (file)
@@ -198,7 +198,7 @@ class VimDbActivity:
         Collaborators:
             DB Delete:           vim_accounts
 
-        Raises  (Retryable):
+        Raises (Retryable):
             DbException         If the target DB record does not exist or DB is not reachable.
 
         Activity Lifecycle: