import getopt
import logging
import logging.handlers
+import os
import sys
import yaml
from osm_lcm.data_utils.lcm_config import LcmCfg
from osm_lcm.lcm_utils import LcmException
from os import path
+from osm_lcm.temporal.lcm_activities import NsLcmActivity
+from osm_lcm.temporal.lcm_workflows import NsNoOpWorkflow
from osm_lcm.temporal.vim_activities import VimDbActivity, JujuPaasConnector
from osm_lcm.temporal.vim_workflows import (
VimCreateWorkflow,
client = await Client.connect(temporal_api)
data_activity_instance = VimDbActivity(self.db)
paas_connector_instance = JujuPaasConnector(self.db)
+ nslcm_activity_instance = NsLcmActivity(self.db)
- workflows = [VimCreateWorkflow, VimDeleteWorkflow, VimUpdateWorkflow]
+ workflows = [
+ NsNoOpWorkflow,
+ VimCreateWorkflow,
+ VimDeleteWorkflow,
+ VimUpdateWorkflow,
+ ]
activities = [
data_activity_instance.update_vim_operation_state,
data_activity_instance.update_vim_state,
data_activity_instance.delete_vim_record,
+ nslcm_activity_instance.update_ns_lcm_operation_state,
+ nslcm_activity_instance.no_op,
paas_connector_instance.test_vim_connectivity,
]
+ # Check if we are running under a debugger
+ debug = os.getenv("VSCODE_IPC_HOOK_CLI") is not None
+
worker = Worker(
client,
task_queue=LCM_TASK_QUEUE,
workflows=workflows,
activities=activities,
+ debug_mode=debug,
)
self.logger.info("Starting LCM temporal worker")
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from temporalio import activity
+import time
+from osm_common.dataclasses.temporal_dataclasses import (
+ NsLcmOperationInput,
+ UpdateLcmOperationStateInput,
+)
+from osm_common.temporal_constants import (
+ ACTIVITY_UPDATE_LCM_OPERATION_STATE,
+ ACTIVITY_NSLCM_NO_OP,
+)
+
+
+class NsLcmActivity:
+ """
+ Handles NS Lifecycle Managment operations.
+ Args:
+ db (object): Data Access Object
+ """
+
+ def __init__(self, db):
+ self.db = db
+ self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
+ self.count = 0
+
+ @activity.defn(name=ACTIVITY_NSLCM_NO_OP)
+ async def no_op(self, input: NsLcmOperationInput) -> None:
+ """
+ This is a simple No Operation Activity that simply logs the data
+ with which it was called. It can be used as a placeholder when
+ developing workflows, or can be enhanced with logic to throw
+ exceptions on specific conditions to test exception handling in
+ a workflow.
+ """
+ self.logger.debug(f"Called with: {input.nslcmop}")
+
+ @activity.defn(name=ACTIVITY_UPDATE_LCM_OPERATION_STATE)
+ async def update_ns_lcm_operation_state(
+ self, data: UpdateLcmOperationStateInput
+ ) -> None:
+ """
+ Changes the state of a LCM operation task. Should be done to
+ indicate progress, or completion of the task itself.
+
+ Collaborators:
+ DB Write: nslcmops
+
+ Raises (Retryable):
+ DbException If the target DB record does not exist or DB is not reachable.
+
+ Activity Lifecycle:
+ This activity will not report a heartbeat due to its
+ short-running nature.
+ As this is a direct DB update, it is not recommended to have
+ any specific retry policy
+ """
+ now = time.time()
+
+ update_lcm_operation = {
+ "_admin.modified": now,
+ }
+
+ if data.op_state is not None:
+ update_lcm_operation["operationState"] = data.op_state.name
+ update_lcm_operation["statusEnteredTime"] = now
+
+ if data.stage is not None:
+ update_lcm_operation["stage"] = data.stage
+
+ if data.error_message is not None:
+ update_lcm_operation["errorMessage"] = data.error_message
+
+ if data.detailed_status is not None:
+ update_lcm_operation["detailedStatus"] = data.detailed_status
+
+ self.db.set_one("nslcmops", {"_id": data.op_id}, update_lcm_operation)
+ self.logger.debug(
+ f"Updated LCM Operation {data.op_id} to {update_lcm_operation}"
+ )
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+from datetime import timedelta
+import logging
+from temporalio import workflow
+from temporalio.common import RetryPolicy
+from temporalio.exceptions import ActivityError
+from osm_common.dataclasses.temporal_dataclasses import (
+ NsLcmOperationInput,
+ UpdateLcmOperationStateInput,
+ LcmOperationState,
+)
+from osm_common.temporal_constants import (
+ ACTIVITY_UPDATE_LCM_OPERATION_STATE,
+ WORKFLOW_NSLCM_NO_OP,
+ ACTIVITY_NSLCM_NO_OP,
+)
+import traceback
+
+
+class LcmOperationWorkflow(ABC):
+ """
+ An abstract base class representing a Lifecycle Management Operation. Any
+ workflows that need LCM OP control should extend this class and implement
+ the workflow method.
+
+ Methods
+ -------
+
+ @abstractmethod workflow(input: NsLcmOperationInput)
+ Method for subclasses to implement the actual workflow that is being
+ wrapped in this operation.
+
+ @workflow.run wrap_nslcmop(input: NsLcmOperationInput)
+ Must be implemented in every subclass exactly as follows:
+ @workflow.run
+ async def wrap_nslcmop(self, input: NsLcmOperationInput) -> None:
+ await super().wrap_nslcmop(input=input)
+ """
+
+ _SANDBOXED = False
+ retry_policy = RetryPolicy(maximum_attempts=3)
+ no_retry_policy = RetryPolicy(maximum_attempts=1)
+ default_schedule_to_close_timeout = timedelta(minutes=10)
+
+ def __init__(self):
+ self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}")
+ self.op_state = self.op_state = UpdateLcmOperationStateInput(
+ op_id=None,
+ op_state=None,
+ detailed_status="",
+ error_message="",
+ stage="",
+ )
+ self.op_id = None
+
+ @abstractmethod
+ async def workflow(self, input: NsLcmOperationInput):
+ pass
+
+ async def wrap_nslcmop(self, input: NsLcmOperationInput):
+ self.op_id = input.nslcmop["_id"]
+ self.op_state.op_id = self.op_id
+ self.op_state.op_state = LcmOperationState.PROCESSING
+ await self.update_operation_state()
+
+ try:
+ await self.workflow(input=input)
+ self.op_state.op_state = LcmOperationState.COMPLETED
+
+ except ActivityError as e:
+ self.logger.exception(e)
+ self.op_state.op_state = LcmOperationState.FAILED
+ self.op_state.detailed_status = str(e.cause.with_traceback(e.__traceback__))
+ self.op_state.error_message = str(e.cause.message)
+ raise e
+
+ except Exception as e:
+ self.logger.exception(e)
+ self.op_state.op_state = LcmOperationState.FAILED
+ self.op_state.detailed_status = traceback.format_exc()
+ self.op_state.error_message = str(e)
+ raise e
+
+ finally:
+ await self.update_operation_state()
+
+ async def update_operation_state(self) -> None:
+ await workflow.execute_activity(
+ activity=ACTIVITY_UPDATE_LCM_OPERATION_STATE,
+ arg=self.op_state,
+ activity_id=f"{ACTIVITY_UPDATE_LCM_OPERATION_STATE}-{self.op_id}",
+ schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
+ retry_policy=LcmOperationWorkflow.retry_policy,
+ )
+
+
+@workflow.defn(name=WORKFLOW_NSLCM_NO_OP, sandboxed=LcmOperationWorkflow._SANDBOXED)
+class NsNoOpWorkflow(LcmOperationWorkflow):
+ """
+ This is a simple No Operation workflow that simply calls a No Operation
+ activity. It can be used as a placeholder when developing workflows.
+ """
+
+ @workflow.run
+ async def wrap_nslcmop(self, input: NsLcmOperationInput) -> None:
+ await super().wrap_nslcmop(input=input)
+
+ async def workflow(self, input: NsLcmOperationInput) -> None:
+ self.logger.debug(f"Called with: {input.nslcmop}")
+
+ await workflow.execute_activity(
+ activity=ACTIVITY_NSLCM_NO_OP,
+ arg=input,
+ activity_id=f"{ACTIVITY_NSLCM_NO_OP}-{self.op_id}",
+ schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
+ retry_policy=LcmOperationWorkflow.retry_policy,
+ )