from osm_common.temporal_constants import (
ACTIVITY_CHANGE_VNF_STATE,
ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE,
- ACTIVITY_SEND_NOTIFICATION_FOR_VNF,
ACTIVITY_GET_TASK_QUEUE,
- VIM_TYPE_TASK_QUEUE_MAPPINGS,
+ ACTIVITY_GET_VNF_DETAILS,
+ ACTIVITY_SEND_NOTIFICATION_FOR_VNF,
ACTIVITY_SET_VNF_MODEL,
+ VIM_TYPE_TASK_QUEUE_MAPPINGS,
)
from osm_common.dataclasses.temporal_dataclasses import (
ChangeVnfInstantiationStateInput,
ChangeVnfStateInput,
GetTaskQueueInput,
GetTaskQueueOutput,
+ GetVnfDetailsInput,
+ GetVnfDetailsOutput,
VnfInstantiateInput,
)
DB Access Object
Raises (retryable):
- DbException: If DB read operations fail
+ DbException: If DB read operations fail, the collection or DB record ID does not exist.
Activity Lifecycle:
This activity should complete relatively quickly (less than a
activity, the operation is idempotent.
"""
- vnfrs = self.db.get_list("vnfrs", {"_id": get_task_queue_input.vnfr_uuid})
- vim_record = self.db.get_list("vim_accounts", {"_id": vnfrs["vim-account-id"]})
+ vnfr = self.db.get_one("vnfrs", {"_id": get_task_queue_input.vnfr_uuid})
+ vim_record = self.db.get_one("vim_accounts", {"_id": vnfr["vim-account-id"]})
task_queue = VIM_TYPE_TASK_QUEUE_MAPPINGS[vim_record["vim-type"]]
self.logger.debug(f"Got the task queue {task_queue} for VNF operations.")
return GetTaskQueueOutput(task_queue)
+ @activity.defn(name=ACTIVITY_GET_VNF_DETAILS)
+ async def get_vnf_details(
+ self, get_vnf_details_input: GetVnfDetailsInput
+ ) -> GetVnfDetailsOutput:
+ """Gets the VNF record and VNF descriptor from Database.
+
+ Collaborators:
+ DB Access Object
+
+ Raises (retryable):
+ DbException: If DB read operations fail, the collection or DB record ID does not exist.
+
+ Activity Lifecycle:
+ This activity should complete relatively quickly (less than 10
+ second).
+
+ This activity will not report a heartbeat due to its
+ short-running nature.
+
+ This is an idempotent activity.
+
+ """
+ vnfr = self.db.get_one("vnfrs", {"_id": get_vnf_details_input.vnfr_uuid})
+ vnfd = self.db.get_one("vnfds", {"_id": vnfr["vnfd-id"]})
+ self.logger.debug("Got the vnfr and vnfd from Database for VNF operations.")
+ return GetVnfDetailsOutput(vnfr=vnfr, vnfd=vnfd)
+
class VnfDbActivity:
DB Access Object
Raises (retryable):
- DbException: If DB access or update fails
+ DbException: If DB access or update fails, the collection or DB record ID does not exist.
Activity Lifecycle:
This activity should complete relatively quickly (less than a
"""
update_vnf_instantiation_state = {
- "vnfState": vnf_instantiation_state_input.state
+ "instantiationState": vnf_instantiation_state_input.state
}
self.db.set_one(
"vnfrs",
from datetime import timedelta
import logging
from temporalio import workflow
+from temporalio.converter import value_to_type
from temporalio.common import RetryPolicy
from osm_common.dataclasses.temporal_dataclasses import (
ChangeVnfInstantiationStateInput,
ChangeVnfStateInput,
GetTaskQueueInput,
+ GetTaskQueueOutput,
+ GetVnfDetailsInput,
+ GetVnfDetailsOutput,
)
from osm_common.temporal_constants import (
ACTIVITY_SEND_NOTIFICATION_FOR_VNF,
ACTIVITY_CHANGE_VNF_STATE,
ACTIVITY_GET_TASK_QUEUE,
+ ACTIVITY_GET_VNF_DETAILS,
ACTIVITY_SET_VNF_MODEL,
LCM_TASK_QUEUE,
WORKFLOW_VDU_INSTANTIATE,
def __init__(self):
self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}")
- self.vnf_instantiation_state = ChangeVnfInstantiationStateInput(
- "", VnfInstantiationState.NOT_INSTANTIATED
- )
- self.vnf_state = ChangeVnfStateInput("", VnfState.STOPPED)
@workflow.run
async def run(self, input: VnfInstantiateInput) -> None:
+ self.logger.info(f"Deploying VNF {input.vnfr_uuid}")
+ vnf_instantiation_state = ChangeVnfInstantiationStateInput(
+ vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.NOT_INSTANTIATED
+ )
+ vnf_state = ChangeVnfStateInput(
+ vnfr_uuid=input.vnfr_uuid, state=VnfState.STOPPED
+ )
+ await self.update_states(
+ vnf_instantiation_state=vnf_instantiation_state,
+ vnf_state=vnf_state,
+ )
try:
- self.vnf_state.vnfr_uuid = (
- self.vnf_instantiation_state.vnfr_uuid
- ) = input.vnfr_uuid
-
- await self.update_states()
- vnf_task_queue = await workflow.execute_activity(
- activity=ACTIVITY_GET_TASK_QUEUE,
- arg=GetTaskQueueInput(input.vnfr_uuid),
- activity_id=f"{ACTIVITY_GET_TASK_QUEUE}-{input.vnfr_uuid}",
- task_queue=LCM_TASK_QUEUE,
- schedule_to_close_timeout=default_schedule_to_close_timeout,
- retry_policy=retry_policy,
+ vnf_task_queue = value_to_type(
+ GetTaskQueueOutput,
+ await workflow.execute_activity(
+ activity=ACTIVITY_GET_TASK_QUEUE,
+ arg=GetTaskQueueInput(input.vnfr_uuid),
+ activity_id=f"{ACTIVITY_GET_TASK_QUEUE}-{input.vnfr_uuid}",
+ task_queue=LCM_TASK_QUEUE,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ ),
)
+ self.logger.debug(f"Dependent task queue is {vnf_task_queue.task_queue}")
+
await workflow.execute_child_workflow(
workflow=WORKFLOW_VNF_PREPARE,
arg=input,
- task_queue=vnf_task_queue,
+ task_queue=vnf_task_queue.task_queue,
id=f"{WORKFLOW_VNF_PREPARE}-{input.vnfr_uuid}",
)
- await self.instantiate_vdus(input.vnfr_uuid, vnf_task_queue)
+ get_vnf_details = value_to_type(
+ GetVnfDetailsOutput,
+ await workflow.execute_activity(
+ activity=ACTIVITY_GET_VNF_DETAILS,
+ arg=GetVnfDetailsInput(input.vnfr_uuid),
+ activity_id=f"{ACTIVITY_GET_VNF_DETAILS}-{input.vnfr_uuid}",
+ task_queue=vnf_task_queue.task_queue,
+ schedule_to_close_timeout=default_schedule_to_close_timeout,
+ retry_policy=retry_policy,
+ ),
+ )
- self.vnf_instantiation_state.state = VnfInstantiationState.INSTANTIATED
- self.vnf_state.state = VnfState.STARTED
+ await self.instantiate_vdus(
+ get_vnf_details.vnfr, get_vnf_details.vnfd, vnf_task_queue.task_queue
+ )
+ await self.update_states(
+ vnf_instantiation_state=ChangeVnfInstantiationStateInput(
+ vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
+ ),
+ vnf_state=ChangeVnfStateInput(
+ vnfr_uuid=input.vnfr_uuid, state=VnfState.STARTED
+ ),
+ )
except Exception as e:
workflow.logger.error(f"{WORKFLOW_VNF_INSTANTIATE} failed with {str(e)}")
- self.vnf_instantiation_state.state = VnfInstantiationState.NOT_INSTANTIATED
- self.vnf_state.state = VnfState.STOPPED
+ await self.update_states(
+ vnf_instantiation_state=vnf_instantiation_state,
+ vnf_state=vnf_state,
+ )
raise e
- finally:
- await self.update_states()
-
- async def update_vnf_state(self):
+ @staticmethod
+ async def update_vnf_state(vnf_state):
await workflow.execute_activity(
activity=ACTIVITY_CHANGE_VNF_STATE,
- arg=self.vnf_state,
- activity_id=f"{ACTIVITY_CHANGE_VNF_STATE}-{self.vnf_state.vnfr_uuid}",
+ arg=vnf_state,
+ activity_id=f"{ACTIVITY_CHANGE_VNF_STATE}-{vnf_state.vnfr_uuid}",
task_queue=LCM_TASK_QUEUE,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
)
- async def update_vnf_instantiation_state(self):
+ @staticmethod
+ async def update_vnf_instantiation_state(vnf_instantiation_state):
await workflow.execute_activity(
activity=ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE,
- arg=self.vnf_instantiation_state,
- activity_id=f"{ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE}-{self.vnf_instantiation_state.vnfr_uuid}",
+ arg=vnf_instantiation_state,
+ activity_id=f"{ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE}-{vnf_instantiation_state.vnfr_uuid}",
task_queue=LCM_TASK_QUEUE,
schedule_to_close_timeout=default_schedule_to_close_timeout,
retry_policy=retry_policy,
retry_policy=retry_policy,
)
- async def instantiate_vdus(self, vnfr_uuid, vnf_task_queue):
- # TODO: see OSMENG-989
-
- vnfr = {"uuid": vnfr_uuid} # self.db.get_one("vnfrs", {"_id": vnfr_uuid})
- vnfd = {} # self.db.get_one("vnfds", {"_id": vnfr["vnfd-id"]})
+ @staticmethod
+ async def instantiate_vdus(vnfr: dict, vnfd: dict, task_queue: str):
for vdu in vnfd.get("vdu"):
(
vdu_instantiate_input,
await workflow.execute_child_workflow(
workflow=WORKFLOW_VDU_INSTANTIATE,
arg=vdu_instantiate_input,
- task_queue=vnf_task_queue,
+ task_queue=task_queue,
id=vdu_instantiate_workflow_id,
)
vdu_info = CharmInfoUtils.get_charm_info(vdu, sw_image_descs)
vdu_instantiate_input = VduInstantiateInput(vim_id, model_name, vdu_info)
vdu_instantiate_workflow_id = (
- vdu_instantiate_input.model_name + vdu_instantiate_input.charm_info.app_name
+ vdu_instantiate_input.model_name
+ + "_"
+ + vdu_instantiate_input.charm_info.app_name
)
return vdu_instantiate_input, vdu_instantiate_workflow_id
- async def update_states(self):
- await self.update_vnf_instantiation_state()
- await self.update_vnf_state()
+ async def update_states(
+ self,
+ vnf_instantiation_state: ChangeVnfInstantiationStateInput,
+ vnf_state: ChangeVnfStateInput,
+ ):
+ await self.update_vnf_instantiation_state(vnf_instantiation_state)
+ await self.update_vnf_state(vnf_state)
await self.send_notification_for_vnf()
import asynctest
-from osm_common.dataclasses.temporal_dataclasses import VnfInstantiateInput
+from asyncio.exceptions import CancelledError
+from copy import deepcopy
+from osm_common.dataclasses.temporal_dataclasses import (
+ GetVnfDetailsInput,
+ VnfInstantiateInput,
+)
from osm_common.dbbase import DbException
-from osm_lcm.temporal.vnf_activities import VnfDbActivity
+from osm_lcm.temporal.vnf_activities import VnfDbActivity, VnfOperations
from temporalio.testing import ActivityEnvironment
from unittest.mock import Mock
-
vnfr_uuid = "d08d2da5-2120-476c-8538-deaeb4e88b3e"
model_name = "a-model-name"
vnf_instantiate_input = VnfInstantiateInput(vnfr_uuid=vnfr_uuid, model_name=model_name)
await self.env.run(
self.vnf_db_activity.set_vnf_model, vnf_instantiate_input
)
+
+
+sample_vnfr = {
+ "_id": "9f472177-95c0-4335-b357-5cdc17a79965",
+ "id": "9f472177-95c0-4335-b357-5cdc17a79965",
+ "nsr-id-ref": "dcf4c922-5a73-41bf-a6ca-870c22d6336c",
+ "vnfd-ref": "jar_vnfd_scalable",
+ "vnfd-id": "f1b38eac-190c-485d-9a74-c6e169c929d8",
+ "vim-account-id": "9b0bedc3-ea8e-42fd-acc9-dd03f4dee73c",
+}
+
+sample_vnfd = {
+ "_id": "97784f19-d254-4252-946c-cf92d85443ca",
+ "id": "sol006-vnf",
+ "provider": "Canonical",
+ "product-name": "test-vnf",
+ "software-version": "1.0",
+}
+
+
+class TestVnfDetails(asynctest.TestCase):
+ async def setUp(self):
+ self.db = Mock()
+ self.vnf_operations_instance = VnfOperations(self.db)
+ self.env = ActivityEnvironment()
+
+ async def test_activity_succeeded(self):
+ self.db.get_one.side_effect = [sample_vnfr, sample_vnfd]
+ activity_result = await self.env.run(
+ self.vnf_operations_instance.get_vnf_details,
+ GetVnfDetailsInput(vnfr_uuid=sample_vnfr["id"]),
+ )
+ self.assertEqual(activity_result.vnfd, sample_vnfd)
+ self.assertEqual(activity_result.vnfr, sample_vnfr)
+
+ async def test_activity_failed_db_exception(self):
+ self.db.get_one.side_effect = DbException("Can not connect to Database.")
+ with self.assertRaises(DbException) as err:
+ activity_result = await self.env.run(
+ self.vnf_operations_instance.get_vnf_details,
+ GetVnfDetailsInput(vnfr_uuid=sample_vnfr["id"]),
+ )
+ self.assertEqual(activity_result, None)
+ self.assertEqual(
+ str(err.exception), "database exception Can not connect to Database."
+ )
+
+ async def test_activity_failed_key_error(self):
+ vnfr = deepcopy(sample_vnfr)
+ vnfr.pop("vnfd-id")
+ self.db.get_one.side_effect = [vnfr, sample_vnfd]
+ with self.assertRaises(KeyError) as err:
+ activity_result = await self.env.run(
+ self.vnf_operations_instance.get_vnf_details,
+ GetVnfDetailsInput(vnfr_uuid=sample_vnfr["id"]),
+ )
+ self.assertEqual(activity_result, None)
+ self.assertEqual(str(err.exception.args[0]), "vnfd-id")
+
+ async def test_activity_cancelled(self):
+ self.env._cancelled = True
+ self.db.get_one.side_effect = [sample_vnfr, sample_vnfd]
+ with self.assertRaises(CancelledError):
+ activity_result = await self.env.run(
+ self.vnf_operations_instance.get_vnf_details,
+ GetVnfDetailsInput(vnfr_uuid=sample_vnfr["id"]),
+ )
+ self.assertEqual(activity_result, None)
+
+
+if __name__ == "__main__":
+ asynctest.main()