| ####################################################################################### |
| # Copyright ETSI Contributors and Others. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| # implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import logging |
| from time import time |
| |
| from osm_common.dataclasses.temporal_dataclasses import ( |
| GetVnfRecordIdsInput, |
| GetVnfRecordIdsOutput, |
| UpdateNsStateInput, |
| ) |
| from osm_common.temporal_constants import ( |
| ACTIVITY_GET_VNF_RECORD_IDS, |
| ACTIVITY_UPDATE_NS_STATE, |
| ) |
| from temporalio import activity |
| |
| from osm_lcm.data_utils.database.database import Database |
| |
| |
class NsOperations:
    """Temporal activities that look up data related to a Network Service.

    Args:
        db (Database): Data access object used to query the database.
    """

    def __init__(self, db: Database):
        self.db: Database = db
        # Logger name is derived from the concrete class name.
        self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")

    @activity.defn(name=ACTIVITY_GET_VNF_RECORD_IDS)
    async def get_vnf_record_ids(
        self, get_vnf_record_ids_input: GetVnfRecordIdsInput
    ) -> GetVnfRecordIdsOutput:
        """Return the ids of the VNF records that reference a given NS.

        Collaborators:
            DB Read:            vnfrs

        Args:
            get_vnf_record_ids_input: Carries ``ns_uuid``, which is matched
                against the "nsr-id-ref" field of each "vnfrs" entry.

        Returns:
            GetVnfRecordIdsOutput whose ``vnfr_ids`` holds the "id" value of
            every matching record, in the order the database returned them.
        """
        query_filter = {"nsr-id-ref": get_vnf_record_ids_input.ns_uuid}
        matching_records = self.db.get_list("vnfrs", query_filter)
        record_ids = [record["id"] for record in matching_records]
        return GetVnfRecordIdsOutput(vnfr_ids=record_ids)
| |
| |
class NsDbActivity:
    """Temporal activities that write NS state to the database.

    Updates are applied to the "nsrs" collection.

    Args:
        db (Database): Data access object used to write to the database.
    """

    def __init__(self, db: Database):
        self.db: Database = db
        # Logger name is derived from the concrete class name.
        self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")

    @activity.defn(name=ACTIVITY_UPDATE_NS_STATE)
    async def update_ns_state(self, data: UpdateNsStateInput) -> None:
        """
        Persist a new state for the NS identified by ``data.ns_uuid``.

        The state name is stored both at the top level ("nsState") and under
        "_admin"; the message is stored as the detailed status and the
        modification timestamp is refreshed.

        Collaborators:
            DB Write:           nsrs

        Raises (Retryable):
            DbException         If the target DB record does not exist or DB is not reachable.

        Activity Lifecycle:
            This activity will not report a heartbeat due to its
            short-running nature.

            As this is a direct DB update, it is not recommended to have
            any specific retry policy
        """
        state_name = data.state.name
        record_filter = {"_id": data.ns_uuid}

        # NOTE(review): an "errorDescription" entry (fed from data.message) was
        # commented out in the original code; confirm whether it should be set.
        changes = {}
        changes["nsState"] = state_name
        changes["_admin.nsState"] = state_name
        changes["_admin.detailed-status"] = data.message
        changes["_admin.modified"] = time()

        self.db.set_one("nsrs", record_filter, changes)
        self.logger.debug(f"Updated NS {data.ns_uuid} to {data.state.name}")