# blob: 2602eebe65bae3262e15b7231175286b582ba6d3 [file] [log] [blame]
#######################################################################################
# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from temporalio import activity
from osm_common.temporal_constants import (
ACTIVITY_CHANGE_NF_STATE,
ACTIVITY_CHANGE_NF_INSTANTIATION_STATE,
ACTIVITY_CHANGE_NF_NOTIFICATION_STATE,
ACTIVITY_GET_TASK_QUEUE,
VIM_TYPE_TASK_QUEUE_MAPPINGS,
)
from osm_common.dataclasses.temporal_dataclasses import (
ChangeNFInstantiationStateInput,
ChangeNFStateInput,
GetTaskQueueInput,
GetTaskQueueOutput,
)
class VnfOperations:
    """Temporal activities supporting VNF operations.

    Args:
        db (object): Data Access Object
    """

    def __init__(self, db):
        self.db = db
        self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")

    @activity.defn(name=ACTIVITY_GET_TASK_QUEUE)
    async def get_task_queue(
        self, get_task_queue_input: GetTaskQueueInput
    ) -> GetTaskQueueOutput:
        """Finds the appropriate task queue according to VIM type of VNF.

        The VNFR identified by ``get_task_queue_input.vnfr_uuid`` is read,
        its ``vim-account-id`` is used to read the VIM account record, and
        the ``vim-type`` of that account is looked up in
        ``VIM_TYPE_TASK_QUEUE_MAPPINGS``.

        Collaborators:
            DB Access Object

        Raises (retryable):
            DbException: If DB read operations fail

        Raises:
            KeyError: If a record lacks the expected field, or (assuming
                the mapping is a plain dict - TODO confirm) the VIM type
                has no task queue mapping.

        Activity Lifecycle:
            This activity should complete relatively quickly (less than a
            second). However, it would be reasonable to wait up to 10
            seconds.

            This activity will not report a heartbeat due to its
            short-running nature.

            It is not necessary to implement a back-off strategy for this
            activity, the operation is idempotent.
        """
        # Each lookup is by unique "_id", so a single record is expected.
        # get_one() yields that record directly; get_list() would return a
        # list, which cannot be indexed by field name.
        vnfr = self.db.get_one("vnfrs", {"_id": get_task_queue_input.vnfr_uuid})
        vim_record = self.db.get_one("vim_accounts", {"_id": vnfr["vim-account-id"]})
        task_queue = VIM_TYPE_TASK_QUEUE_MAPPINGS[vim_record["vim-type"]]
        self.logger.debug(f"Got the task queue {task_queue} for VNF operations.")
        return GetTaskQueueOutput(task_queue)
class VnfDbActivity:
    """Perform Database operations for VNF records (the "vnfrs" collection).

    Args:
        db (object): Data Access Object
    """

    def __init__(self, db):
        self.db = db
        self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")

    @activity.defn(name=ACTIVITY_CHANGE_NF_STATE)
    async def change_nf_state(self, nf_state_input: ChangeNFStateInput) -> None:
        """Updates the VNF State in VNFR.

        Sets the ``vnfState`` field of the VNFR identified by
        ``nf_state_input.vnfr_uuid`` to ``nf_state_input.nf_state``.

        Collaborators:
            DB Access Object

        Raises (retryable):
            DbException: If DB access/update fails, the collection or DB
                record ID does not exist.

        Activity Lifecycle:
            This activity should complete relatively quickly (less than a
            second). However, it would be reasonable to wait up to 10
            seconds.

            This activity will not report a heartbeat due to its
            short-running nature.

            It is not necessary to implement a back-off strategy for this
            activity, the operation is idempotent.
        """
        update_nf_state = {"vnfState": nf_state_input.nf_state}
        self.db.set_one("vnfrs", {"_id": nf_state_input.vnfr_uuid}, update_nf_state)
        self.logger.debug(
            f"VNF {nf_state_input.vnfr_uuid} state is updated to {nf_state_input.nf_state}."
        )

    @activity.defn(name=ACTIVITY_CHANGE_NF_INSTANTIATION_STATE)
    async def change_nf_instantiation_state(
        self, nf_instantiation_state_input: ChangeNFInstantiationStateInput
    ) -> None:
        """Updates the VNF Instantiation State in VNFR.

        Sets the ``instantiationState`` field of the VNFR identified by
        ``nf_instantiation_state_input.vnfr_uuid``. The operational
        ``vnfState`` field is left untouched; it is owned by
        ``change_nf_state``.

        Collaborators:
            DB Access Object

        Raises (retryable):
            DbException: If DB access or update fails

        Activity Lifecycle:
            This activity should complete relatively quickly (less than a
            second). However, it would be reasonable to wait up to 10
            seconds.

            This activity will not report a heartbeat due to its
            short-running nature.

            It is not necessary to implement a back-off strategy for this
            activity, the operation is idempotent.
        """
        # The instantiation state has its own VNFR field. Writing it to
        # "vnfState" would clobber the operational state instead.
        update_nf_instantiation_state = {
            "instantiationState": nf_instantiation_state_input.nf_instantiation_state
        }
        self.db.set_one(
            "vnfrs",
            {"_id": nf_instantiation_state_input.vnfr_uuid},
            update_nf_instantiation_state,
        )
        self.logger.debug(
            f"VNF {nf_instantiation_state_input.vnfr_uuid} instantiation state is updated to {nf_instantiation_state_input.nf_instantiation_state}."
        )

    @activity.defn(name=ACTIVITY_CHANGE_NF_NOTIFICATION_STATE)
    async def change_nf_notification_state(self) -> None:
        """If VNF LCM operation state changes, send notification updates.

        This activity does nothing.
        """