blob: 6a2e8bc0cb540e5ad329e58d71c723cb9d532675 [file] [log] [blame]
#######################################################################################
# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from temporalio import activity
import time
from osm_common.dataclasses.temporal_dataclasses import (
NsLcmOperationInput,
UpdateLcmOperationStateInput,
)
from osm_common.temporal_constants import (
ACTIVITY_UPDATE_LCM_OPERATION_STATE,
ACTIVITY_NSLCM_NO_OP,
)
class NsLcmActivity:
"""
Handles NS Lifecycle Managment operations.
Args:
db (object): Data Access Object
"""
def __init__(self, db):
self.db = db
self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
self.count = 0
@activity.defn(name=ACTIVITY_NSLCM_NO_OP)
async def no_op(self, input: NsLcmOperationInput) -> None:
"""
This is a simple No Operation Activity that simply logs the data
with which it was called. It can be used as a placeholder when
developing workflows, or can be enhanced with logic to throw
exceptions on specific conditions to test exception handling in
a workflow.
"""
self.logger.debug(f"Called with: {input.nslcmop}")
@activity.defn(name=ACTIVITY_UPDATE_LCM_OPERATION_STATE)
async def update_ns_lcm_operation_state(
self, data: UpdateLcmOperationStateInput
) -> None:
"""
Changes the state of a LCM operation task. Should be done to
indicate progress, or completion of the task itself.
Collaborators:
DB Write: nslcmops
Raises (Retryable):
DbException If the target DB record does not exist or DB is not reachable.
Activity Lifecycle:
This activity will not report a heartbeat due to its
short-running nature.
As this is a direct DB update, it is not recommended to have
any specific retry policy
"""
now = time.time()
update_lcm_operation = {
"_admin.modified": now,
}
if data.op_state is not None:
update_lcm_operation["operationState"] = data.op_state.name
update_lcm_operation["statusEnteredTime"] = now
if data.stage is not None:
update_lcm_operation["stage"] = data.stage
if data.error_message is not None:
update_lcm_operation["errorMessage"] = data.error_message
if data.detailed_status is not None:
update_lcm_operation["detailedStatus"] = data.detailed_status
self.db.set_one("nslcmops", {"_id": data.op_id}, update_lcm_operation)
self.logger.debug(
f"Updated LCM Operation {data.op_id} to {update_lcm_operation}"
)