blob: 98fc86c4b9586d00b9e287830ba8b017e5f1e1ce [file] [log] [blame]
#######################################################################################
# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from temporalio import activity
from osm_common.temporal_constants import (
ACTIVITY_CHANGE_VNF_STATE,
ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE,
ACTIVITY_GET_TASK_QUEUE,
ACTIVITY_GET_VNF_DETAILS,
ACTIVITY_SEND_NOTIFICATION_FOR_VNF,
ACTIVITY_SET_VNF_MODEL,
VIM_TYPE_TASK_QUEUE_MAPPINGS,
)
from osm_common.dataclasses.temporal_dataclasses import (
ChangeVnfInstantiationStateInput,
ChangeVnfStateInput,
GetTaskQueueInput,
GetTaskQueueOutput,
GetVnfDetailsInput,
GetVnfDetailsOutput,
VnfInstantiateInput,
)
from osm_lcm.data_utils.database.database import Database
class VnfOperations:
def __init__(self, db: Database):
self.db: Database = db
self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
@activity.defn(name=ACTIVITY_GET_TASK_QUEUE)
async def get_task_queue(
self, get_task_queue_input: GetTaskQueueInput
) -> GetTaskQueueOutput:
"""Finds the appropriate task queue according to VIM type of VNF.
Collaborators:
DB Access Object
Raises (retryable):
DbException: If DB read operations fail, the collection or DB record ID does not exist.
Activity Lifecycle:
This activity should complete relatively quickly (less than a
second). However, it would be reasonable to wait up to 10
seconds.
This activity will not report a heartbeat due to its
short-running nature.
It is not necessary to implement a back-off strategy for this
activity, the operation is idempotent.
"""
vnfr = self.db.get_one("vnfrs", {"_id": get_task_queue_input.vnfr_uuid})
vim_record = self.db.get_one("vim_accounts", {"_id": vnfr["vim-account-id"]})
task_queue = VIM_TYPE_TASK_QUEUE_MAPPINGS[vim_record["vim_type"]]
self.logger.debug(f"Got the task queue {task_queue} for VNF operations.")
return GetTaskQueueOutput(task_queue)
@activity.defn(name=ACTIVITY_GET_VNF_DETAILS)
async def get_vnf_details(
self, get_vnf_details_input: GetVnfDetailsInput
) -> GetVnfDetailsOutput:
"""Gets the VNF record and VNF descriptor from Database.
Collaborators:
DB Access Object
Raises (retryable):
DbException: If DB read operations fail, the collection or DB record ID does not exist.
Activity Lifecycle:
This activity should complete relatively quickly (less than 10
second).
This activity will not report a heartbeat due to its
short-running nature.
This is an idempotent activity.
"""
vnfr = self.db.get_one("vnfrs", {"_id": get_vnf_details_input.vnfr_uuid})
vnfd = self.db.get_one("vnfds", {"_id": vnfr["vnfd-id"]})
self.logger.debug("Got the vnfr and vnfd from Database for VNF operations.")
return GetVnfDetailsOutput(vnfr=vnfr, vnfd=vnfd)
class VnfDbActivity:
"""Perform Database operations for NS accounts.
Args:
db (Database): Data Access Object
"""
def __init__(self, db: Database):
self.db: Database = db
self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
@activity.defn(name=ACTIVITY_CHANGE_VNF_STATE)
async def change_vnf_state(self, vnf_state_input: ChangeVnfStateInput) -> None:
"""Updates the VNF State in VNFR.
Collaborators:
DB Access Object
Raises (retryable):
DbException: If DB access/update fails, the collection or DB record ID does not exist.
Activity Lifecycle:
This activity should complete relatively quickly (less than a
second). However, it would be reasonable to wait up to 10
seconds.
This activity will not report a heartbeat due to its
short-running nature.
It is not necessary to implement a back-off strategy for this
activity, the operation is idempotent.
"""
update_vnf_state = {"vnfState": vnf_state_input.state}
self.db.set_one("vnfrs", {"_id": vnf_state_input.vnfr_uuid}, update_vnf_state)
self.logger.debug(
f"VNF {vnf_state_input.vnfr_uuid} state is updated to {vnf_state_input.state}."
)
@activity.defn(name=ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE)
async def change_vnf_instantiation_state(
self, vnf_instantiation_state_input: ChangeVnfInstantiationStateInput
) -> None:
"""Updates the VNF Instantiation State in VNFR.
Collaborators:
DB Access Object
Raises (retryable):
DbException: If DB access or update fails, the collection or DB record ID does not exist.
Activity Lifecycle:
This activity should complete relatively quickly (less than a
second). However, it would be reasonable to wait up to 10
seconds.
This activity will not report a heartbeat due to its
short-running nature.
It is not necessary to implement a back-off strategy for this
activity, the operation is idempotent.
"""
update_vnf_instantiation_state = {
"instantiationState": vnf_instantiation_state_input.state
}
self.db.set_one(
"vnfrs",
{"_id": vnf_instantiation_state_input.vnfr_uuid},
update_vnf_instantiation_state,
)
self.logger.debug(
f"VNF {vnf_instantiation_state_input.vnfr_uuid} state is updated to {vnf_instantiation_state_input.state}."
)
@activity.defn(name=ACTIVITY_SET_VNF_MODEL)
async def set_vnf_model(self, set_vnf_model_input: VnfInstantiateInput) -> None:
"""Updates the model name of VNF in VNFR.
Collaborators:
DB Access Object
Raises (retryable):
DbException: If DB access or update fails, the collection or DB record ID does not exist.
Activity Lifecycle:
This activity should complete relatively quickly (less than a
second). However, it would be reasonable to wait up to 10
seconds.
This activity will not report a heartbeat due to its
short-running nature.
It is not necessary to implement a back-off strategy for this
activity, the operation is idempotent.
"""
update_namespace = {"namespace": set_vnf_model_input.model_name}
self.db.set_one(
"vnfrs", {"_id": set_vnf_model_input.vnfr_uuid}, update_namespace
)
self.logger.debug(
f"VNF {set_vnf_model_input.vnfr_uuid} model name is updated to {set_vnf_model_input.model_name}."
)
class VnfSendNotifications:
"""Perform Notification operations."""
def __init__(self):
self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
@activity.defn(name=ACTIVITY_SEND_NOTIFICATION_FOR_VNF)
async def send_notification_for_vnf(
self, input: ChangeVnfInstantiationStateInput
) -> None:
"""If VNF LCM operation state changes, send notification updates.
This activity does nothing.
"""
self.logger.debug("Send notification for VNF not implemented.")