| ####################################################################################### |
| # Copyright ETSI Contributors and Others. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| # implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import logging |
| from temporalio import activity |
| from time import time |
| from osm_common.temporal_constants import ( |
| ACTIVITY_DELETE_VIM, |
| ACTIVITY_UPDATE_VIM_OPERATION_STATE, |
| ACTIVITY_UPDATE_VIM_STATE, |
| ) |
| from osm_common.dataclasses.temporal_dataclasses import ( |
| DeleteVimInput, |
| UpdateVimOperationStateInput, |
| UpdateVimStateInput, |
| ) |
| from osm_lcm.data_utils.database.database import Database |
| |
| |
class VimDbActivity:
    """Temporal activities that read/write VIM account records in the database.

    Args:
        db (Database): Data Access Object
    """

    def __init__(self, db: Database):
        logger_name = f"lcm.act.{self.__class__.__name__}"
        self.logger = logging.getLogger(logger_name)
        self.db: Database = db

    @activity.defn(name=ACTIVITY_UPDATE_VIM_STATE)
    async def update_vim_state(self, data: UpdateVimStateInput) -> None:
        """
        Writes a new operational state, status message and modification
        timestamp onto the VIM account record. The state is expected to be
        ENABLED or ERROR, but it is stored as received: no validation is
        performed here, matching the previous OSM behaviour.

        Collaborators:
            DB Write:           vim_accounts

        Raises (Retryable):
            DbException         If the target DB record does not exist or DB is not reachable.

        Activity Lifecycle:
            No heartbeat is reported, as this activity is short-running.

            Being a direct DB update, no specific retry policy is
            recommended.
        """
        state_name = data.operational_state.name
        changes = {
            "_admin.operationalState": state_name,
            "_admin.detailed-status": data.message,
            "_admin.modified": time(),
        }
        record_filter = {"_id": data.vim_uuid}

        self.db.set_one("vim_accounts", record_filter, changes)
        self.logger.debug(f"Updated VIM {data.vim_uuid} to {state_name}")

    @activity.defn(name=ACTIVITY_UPDATE_VIM_OPERATION_STATE)
    async def update_vim_operation_state(
        self, data: UpdateVimOperationStateInput
    ) -> None:
        """
        Writes the state and status message of one VIM operation task onto
        the VIM account record, and resets the record's current operation
        marker to None. Intended to report progress or completion of the
        task.

        Collaborators:
            DB Write:           vim_accounts

        Raises (Retryable):
            DbException         If the target DB record does not exist or DB is not reachable.

        Activity Lifecycle:
            No heartbeat is reported, as this activity is short-running.

            Being a direct DB update, no specific retry policy is
            recommended.
        """
        # Both per-operation fields live under the same dotted path prefix.
        op_key = format(data.op_id)
        state_name = data.op_state.name
        changes = {
            f"_admin.operations.{op_key}.operationState": state_name,
            f"_admin.operations.{op_key}.detailed-status": data.message,
            "_admin.current_operation": None,
        }
        record_filter = {"_id": data.vim_uuid}

        self.db.set_one("vim_accounts", record_filter, changes)
        self.logger.debug(
            f"Updated VIM {data.vim_uuid} OP ID {data.op_id} to {state_name}"
        )

    @activity.defn(name=ACTIVITY_DELETE_VIM)
    async def delete_vim_record(self, data: DeleteVimInput) -> None:
        """
        Removes the VIM account record identified by the given VIM UUID.

        Collaborators:
            DB Delete:          vim_accounts

        Raises (Retryable):
            DbException         If the target DB record does not exist or DB is not reachable.

        Activity Lifecycle:
            No heartbeat is reported, as this activity is short-running.

            Being a direct DB update, no specific retry policy is
            recommended.
        """
        record_filter = {"_id": data.vim_uuid}

        self.db.del_one("vim_accounts", record_filter)
        self.logger.debug(f"Removed VIM {data.vim_uuid}")