From: Gulsum Atici Date: Thu, 6 Apr 2023 21:07:07 +0000 (+0300) Subject: OSM-989 Fetch vnfrs and vnfds using activity X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=3636540698878d3f3b08f3ff9c73d5ab4dc5ab88;p=osm%2FLCM.git OSM-989 Fetch vnfrs and vnfds using activity Change-Id: Iabcf10a36b8690ab5952ae46891c8dcc218a2a86 Signed-off-by: Gulsum Atici --- diff --git a/osm_lcm/nglcm.py b/osm_lcm/nglcm.py index c62719f..a3c57b3 100644 --- a/osm_lcm/nglcm.py +++ b/osm_lcm/nglcm.py @@ -172,9 +172,10 @@ class NGLcm: vim_data_activity_instance.update_vim_operation_state, vim_data_activity_instance.update_vim_state, vim_data_activity_instance.delete_vim_record, - vnf_operation_instance.get_task_queue, vnf_data_activity_instance.change_vnf_state, vnf_data_activity_instance.change_vnf_instantiation_state, + vnf_operation_instance.get_task_queue, + vnf_operation_instance.get_vnf_details, vnf_send_notifications_instance.send_notification_for_vnf, vnf_data_activity_instance.set_vnf_model, ] diff --git a/osm_lcm/temporal/vnf_activities.py b/osm_lcm/temporal/vnf_activities.py index 6e5abcb..16cc93c 100644 --- a/osm_lcm/temporal/vnf_activities.py +++ b/osm_lcm/temporal/vnf_activities.py @@ -19,16 +19,19 @@ from temporalio import activity from osm_common.temporal_constants import ( ACTIVITY_CHANGE_VNF_STATE, ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE, - ACTIVITY_SEND_NOTIFICATION_FOR_VNF, ACTIVITY_GET_TASK_QUEUE, - VIM_TYPE_TASK_QUEUE_MAPPINGS, + ACTIVITY_GET_VNF_DETAILS, + ACTIVITY_SEND_NOTIFICATION_FOR_VNF, ACTIVITY_SET_VNF_MODEL, + VIM_TYPE_TASK_QUEUE_MAPPINGS, ) from osm_common.dataclasses.temporal_dataclasses import ( ChangeVnfInstantiationStateInput, ChangeVnfStateInput, GetTaskQueueInput, GetTaskQueueOutput, + GetVnfDetailsInput, + GetVnfDetailsOutput, VnfInstantiateInput, ) @@ -48,7 +51,7 @@ class VnfOperations: DB Access Object Raises (retryable): - DbException: If DB read operations fail + DbException: If DB read operations fail, the collection or DB record ID does not exist. Activity Lifecycle: This activity should complete relatively quickly (less than a @@ -62,12 +65,39 @@ class VnfOperations: activity, the operation is idempotent. """ - vnfrs = self.db.get_list("vnfrs", {"_id": get_task_queue_input.vnfr_uuid}) - vim_record = self.db.get_list("vim_accounts", {"_id": vnfrs["vim-account-id"]}) + vnfr = self.db.get_one("vnfrs", {"_id": get_task_queue_input.vnfr_uuid}) + vim_record = self.db.get_one("vim_accounts", {"_id": vnfr["vim-account-id"]}) task_queue = VIM_TYPE_TASK_QUEUE_MAPPINGS[vim_record["vim-type"]] self.logger.debug(f"Got the task queue {task_queue} for VNF operations.") return GetTaskQueueOutput(task_queue) + @activity.defn(name=ACTIVITY_GET_VNF_DETAILS) + async def get_vnf_details( + self, get_vnf_details_input: GetVnfDetailsInput + ) -> GetVnfDetailsOutput: + """Gets the VNF record and VNF descriptor from Database. + + Collaborators: + DB Access Object + + Raises (retryable): + DbException: If DB read operations fail, the collection or DB record ID does not exist. + + Activity Lifecycle: + This activity should complete relatively quickly (less than 10 + second). + + This activity will not report a heartbeat due to its + short-running nature. + + This is an idempotent activity. + + """ + vnfr = self.db.get_one("vnfrs", {"_id": get_vnf_details_input.vnfr_uuid}) + vnfd = self.db.get_one("vnfds", {"_id": vnfr["vnfd-id"]}) + self.logger.debug("Got the vnfr and vnfd from Database for VNF operations.") + return GetVnfDetailsOutput(vnfr=vnfr, vnfd=vnfd) + class VnfDbActivity: @@ -119,7 +149,7 @@ class VnfDbActivity: DB Access Object Raises (retryable): - DbException: If DB access or update fails + DbException: If DB access or update fails, the collection or DB record ID does not exist. Activity Lifecycle: This activity should complete relatively quickly (less than a @@ -134,7 +164,7 @@ class VnfDbActivity: """ update_vnf_instantiation_state = { - "vnfState": vnf_instantiation_state_input.state + "instantiationState": vnf_instantiation_state_input.state } self.db.set_one( "vnfrs", diff --git a/osm_lcm/temporal/vnf_workflows.py b/osm_lcm/temporal/vnf_workflows.py index 6d16d26..4469c46 100644 --- a/osm_lcm/temporal/vnf_workflows.py +++ b/osm_lcm/temporal/vnf_workflows.py @@ -17,6 +17,7 @@ from datetime import timedelta import logging from temporalio import workflow +from temporalio.converter import value_to_type from temporalio.common import RetryPolicy from osm_common.dataclasses.temporal_dataclasses import ( @@ -27,6 +28,9 @@ from osm_common.dataclasses.temporal_dataclasses import ( ChangeVnfInstantiationStateInput, ChangeVnfStateInput, GetTaskQueueInput, + GetTaskQueueOutput, + GetVnfDetailsInput, + GetVnfDetailsOutput, ) from osm_common.temporal_constants import ( @@ -34,6 +38,7 @@ from osm_common.temporal_constants import ( ACTIVITY_SEND_NOTIFICATION_FOR_VNF, ACTIVITY_CHANGE_VNF_STATE, ACTIVITY_GET_TASK_QUEUE, + ACTIVITY_GET_VNF_DETAILS, ACTIVITY_SET_VNF_MODEL, LCM_TASK_QUEUE, WORKFLOW_VDU_INSTANTIATE, @@ -58,64 +63,91 @@ class VnfInstantiateWorkflow: def __init__(self): self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}") - self.vnf_instantiation_state = ChangeVnfInstantiationStateInput( - "", VnfInstantiationState.NOT_INSTANTIATED - ) - self.vnf_state = ChangeVnfStateInput("", VnfState.STOPPED) @workflow.run async def run(self, input: VnfInstantiateInput) -> None: + self.logger.info(f"Deploying VNF {input.vnfr_uuid}") + vnf_instantiation_state = ChangeVnfInstantiationStateInput( + vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.NOT_INSTANTIATED + ) + vnf_state = ChangeVnfStateInput( + vnfr_uuid=input.vnfr_uuid, state=VnfState.STOPPED + ) + await self.update_states( + vnf_instantiation_state=vnf_instantiation_state, + vnf_state=vnf_state, + ) try: - self.vnf_state.vnfr_uuid = ( - self.vnf_instantiation_state.vnfr_uuid - ) = input.vnfr_uuid - - await self.update_states() - vnf_task_queue = await workflow.execute_activity( - activity=ACTIVITY_GET_TASK_QUEUE, - arg=GetTaskQueueInput(input.vnfr_uuid), - activity_id=f"{ACTIVITY_GET_TASK_QUEUE}-{input.vnfr_uuid}", - task_queue=LCM_TASK_QUEUE, - schedule_to_close_timeout=default_schedule_to_close_timeout, - retry_policy=retry_policy, + vnf_task_queue = value_to_type( + GetTaskQueueOutput, + await workflow.execute_activity( + activity=ACTIVITY_GET_TASK_QUEUE, + arg=GetTaskQueueInput(input.vnfr_uuid), + activity_id=f"{ACTIVITY_GET_TASK_QUEUE}-{input.vnfr_uuid}", + task_queue=LCM_TASK_QUEUE, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ), ) + self.logger.debug(f"Dependent task queue is {vnf_task_queue.task_queue}") + await workflow.execute_child_workflow( workflow=WORKFLOW_VNF_PREPARE, arg=input, - task_queue=vnf_task_queue, + task_queue=vnf_task_queue.task_queue, id=f"{WORKFLOW_VNF_PREPARE}-{input.vnfr_uuid}", ) - await self.instantiate_vdus(input.vnfr_uuid, vnf_task_queue) + get_vnf_details = value_to_type( + GetVnfDetailsOutput, + await workflow.execute_activity( + activity=ACTIVITY_GET_VNF_DETAILS, + arg=GetVnfDetailsInput(input.vnfr_uuid), + activity_id=f"{ACTIVITY_GET_VNF_DETAILS}-{input.vnfr_uuid}", + task_queue=vnf_task_queue.task_queue, + schedule_to_close_timeout=default_schedule_to_close_timeout, + retry_policy=retry_policy, + ), + ) - self.vnf_instantiation_state.state = VnfInstantiationState.INSTANTIATED - self.vnf_state.state = VnfState.STARTED + await self.instantiate_vdus( + get_vnf_details.vnfr, get_vnf_details.vnfd, vnf_task_queue.task_queue + ) + await self.update_states( + vnf_instantiation_state=ChangeVnfInstantiationStateInput( + vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED + ), + vnf_state=ChangeVnfStateInput( + vnfr_uuid=input.vnfr_uuid, state=VnfState.STARTED + ), + ) except Exception as e: workflow.logger.error(f"{WORKFLOW_VNF_INSTANTIATE} failed with {str(e)}") - self.vnf_instantiation_state.state = VnfInstantiationState.NOT_INSTANTIATED - self.vnf_state.state = VnfState.STOPPED + await self.update_states( + vnf_instantiation_state=vnf_instantiation_state, + vnf_state=vnf_state, + ) raise e - finally: - await self.update_states() - - async def update_vnf_state(self): + @staticmethod + async def update_vnf_state(vnf_state): await workflow.execute_activity( activity=ACTIVITY_CHANGE_VNF_STATE, - arg=self.vnf_state, - activity_id=f"{ACTIVITY_CHANGE_VNF_STATE}-{self.vnf_state.vnfr_uuid}", + arg=vnf_state, + activity_id=f"{ACTIVITY_CHANGE_VNF_STATE}-{vnf_state.vnfr_uuid}", task_queue=LCM_TASK_QUEUE, schedule_to_close_timeout=default_schedule_to_close_timeout, retry_policy=retry_policy, ) - async def update_vnf_instantiation_state(self): + @staticmethod + async def update_vnf_instantiation_state(vnf_instantiation_state): await workflow.execute_activity( activity=ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE, - arg=self.vnf_instantiation_state, - activity_id=f"{ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE}-{self.vnf_instantiation_state.vnfr_uuid}", + arg=vnf_instantiation_state, + activity_id=f"{ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE}-{vnf_instantiation_state.vnfr_uuid}", task_queue=LCM_TASK_QUEUE, schedule_to_close_timeout=default_schedule_to_close_timeout, retry_policy=retry_policy, @@ -132,11 +164,8 @@ class VnfInstantiateWorkflow: retry_policy=retry_policy, ) - async def instantiate_vdus(self, vnfr_uuid, vnf_task_queue): - # TODO: see OSMENG-989 - - vnfr = {"uuid": vnfr_uuid} # self.db.get_one("vnfrs", {"_id": vnfr_uuid}) - vnfd = {} # self.db.get_one("vnfds", {"_id": vnfr["vnfd-id"]}) + @staticmethod + async def instantiate_vdus(vnfr: dict, vnfd: dict, task_queue: str): for vdu in vnfd.get("vdu"): ( vdu_instantiate_input, @@ -145,7 +174,7 @@ class VnfInstantiateWorkflow: await workflow.execute_child_workflow( workflow=WORKFLOW_VDU_INSTANTIATE, arg=vdu_instantiate_input, - task_queue=vnf_task_queue, + task_queue=task_queue, id=vdu_instantiate_workflow_id, ) @@ -157,13 +186,19 @@ class VnfInstantiateWorkflow: vdu_info = CharmInfoUtils.get_charm_info(vdu, sw_image_descs) vdu_instantiate_input = VduInstantiateInput(vim_id, model_name, vdu_info) vdu_instantiate_workflow_id = ( - vdu_instantiate_input.model_name + vdu_instantiate_input.charm_info.app_name + vdu_instantiate_input.model_name + + "_" + + vdu_instantiate_input.charm_info.app_name ) return vdu_instantiate_input, vdu_instantiate_workflow_id - async def update_states(self): - await self.update_vnf_instantiation_state() - await self.update_vnf_state() + async def update_states( + self, + vnf_instantiation_state: ChangeVnfInstantiationStateInput, + vnf_state: ChangeVnfStateInput, + ): + await self.update_vnf_instantiation_state(vnf_instantiation_state) + await self.update_vnf_state(vnf_state) await self.send_notification_for_vnf() diff --git a/osm_lcm/tests/test_vnf_activities.py b/osm_lcm/tests/test_vnf_activities.py index 989a647..12905fd 100644 --- a/osm_lcm/tests/test_vnf_activities.py +++ b/osm_lcm/tests/test_vnf_activities.py @@ -16,13 +16,17 @@ import asynctest -from osm_common.dataclasses.temporal_dataclasses import VnfInstantiateInput +from asyncio.exceptions import CancelledError +from copy import deepcopy +from osm_common.dataclasses.temporal_dataclasses import ( + GetVnfDetailsInput, + VnfInstantiateInput, +) from osm_common.dbbase import DbException -from osm_lcm.temporal.vnf_activities import VnfDbActivity +from osm_lcm.temporal.vnf_activities import VnfDbActivity, VnfOperations from temporalio.testing import ActivityEnvironment from unittest.mock import Mock - vnfr_uuid = "d08d2da5-2120-476c-8538-deaeb4e88b3e" model_name = "a-model-name" vnf_instantiate_input = VnfInstantiateInput(vnfr_uuid=vnfr_uuid, model_name=model_name) @@ -46,3 +50,75 @@ class TestVnfDbActivity(asynctest.TestCase): await self.env.run( self.vnf_db_activity.set_vnf_model, vnf_instantiate_input ) + + +sample_vnfr = { + "_id": "9f472177-95c0-4335-b357-5cdc17a79965", + "id": "9f472177-95c0-4335-b357-5cdc17a79965", + "nsr-id-ref": "dcf4c922-5a73-41bf-a6ca-870c22d6336c", + "vnfd-ref": "jar_vnfd_scalable", + "vnfd-id": "f1b38eac-190c-485d-9a74-c6e169c929d8", + "vim-account-id": "9b0bedc3-ea8e-42fd-acc9-dd03f4dee73c", +} + +sample_vnfd = { + "_id": "97784f19-d254-4252-946c-cf92d85443ca", + "id": "sol006-vnf", + "provider": "Canonical", + "product-name": "test-vnf", + "software-version": "1.0", +} + + +class TestVnfDetails(asynctest.TestCase): + async def setUp(self): + self.db = Mock() + self.vnf_operations_instance = VnfOperations(self.db) + self.env = ActivityEnvironment() + + async def test_activity_succeeded(self): + self.db.get_one.side_effect = [sample_vnfr, sample_vnfd] + activity_result = await self.env.run( + self.vnf_operations_instance.get_vnf_details, + GetVnfDetailsInput(vnfr_uuid=sample_vnfr["id"]), + ) + self.assertEqual(activity_result.vnfd, sample_vnfd) + self.assertEqual(activity_result.vnfr, sample_vnfr) + + async def test_activity_failed_db_exception(self): + self.db.get_one.side_effect = DbException("Can not connect to Database.") + with self.assertRaises(DbException) as err: + activity_result = await self.env.run( + self.vnf_operations_instance.get_vnf_details, + GetVnfDetailsInput(vnfr_uuid=sample_vnfr["id"]), + ) + self.assertEqual(activity_result, None) + self.assertEqual( + str(err.exception), "database exception Can not connect to Database." + ) + + async def test_activity_failed_key_error(self): + vnfr = deepcopy(sample_vnfr) + vnfr.pop("vnfd-id") + self.db.get_one.side_effect = [vnfr, sample_vnfd] + with self.assertRaises(KeyError) as err: + activity_result = await self.env.run( + self.vnf_operations_instance.get_vnf_details, + GetVnfDetailsInput(vnfr_uuid=sample_vnfr["id"]), + ) + self.assertEqual(activity_result, None) + self.assertEqual(str(err.exception.args[0]), "vnfd-id") + + async def test_activity_cancelled(self): + self.env._cancelled = True + self.db.get_one.side_effect = [sample_vnfr, sample_vnfd] + with self.assertRaises(CancelledError): + activity_result = await self.env.run( + self.vnf_operations_instance.get_vnf_details, + GetVnfDetailsInput(vnfr_uuid=sample_vnfr["id"]), + ) + self.assertEqual(activity_result, None) + + +if __name__ == "__main__": + asynctest.main()