| ####################################################################################### |
| # Copyright ETSI Contributors and Others. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| # implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| from temporalio import activity |
| from time import time |
| from osm_common.temporal.activities.vim import ( |
| UpdateVimState, |
| UpdateVimOperationState, |
| DeleteVimRecord, |
| ) |
| |
| |
@activity.defn(name=UpdateVimState.__name__)
class UpdateVimStateImpl(UpdateVimState):
    """Activity implementation that records the operational state of a VIM account.

    Registered with Temporal under the class name of ``UpdateVimState``.
    ``self.db`` and ``self.logger`` are not defined in this class; presumably
    they are supplied by the base class -- confirm against ``osm_common``.
    """

    async def __call__(self, activity_input: UpdateVimState.Input) -> None:
        """Store the new state, status message and modification time of one VIM.

        Args:
            activity_input: Provides ``vim_uuid`` (matched against ``_id``),
                ``operational_state`` (its ``name`` is stored) and ``message``
                (stored as the detailed status).
        """
        vim_id = activity_input.vim_uuid
        state_name = activity_input.operational_state.name

        record_filter = {"_id": vim_id}
        changes = {
            "_admin.operationalState": state_name,
            "_admin.detailed-status": activity_input.message,
            # Epoch timestamp (seconds, float) taken when the update is built.
            "_admin.modified": time(),
        }

        # NOTE(review): set_one is called without await inside an async
        # activity; if it performs blocking I/O it will hold the event loop
        # for the duration of the call -- confirm this is acceptable.
        self.db.set_one("vim_accounts", record_filter, changes)

        self.logger.debug(f"Updated VIM {vim_id} to {state_name}")
| |
| |
@activity.defn(name=UpdateVimOperationState.__name__)
class UpdateVimOperationStateImpl(UpdateVimOperationState):
    """Activity implementation that records the outcome of one VIM operation.

    Registered with Temporal under the class name of ``UpdateVimOperationState``.
    ``self.db`` and ``self.logger`` are not defined in this class; presumably
    they are supplied by the base class -- confirm against ``osm_common``.
    """

    async def __call__(self, activity_input: UpdateVimOperationState.Input) -> None:
        """Store the state and status message of an operation on a VIM account.

        Besides the two per-operation fields, ``_admin.current_operation`` is
        set to ``None`` in the same update.

        Args:
            activity_input: Provides ``vim_uuid`` (matched against ``_id``),
                ``op_id`` (selects the entry under ``_admin.operations``),
                ``op_state`` (its ``name`` is stored) and ``message``.
        """
        vim_id = activity_input.vim_uuid
        op_id = activity_input.op_id
        op_state_name = activity_input.op_state.name

        # Dotted path of this operation's entry inside the VIM record.
        op_path = f"_admin.operations.{op_id}"

        changes = {
            f"{op_path}.operationState": op_state_name,
            f"{op_path}.detailed-status": activity_input.message,
            "_admin.current_operation": None,
        }

        # NOTE(review): set_one is called without await inside an async
        # activity; if it performs blocking I/O it will hold the event loop
        # for the duration of the call -- confirm this is acceptable.
        self.db.set_one("vim_accounts", {"_id": vim_id}, changes)

        self.logger.debug(f"Updated VIM {vim_id} OP ID {op_id} to {op_state_name}")
| |
| |
@activity.defn(name=DeleteVimRecord.__name__)
class DeleteVimRecordImpl(DeleteVimRecord):
    """Activity implementation that removes a VIM account record.

    Registered with Temporal under the class name of ``DeleteVimRecord``.
    ``self.db`` and ``self.logger`` are not defined in this class; presumably
    they are supplied by the base class -- confirm against ``osm_common``.
    """

    async def __call__(self, activity_input: DeleteVimRecord.Input) -> None:
        """Delete the ``vim_accounts`` entry whose ``_id`` equals ``vim_uuid``.

        Args:
            activity_input: Provides ``vim_uuid``, the identifier of the
                record to delete.
        """
        vim_id = activity_input.vim_uuid
        record_filter = {"_id": vim_id}

        # NOTE(review): del_one is called without await inside an async
        # activity; if it performs blocking I/O it will hold the event loop
        # for the duration of the call -- confirm this is acceptable.
        self.db.del_one("vim_accounts", record_filter)

        self.logger.debug(f"Removed VIM {vim_id}")