# blob: 3ea8c6477cd2174117ac0f0b38b59a4b2722e940 [file] [log] [blame]
#########################################################################
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from temporalio import activity
from typing import List, Any
from osm_common.temporal.activities.vnf import (
GetTaskQueue,
GetVimCloud,
GetVnfDescriptor,
GetVnfRecord,
GetModelNames,
ChangeVnfState,
ChangeVnfInstantiationState,
SetVnfModel,
SendNotificationForVnf,
)
from osm_common.temporal.dataclasses_common import VduComputeConstraints
from osm_common.temporal_task_queues.task_queues_mappings import (
VIM_TYPE_TASK_QUEUE_MAPPINGS,
)
# Key prefix that marks a configurable property as application configuration.
# VnfSpecifications._get_only_config_items keeps only keys starting with this
# prefix and strips it from the resulting key.
CONFIG_IDENTIFIER = "config::"
@activity.defn(name=GetTaskQueue.__name__)
class GetTaskQueueImpl(GetTaskQueue):
    """Activity that resolves the task queue to use for a VNF.

    The queue is chosen from ``VIM_TYPE_TASK_QUEUE_MAPPINGS`` using the
    ``vim_type`` of the VIM account referenced by the VNF record.
    ``self.db`` and ``self.logger`` are not defined in this class; they are
    presumably provided by the base activity class.
    """

    async def __call__(self, activity_input: GetTaskQueue.Input) -> GetTaskQueue.Output:
        """Return the task queue mapped to the VIM type of the given VNF.

        Args:
            activity_input: carries ``vnfr_uuid``, the ``_id`` of the VNF
                record to look up.

        Returns:
            ``GetTaskQueue.Output`` built from the mapped task queue.
        """
        # VNF record -> VIM account id -> VIM account -> VIM type.
        vnf_record = self.db.get_one("vnfrs", {"_id": activity_input.vnfr_uuid})
        vim_account_id = vnf_record["vim-account-id"]
        vim_account = self.db.get_one("vim_accounts", {"_id": vim_account_id})
        vim_type = vim_account["vim_type"]

        # No fallback here: an unmapped VIM type propagates the lookup error.
        task_queue = VIM_TYPE_TASK_QUEUE_MAPPINGS[vim_type]
        self.logger.debug(f"Got the task queue {task_queue} for VNF operations.")
        return GetTaskQueue.Output(task_queue)
@activity.defn(name=GetVimCloud.__name__)
class GetVimCloudImpl(GetVimCloud):
    """Activity that reads the cloud name configured on a VNF's VIM account.

    ``self.db`` and ``self.logger`` are not defined in this class; they are
    presumably provided by the base activity class.
    """

    async def __call__(self, activity_input: GetVimCloud.Input) -> GetVimCloud.Output:
        """Return the ``cloud`` entry of the VIM account used by the VNF.

        Args:
            activity_input: carries ``vnfr_uuid``, the ``_id`` of the VNF
                record to look up.

        Returns:
            ``GetVimCloud.Output`` whose ``cloud`` is the configured value,
            or ``""`` when the VIM account has no ``config`` section or no
            ``cloud`` entry in it.
        """
        vnfr = self.db.get_one("vnfrs", {"_id": activity_input.vnfr_uuid})
        vim_record = self.db.get_one("vim_accounts", {"_id": vnfr["vim-account-id"]})
        # "cloud" already defaults to ""; treat an absent or null "config"
        # section the same way instead of failing with KeyError/AttributeError.
        vim_config = vim_record.get("config") or {}
        cloud = vim_config.get("cloud", "")
        self.logger.debug(f"Got the cloud type {cloud} for VNF operations.")
        return GetVimCloud.Output(cloud=cloud)
@activity.defn(name=GetVnfRecord.__name__)
class GetVnfRecordImpl(GetVnfRecord):
    """Activity that fetches a VNF record from the ``vnfrs`` collection.

    ``self.db`` and ``self.logger`` are not defined in this class; they are
    presumably provided by the base activity class.
    """

    async def __call__(self, activity_input: GetVnfRecord.Input) -> GetVnfRecord.Output:
        """Return the VNF record whose ``_id`` equals ``vnfr_uuid``.

        Args:
            activity_input: carries ``vnfr_uuid``, the record identifier.

        Returns:
            ``GetVnfRecord.Output`` wrapping the record as ``vnfr``.
        """
        record_filter = {"_id": activity_input.vnfr_uuid}
        record = self.db.get_one("vnfrs", record_filter)
        self.logger.debug("Got the vnfr from Database for VNF operations.")
        return GetVnfRecord.Output(vnfr=record)
@activity.defn(name=GetVnfDescriptor.__name__)
class GetVnfDescriptorImpl(GetVnfDescriptor):
    """Activity that fetches a VNF descriptor from the ``vnfds`` collection.

    ``self.db`` is not defined in this class; it is presumably provided by
    the base activity class.
    """

    async def __call__(
        self, activity_input: GetVnfDescriptor.Input
    ) -> GetVnfDescriptor.Output:
        """Return the VNF descriptor whose ``_id`` equals ``vnfd_uuid``.

        Args:
            activity_input: carries ``vnfd_uuid``, the descriptor identifier.

        Returns:
            ``GetVnfDescriptor.Output`` wrapping the descriptor as ``vnfd``.
        """
        descriptor_filter = {"_id": activity_input.vnfd_uuid}
        descriptor = self.db.get_one("vnfds", descriptor_filter)
        return GetVnfDescriptor.Output(vnfd=descriptor)
class VnfSpecifications:
    """Stateless helpers that pull per-VDU settings out of VNF documents.

    All methods are static and work on plain dictionaries; none of them
    touches the database.
    """

    @staticmethod
    def get_vdu_instantiation_params(
        vdu_id: str, vnf_instantiation_config: dict
    ) -> dict:
        """Return the instantiation-time properties given for one VDU.

        Args:
            vdu_id: identifier matched against each entry's ``id``.
            vnf_instantiation_config: mapping whose optional ``vdu`` list
                holds per-VDU entries.

        Returns:
            The ``configurable-properties`` of the first entry whose ``id``
            matches, or ``{}`` when no entry matches or the matching entry
            has none.
        """
        for vdu in vnf_instantiation_config.get("vdu", []):
            if vdu.get("id") == vdu_id:
                return vdu.get("configurable-properties", {})
        return {}

    @staticmethod
    def get_compute_constraints(vdu: dict, vnfd: dict) -> VduComputeConstraints:
        """Build the compute constraints (cores, memory) for a VDU.

        Args:
            vdu: VDU entry; its ``virtual-compute-desc`` names the flavor.
            vnfd: descriptor whose ``virtual-compute-desc`` list defines the
                flavors.

        Returns:
            ``VduComputeConstraints`` with both values as integers. Both are
            0 when the VDU names no flavor or the flavor is not found; each
            is 0 individually when the flavor omits it.
        """
        compute_desc_id = vdu.get("virtual-compute-desc")
        if not compute_desc_id:
            return VduComputeConstraints(cores=0, mem=0)

        flavor_details = VnfSpecifications._get_flavor_details(compute_desc_id, vnfd)
        if not flavor_details:
            return VduComputeConstraints(cores=0, mem=0)

        cpu_cores = flavor_details.get("virtual-cpu", {}).get("num-virtual-cpu", 0)
        # Named "memory_gb" upstream; the unit is not verifiable from here.
        memory_gb = flavor_details.get("virtual-memory", {}).get("size", 0)
        # Normalise both values the same way so a string or float core count
        # from the descriptor does not leak through unconverted.
        return VduComputeConstraints(cores=int(cpu_cores), mem=int(memory_gb))

    @staticmethod
    def _get_flavor_details(compute_desc_id: str, vnfd: dict) -> Any:
        """Return the flavor with the given ``id`` from the VNFD, or None."""
        for flavor in vnfd.get("virtual-compute-desc", []):
            if flavor.get("id") == compute_desc_id:
                return flavor
        return None

    @staticmethod
    def get_application_config(vdu: dict, vdu_instantiation_config: dict) -> dict:
        """Merge application config from the descriptor and instantiation.

        Only keys carrying the ``CONFIG_IDENTIFIER`` prefix are kept, with
        the prefix removed.

        Args:
            vdu: VDU entry whose ``configurable-properties`` is a list of
                ``{"key": ..., "value": ...}`` items.
            vdu_instantiation_config: mapping of instantiation-time
                properties for the same VDU.

        Returns:
            The merged config; instantiation values override descriptor
            values for the same key.
        """
        configurable_properties = vdu.get("configurable-properties", [])
        config_from_descriptor = VnfSpecifications._get_only_config_items(
            VnfSpecifications._list_to_dict(configurable_properties)
        )
        config_from_instantiation = VnfSpecifications._get_only_config_items(
            vdu_instantiation_config
        )
        # Later entries win, so instantiation parameters take precedence.
        return {**config_from_descriptor, **config_from_instantiation}

    @staticmethod
    def _get_only_config_items(config: dict) -> dict:
        """Keep the ``CONFIG_IDENTIFIER``-prefixed keys, prefix stripped."""
        return {
            key[len(CONFIG_IDENTIFIER) :]: value
            for key, value in config.items()
            if key.startswith(CONFIG_IDENTIFIER)
        }

    @staticmethod
    def _list_to_dict(indata: List[dict]) -> dict:
        """Turn a list of ``{"key": k, "value": v}`` items into ``{k: v}``.

        Items without a non-empty ``key`` or without a ``value`` (missing or
        ``None``) are skipped. Falsy but meaningful values such as ``0``,
        ``False`` or ``""`` are kept.
        """
        return {
            item["key"]: item["value"]
            for item in indata
            # Test the value against None rather than truthiness so that
            # legitimate falsy settings are not silently dropped.
            if item.get("key") and item.get("value") is not None
        }
@activity.defn(name=ChangeVnfState.__name__)
class ChangeVnfStateImpl(ChangeVnfState):
    """Activity that stores a new ``vnfState`` on a VNF record.

    ``self.db`` and ``self.logger`` are not defined in this class; they are
    presumably provided by the base activity class.
    """

    async def __call__(self, activity_input: ChangeVnfState.Input) -> None:
        """Write the name of ``activity_input.state`` as the VNF's state.

        Args:
            activity_input: carries ``vnfr_uuid`` (record ``_id``) and
                ``state``, whose ``name`` attribute is persisted.
        """
        new_fields = {"vnfState": activity_input.state.name}
        record_filter = {"_id": activity_input.vnfr_uuid}
        self.db.set_one("vnfrs", record_filter, new_fields)
        self.logger.debug(
            f"VNF {activity_input.vnfr_uuid} state is updated to {activity_input.state.name}."
        )
@activity.defn(name=ChangeVnfInstantiationState.__name__)
class ChangeVnfInstantiationStateImpl(ChangeVnfInstantiationState):
    """Activity that stores a new ``instantiationState`` on a VNF record.

    ``self.db`` and ``self.logger`` are not defined in this class; they are
    presumably provided by the base activity class.
    """

    async def __call__(self, activity_input: ChangeVnfInstantiationState.Input) -> None:
        """Write the name of ``activity_input.state`` as the instantiation state.

        Args:
            activity_input: carries ``vnfr_uuid`` (record ``_id``) and
                ``state``, whose ``name`` attribute is persisted.
        """
        update_vnf_instantiation_state = {
            "instantiationState": activity_input.state.name
        }
        self.db.set_one(
            "vnfrs",
            {"_id": activity_input.vnfr_uuid},
            update_vnf_instantiation_state,
        )
        # Name the field that was actually changed; the previous text was
        # identical to the ChangeVnfState message, which made the two
        # updates indistinguishable in the logs.
        self.logger.debug(
            f"VNF {activity_input.vnfr_uuid} instantiation state is updated to {activity_input.state.name}."
        )
@activity.defn(name=SetVnfModel.__name__)
class SetVnfModelImpl(SetVnfModel):
    """Activity that records the model name of a VNF as its ``namespace``.

    ``self.db`` and ``self.logger`` are not defined in this class; they are
    presumably provided by the base activity class.
    """

    async def __call__(self, activity_input: SetVnfModel.Input) -> None:
        """Store ``model_name`` in the ``namespace`` field of the VNF record.

        Args:
            activity_input: carries ``vnfr_uuid`` (record ``_id``) and
                ``model_name`` (value to persist).
        """
        new_fields = {"namespace": activity_input.model_name}
        record_filter = {"_id": activity_input.vnfr_uuid}
        self.db.set_one("vnfrs", record_filter, new_fields)
        self.logger.debug(
            f"VNF {activity_input.vnfr_uuid} model name is updated to {activity_input.model_name}."
        )
@activity.defn(name=SendNotificationForVnf.__name__)
class SendNotificationForVnfImpl(SendNotificationForVnf):
    """Placeholder activity: VNF notifications are not implemented yet."""

    async def __call__(self, activity_input: SendNotificationForVnf.Input) -> None:
        """Log that the notification is not implemented; the input is unused.

        Args:
            activity_input: accepted for interface compatibility only.
        """
        not_implemented_message = "Send notification for VNF not implemented."
        self.logger.debug(not_implemented_message)
@activity.defn(name=GetModelNames.__name__)
class GetModelNamesImpl(GetModelNames):
    """Activity that collects the model names used by the VNFs of an NS.

    ``self.db`` and ``self.logger`` are not defined in this class; they are
    presumably provided by the base activity class.
    """

    async def __call__(
        self, activity_input: GetModelNames.Input
    ) -> GetModelNames.Output:
        """Return the distinct ``namespace`` values of the NS's VNF records.

        Args:
            activity_input: carries ``ns_uuid``, matched against the
                ``nsr-id-ref`` field of the VNF records.

        Returns:
            ``GetModelNames.Output`` whose ``model_names`` is a set; records
            without a ``namespace`` key contribute nothing.
        """
        ns_id = activity_input.ns_uuid
        vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": ns_id})
        self.logger.debug(
            f"Retrieved {len(vnfrs)} vnf records matching {ns_id} from database."
        )

        # A set removes duplicates when several VNFs share one model.
        model_names = set()
        for vnfr in vnfrs:
            if "namespace" in vnfr:
                model_names.add(vnfr["namespace"])
        return GetModelNames.Output(model_names=model_names)