blob: 6834eecefd26caaca8ac41e4e315c2924bb182e8 [file] [log] [blame]
#######################################################################################
# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from time import time
from temporalio import activity
from osm_common.temporal_constants import (
ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED,
ACTIVITY_DEPLOY_NS,
ACTIVITY_GET_MODEL_INFO,
ACTIVITY_PREPARE_VNF_RECORDS,
ACTIVITY_UPDATE_NS_STATE,
)
from osm_common.dataclasses.temporal_dataclasses import (
ModelInfo,
NsInstantiateInput,
UpdateNsStateInput,
VduInstantiateInput,
)
from osm_lcm.temporal.juju_paas_activities import CharmInfoUtils
class NsOperations:
def __init__(self, db):
self.db = db
self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
# TODO: Change to a workflow OSM-990
@activity.defn(name=ACTIVITY_DEPLOY_NS)
async def deploy_ns(self, ns_instantiate_input: NsInstantiateInput) -> None:
vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": ns_instantiate_input.ns_uuid})
for vnfr in vnfrs:
await self.deploy_vnf(vnfr)
async def deploy_vnf(self, vnfr: dict):
vim_id = vnfr.get("vim-account-id")
model_name = "model-name"
vnfd_id = vnfr["vnfd-id"]
vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
sw_image_descs = vnfd.get("sw-image-desc")
for vdu in vnfd.get("vdu"):
await self.deploy_vdu(vdu, sw_image_descs, vim_id, model_name)
async def deploy_vdu(
self, vdu: dict, sw_image_descs: str, vim_id: str, model_name: str
) -> None:
vdu_info = CharmInfoUtils.get_charm_info(vdu, sw_image_descs)
vdu_instantiate_input = VduInstantiateInput(vim_id, model_name, vdu_info)
workflow_id = (
vdu_instantiate_input.model_name + vdu_instantiate_input.charm_info.app_name
)
self.logger.info(f"TODO: Start VDU Workflow {workflow_id}")
@activity.defn(name=ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED)
async def check_ns_instantiate_finished(
self, ns_instantiate: NsInstantiateInput
) -> None:
# TODO: Implement OSM-993
pass
class NsDbActivity:
"""Perform Database operations for NS accounts.
Args:
db (object): Data Access Object
"""
def __init__(self, db):
self.db = db
self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
@activity.defn(name=ACTIVITY_PREPARE_VNF_RECORDS)
async def prepare_vnf_records(
self, ns_instantiate_input: NsInstantiateInput
) -> None:
"""Prepare VNFs to be deployed: Add namespace to the VNFr.
Collaborators:
DB Write: vnfrs
Raises (Retryable):
DbException If the target DB record does not exist or DB is not reachable.
Activity Lifecycle:
This activity will not report a heartbeat due to its
short-running nature.
As this is a direct DB update, it is not recommended to have
any specific retry policy
"""
vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": ns_instantiate_input.ns_uuid})
for vnfr in vnfrs:
self._prepare_vnf_record(vnfr)
@activity.defn(name=ACTIVITY_GET_MODEL_INFO)
async def get_model_info(
self, ns_instantiate_input: NsInstantiateInput
) -> ModelInfo:
"""Returns a ModelInfo. Contains VIM ID and model name.
Collaborators:
DB Read: nsrs
Raises (Retryable):
DbException If the target DB record does not exist or DB is not reachable.
Activity Lifecycle:
This activity will not report a heartbeat due to its
short-running nature.
As this is a direct DB update, it is not recommended to have
any specific retry policy
"""
ns_uuid = ns_instantiate_input.ns_uuid
nsr = self.db.get_one("nsrs", {"_id": ns_uuid})
vim_uuid = nsr.get("datacenter")
model_name = self._get_namespace(ns_uuid, vim_uuid)
return ModelInfo(vim_uuid, model_name)
def _get_namespace(self, ns_id: str, vim_id: str) -> str:
"""The NS namespace is the combination if the NS ID and the VIM ID."""
return ns_id[-12:] + "-" + vim_id[-12:]
def _prepare_vnf_record(self, vnfr: dict) -> None:
"""Add namespace to the VNFr."""
ns_id = vnfr["nsr-id-ref"]
vim_id = vnfr["vim-account-id"]
namespace = self._get_namespace(ns_id, vim_id)
update_namespace = {"namespace": namespace}
self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, update_namespace)
@activity.defn(name=ACTIVITY_UPDATE_NS_STATE)
async def update_ns_state(self, data: UpdateNsStateInput) -> None:
"""
Changes the state of the NS itself.
Collaborators:
DB Write: nsrs
Raises (Retryable):
DbException If the target DB record does not exist or DB is not reachable.
Activity Lifecycle:
This activity will not report a heartbeat due to its
short-running nature.
As this is a direct DB update, it is not recommended to have
any specific retry policy
"""
update_ns_state = {
"nsState": data.state.name,
# "errorDescription" : data.message,
"_admin.nsState": data.state.name,
"_admin.detailed-status": data.message,
"_admin.modified": time(),
}
self.db.set_one("nsrs", {"_id": data.ns_uuid}, update_ns_state)
self.logger.debug(f"Updated NS {data.ns_uuid} to {data.state.name}")