blob: 12ffc764aab0480af8e7f3824de6fd621e13910d [file] [log] [blame]
#######################################################################################
# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from time import time
from osm_common.dataclasses.temporal_dataclasses import (
GetVnfRecordIdsInput,
GetVnfRecordIdsOutput,
UpdateNsStateInput,
)
from osm_common.temporal_constants import (
ACTIVITY_GET_VNF_RECORD_IDS,
ACTIVITY_UPDATE_NS_STATE,
)
from temporalio import activity
from osm_lcm.data_utils.database.database import Database
class NsOperations:
def __init__(self, db: Database):
self.db: Database = db
self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
@activity.defn(name=ACTIVITY_GET_VNF_RECORD_IDS)
async def get_vnf_record_ids(
self, get_vnf_record_ids_input: GetVnfRecordIdsInput
) -> GetVnfRecordIdsOutput:
vnfrs = self.db.get_list(
"vnfrs", {"nsr-id-ref": get_vnf_record_ids_input.ns_uuid}
)
return GetVnfRecordIdsOutput(vnfr_ids=[vnfr["id"] for vnfr in vnfrs])
class NsDbActivity:
"""Perform Database operations for NS accounts.
Args:
db (object): Data Access Object
"""
def __init__(self, db: Database):
self.db: Database = db
self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
@activity.defn(name=ACTIVITY_UPDATE_NS_STATE)
async def update_ns_state(self, data: UpdateNsStateInput) -> None:
"""
Changes the state of the NS itself.
Collaborators:
DB Write: nsrs
Raises (Retryable):
DbException If the target DB record does not exist or DB is not reachable.
Activity Lifecycle:
This activity will not report a heartbeat due to its
short-running nature.
As this is a direct DB update, it is not recommended to have
any specific retry policy
"""
update_ns_state = {
"nsState": data.state.name,
# "errorDescription" : data.message,
"_admin.nsState": data.state.name,
"_admin.detailed-status": data.message,
"_admin.modified": time(),
}
self.db.set_one("nsrs", {"_id": data.ns_uuid}, update_ns_state)
self.logger.debug(f"Updated NS {data.ns_uuid} to {data.state.name}")