OSM-989 Fetch vnfrs and vnfds using activity 71/13171/14
authorGulsum Atici <gulsum.atici@canonical.com>
Thu, 6 Apr 2023 21:07:07 +0000 (00:07 +0300)
committerMark Beierl <mark.beierl@canonical.com>
Tue, 18 Apr 2023 18:09:12 +0000 (18:09 +0000)
Change-Id: Iabcf10a36b8690ab5952ae46891c8dcc218a2a86
Signed-off-by: Gulsum Atici <gulsum.atici@canonical.com>
osm_lcm/nglcm.py
osm_lcm/temporal/vnf_activities.py
osm_lcm/temporal/vnf_workflows.py
osm_lcm/tests/test_vnf_activities.py

index c62719f..a3c57b3 100644 (file)
@@ -172,9 +172,10 @@ class NGLcm:
             vim_data_activity_instance.update_vim_operation_state,
             vim_data_activity_instance.update_vim_state,
             vim_data_activity_instance.delete_vim_record,
-            vnf_operation_instance.get_task_queue,
             vnf_data_activity_instance.change_vnf_state,
             vnf_data_activity_instance.change_vnf_instantiation_state,
+            vnf_operation_instance.get_task_queue,
+            vnf_operation_instance.get_vnf_details,
             vnf_send_notifications_instance.send_notification_for_vnf,
             vnf_data_activity_instance.set_vnf_model,
         ]
index 6e5abcb..16cc93c 100644 (file)
@@ -19,16 +19,19 @@ from temporalio import activity
 from osm_common.temporal_constants import (
     ACTIVITY_CHANGE_VNF_STATE,
     ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE,
-    ACTIVITY_SEND_NOTIFICATION_FOR_VNF,
     ACTIVITY_GET_TASK_QUEUE,
-    VIM_TYPE_TASK_QUEUE_MAPPINGS,
+    ACTIVITY_GET_VNF_DETAILS,
+    ACTIVITY_SEND_NOTIFICATION_FOR_VNF,
     ACTIVITY_SET_VNF_MODEL,
+    VIM_TYPE_TASK_QUEUE_MAPPINGS,
 )
 from osm_common.dataclasses.temporal_dataclasses import (
     ChangeVnfInstantiationStateInput,
     ChangeVnfStateInput,
     GetTaskQueueInput,
     GetTaskQueueOutput,
+    GetVnfDetailsInput,
+    GetVnfDetailsOutput,
     VnfInstantiateInput,
 )
 
@@ -48,7 +51,7 @@ class VnfOperations:
             DB Access Object
 
         Raises (retryable):
-            DbException: If DB read operations fail
+            DbException: If DB read operations fail, the collection or DB record ID does not exist.
 
         Activity Lifecycle:
             This activity should complete relatively quickly (less than a
@@ -62,12 +65,39 @@ class VnfOperations:
             activity, the operation is idempotent.
 
         """
-        vnfrs = self.db.get_list("vnfrs", {"_id": get_task_queue_input.vnfr_uuid})
-        vim_record = self.db.get_list("vim_accounts", {"_id": vnfrs["vim-account-id"]})
+        vnfr = self.db.get_one("vnfrs", {"_id": get_task_queue_input.vnfr_uuid})
+        vim_record = self.db.get_one("vim_accounts", {"_id": vnfr["vim-account-id"]})
         task_queue = VIM_TYPE_TASK_QUEUE_MAPPINGS[vim_record["vim-type"]]
         self.logger.debug(f"Got the task queue {task_queue} for VNF operations.")
         return GetTaskQueueOutput(task_queue)
 
+    @activity.defn(name=ACTIVITY_GET_VNF_DETAILS)
+    async def get_vnf_details(
+        self, get_vnf_details_input: GetVnfDetailsInput
+    ) -> GetVnfDetailsOutput:
+        """Gets the VNF record and VNF descriptor from Database.
+
+        Collaborators:
+            DB Access Object
+
+        Raises (retryable):
+            DbException: If DB read operations fail, the collection or DB record ID does not exist.
+
+        Activity Lifecycle:
+            This activity should complete relatively quickly (less than 10
+            second).
+
+            This activity will not report a heartbeat due to its
+            short-running nature.
+
+            This is an idempotent activity.
+
+        """
+        vnfr = self.db.get_one("vnfrs", {"_id": get_vnf_details_input.vnfr_uuid})
+        vnfd = self.db.get_one("vnfds", {"_id": vnfr["vnfd-id"]})
+        self.logger.debug("Got the vnfr and vnfd from Database for VNF operations.")
+        return GetVnfDetailsOutput(vnfr=vnfr, vnfd=vnfd)
+
 
 class VnfDbActivity:
 
@@ -119,7 +149,7 @@ class VnfDbActivity:
             DB Access Object
 
         Raises (retryable):
-            DbException: If DB access or update fails
+            DbException: If DB access or update fails, the collection or DB record ID does not exist.
 
         Activity Lifecycle:
             This activity should complete relatively quickly (less than a
@@ -134,7 +164,7 @@ class VnfDbActivity:
 
         """
         update_vnf_instantiation_state = {
-            "vnfState": vnf_instantiation_state_input.state
+            "instantiationState": vnf_instantiation_state_input.state
         }
         self.db.set_one(
             "vnfrs",
index 6d16d26..4469c46 100644 (file)
@@ -17,6 +17,7 @@
 from datetime import timedelta
 import logging
 from temporalio import workflow
+from temporalio.converter import value_to_type
 from temporalio.common import RetryPolicy
 
 from osm_common.dataclasses.temporal_dataclasses import (
@@ -27,6 +28,9 @@ from osm_common.dataclasses.temporal_dataclasses import (
     ChangeVnfInstantiationStateInput,
     ChangeVnfStateInput,
     GetTaskQueueInput,
+    GetTaskQueueOutput,
+    GetVnfDetailsInput,
+    GetVnfDetailsOutput,
 )
 
 from osm_common.temporal_constants import (
@@ -34,6 +38,7 @@ from osm_common.temporal_constants import (
     ACTIVITY_SEND_NOTIFICATION_FOR_VNF,
     ACTIVITY_CHANGE_VNF_STATE,
     ACTIVITY_GET_TASK_QUEUE,
+    ACTIVITY_GET_VNF_DETAILS,
     ACTIVITY_SET_VNF_MODEL,
     LCM_TASK_QUEUE,
     WORKFLOW_VDU_INSTANTIATE,
@@ -58,64 +63,91 @@ class VnfInstantiateWorkflow:
 
     def __init__(self):
         self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}")
-        self.vnf_instantiation_state = ChangeVnfInstantiationStateInput(
-            "", VnfInstantiationState.NOT_INSTANTIATED
-        )
-        self.vnf_state = ChangeVnfStateInput("", VnfState.STOPPED)
 
     @workflow.run
     async def run(self, input: VnfInstantiateInput) -> None:
+        self.logger.info(f"Deploying VNF {input.vnfr_uuid}")
+        vnf_instantiation_state = ChangeVnfInstantiationStateInput(
+            vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.NOT_INSTANTIATED
+        )
+        vnf_state = ChangeVnfStateInput(
+            vnfr_uuid=input.vnfr_uuid, state=VnfState.STOPPED
+        )
+        await self.update_states(
+            vnf_instantiation_state=vnf_instantiation_state,
+            vnf_state=vnf_state,
+        )
         try:
-            self.vnf_state.vnfr_uuid = (
-                self.vnf_instantiation_state.vnfr_uuid
-            ) = input.vnfr_uuid
-
-            await self.update_states()
-            vnf_task_queue = await workflow.execute_activity(
-                activity=ACTIVITY_GET_TASK_QUEUE,
-                arg=GetTaskQueueInput(input.vnfr_uuid),
-                activity_id=f"{ACTIVITY_GET_TASK_QUEUE}-{input.vnfr_uuid}",
-                task_queue=LCM_TASK_QUEUE,
-                schedule_to_close_timeout=default_schedule_to_close_timeout,
-                retry_policy=retry_policy,
+            vnf_task_queue = value_to_type(
+                GetTaskQueueOutput,
+                await workflow.execute_activity(
+                    activity=ACTIVITY_GET_TASK_QUEUE,
+                    arg=GetTaskQueueInput(input.vnfr_uuid),
+                    activity_id=f"{ACTIVITY_GET_TASK_QUEUE}-{input.vnfr_uuid}",
+                    task_queue=LCM_TASK_QUEUE,
+                    schedule_to_close_timeout=default_schedule_to_close_timeout,
+                    retry_policy=retry_policy,
+                ),
             )
 
+            self.logger.debug(f"Dependent task queue is {vnf_task_queue.task_queue}")
+
             await workflow.execute_child_workflow(
                 workflow=WORKFLOW_VNF_PREPARE,
                 arg=input,
-                task_queue=vnf_task_queue,
+                task_queue=vnf_task_queue.task_queue,
                 id=f"{WORKFLOW_VNF_PREPARE}-{input.vnfr_uuid}",
             )
 
-            await self.instantiate_vdus(input.vnfr_uuid, vnf_task_queue)
+            get_vnf_details = value_to_type(
+                GetVnfDetailsOutput,
+                await workflow.execute_activity(
+                    activity=ACTIVITY_GET_VNF_DETAILS,
+                    arg=GetVnfDetailsInput(input.vnfr_uuid),
+                    activity_id=f"{ACTIVITY_GET_VNF_DETAILS}-{input.vnfr_uuid}",
+                    task_queue=vnf_task_queue.task_queue,
+                    schedule_to_close_timeout=default_schedule_to_close_timeout,
+                    retry_policy=retry_policy,
+                ),
+            )
 
-            self.vnf_instantiation_state.state = VnfInstantiationState.INSTANTIATED
-            self.vnf_state.state = VnfState.STARTED
+            await self.instantiate_vdus(
+                get_vnf_details.vnfr, get_vnf_details.vnfd, vnf_task_queue.task_queue
+            )
+            await self.update_states(
+                vnf_instantiation_state=ChangeVnfInstantiationStateInput(
+                    vnfr_uuid=input.vnfr_uuid, state=VnfInstantiationState.INSTANTIATED
+                ),
+                vnf_state=ChangeVnfStateInput(
+                    vnfr_uuid=input.vnfr_uuid, state=VnfState.STARTED
+                ),
+            )
 
         except Exception as e:
             workflow.logger.error(f"{WORKFLOW_VNF_INSTANTIATE} failed with {str(e)}")
-            self.vnf_instantiation_state.state = VnfInstantiationState.NOT_INSTANTIATED
-            self.vnf_state.state = VnfState.STOPPED
+            await self.update_states(
+                vnf_instantiation_state=vnf_instantiation_state,
+                vnf_state=vnf_state,
+            )
             raise e
 
-        finally:
-            await self.update_states()
-
-    async def update_vnf_state(self):
+    @staticmethod
+    async def update_vnf_state(vnf_state):
         await workflow.execute_activity(
             activity=ACTIVITY_CHANGE_VNF_STATE,
-            arg=self.vnf_state,
-            activity_id=f"{ACTIVITY_CHANGE_VNF_STATE}-{self.vnf_state.vnfr_uuid}",
+            arg=vnf_state,
+            activity_id=f"{ACTIVITY_CHANGE_VNF_STATE}-{vnf_state.vnfr_uuid}",
             task_queue=LCM_TASK_QUEUE,
             schedule_to_close_timeout=default_schedule_to_close_timeout,
             retry_policy=retry_policy,
         )
 
-    async def update_vnf_instantiation_state(self):
+    @staticmethod
+    async def update_vnf_instantiation_state(vnf_instantiation_state):
         await workflow.execute_activity(
             activity=ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE,
-            arg=self.vnf_instantiation_state,
-            activity_id=f"{ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE}-{self.vnf_instantiation_state.vnfr_uuid}",
+            arg=vnf_instantiation_state,
+            activity_id=f"{ACTIVITY_CHANGE_VNF_INSTANTIATION_STATE}-{vnf_instantiation_state.vnfr_uuid}",
             task_queue=LCM_TASK_QUEUE,
             schedule_to_close_timeout=default_schedule_to_close_timeout,
             retry_policy=retry_policy,
@@ -132,11 +164,8 @@ class VnfInstantiateWorkflow:
             retry_policy=retry_policy,
         )
 
-    async def instantiate_vdus(self, vnfr_uuid, vnf_task_queue):
-        # TODO: see OSMENG-989
-
-        vnfr = {"uuid": vnfr_uuid}  # self.db.get_one("vnfrs", {"_id": vnfr_uuid})
-        vnfd = {}  # self.db.get_one("vnfds", {"_id": vnfr["vnfd-id"]})
+    @staticmethod
+    async def instantiate_vdus(vnfr: dict, vnfd: dict, task_queue: str):
         for vdu in vnfd.get("vdu"):
             (
                 vdu_instantiate_input,
@@ -145,7 +174,7 @@ class VnfInstantiateWorkflow:
             await workflow.execute_child_workflow(
                 workflow=WORKFLOW_VDU_INSTANTIATE,
                 arg=vdu_instantiate_input,
-                task_queue=vnf_task_queue,
+                task_queue=task_queue,
                 id=vdu_instantiate_workflow_id,
             )
 
@@ -157,13 +186,19 @@ class VnfInstantiateWorkflow:
         vdu_info = CharmInfoUtils.get_charm_info(vdu, sw_image_descs)
         vdu_instantiate_input = VduInstantiateInput(vim_id, model_name, vdu_info)
         vdu_instantiate_workflow_id = (
-            vdu_instantiate_input.model_name + vdu_instantiate_input.charm_info.app_name
+            vdu_instantiate_input.model_name
+            + "_"
+            + vdu_instantiate_input.charm_info.app_name
         )
         return vdu_instantiate_input, vdu_instantiate_workflow_id
 
-    async def update_states(self):
-        await self.update_vnf_instantiation_state()
-        await self.update_vnf_state()
+    async def update_states(
+        self,
+        vnf_instantiation_state: ChangeVnfInstantiationStateInput,
+        vnf_state: ChangeVnfStateInput,
+    ):
+        await self.update_vnf_instantiation_state(vnf_instantiation_state)
+        await self.update_vnf_state(vnf_state)
         await self.send_notification_for_vnf()
 
 
index 989a647..12905fd 100644 (file)
 
 
 import asynctest
-from osm_common.dataclasses.temporal_dataclasses import VnfInstantiateInput
+from asyncio.exceptions import CancelledError
+from copy import deepcopy
+from osm_common.dataclasses.temporal_dataclasses import (
+    GetVnfDetailsInput,
+    VnfInstantiateInput,
+)
 from osm_common.dbbase import DbException
-from osm_lcm.temporal.vnf_activities import VnfDbActivity
+from osm_lcm.temporal.vnf_activities import VnfDbActivity, VnfOperations
 from temporalio.testing import ActivityEnvironment
 from unittest.mock import Mock
 
-
 vnfr_uuid = "d08d2da5-2120-476c-8538-deaeb4e88b3e"
 model_name = "a-model-name"
 vnf_instantiate_input = VnfInstantiateInput(vnfr_uuid=vnfr_uuid, model_name=model_name)
@@ -46,3 +50,75 @@ class TestVnfDbActivity(asynctest.TestCase):
             await self.env.run(
                 self.vnf_db_activity.set_vnf_model, vnf_instantiate_input
             )
+
+
+sample_vnfr = {
+    "_id": "9f472177-95c0-4335-b357-5cdc17a79965",
+    "id": "9f472177-95c0-4335-b357-5cdc17a79965",
+    "nsr-id-ref": "dcf4c922-5a73-41bf-a6ca-870c22d6336c",
+    "vnfd-ref": "jar_vnfd_scalable",
+    "vnfd-id": "f1b38eac-190c-485d-9a74-c6e169c929d8",
+    "vim-account-id": "9b0bedc3-ea8e-42fd-acc9-dd03f4dee73c",
+}
+
+sample_vnfd = {
+    "_id": "97784f19-d254-4252-946c-cf92d85443ca",
+    "id": "sol006-vnf",
+    "provider": "Canonical",
+    "product-name": "test-vnf",
+    "software-version": "1.0",
+}
+
+
+class TestVnfDetails(asynctest.TestCase):
+    async def setUp(self):
+        self.db = Mock()
+        self.vnf_operations_instance = VnfOperations(self.db)
+        self.env = ActivityEnvironment()
+
+    async def test_activity_succeeded(self):
+        self.db.get_one.side_effect = [sample_vnfr, sample_vnfd]
+        activity_result = await self.env.run(
+            self.vnf_operations_instance.get_vnf_details,
+            GetVnfDetailsInput(vnfr_uuid=sample_vnfr["id"]),
+        )
+        self.assertEqual(activity_result.vnfd, sample_vnfd)
+        self.assertEqual(activity_result.vnfr, sample_vnfr)
+
+    async def test_activity_failed_db_exception(self):
+        self.db.get_one.side_effect = DbException("Can not connect to Database.")
+        with self.assertRaises(DbException) as err:
+            activity_result = await self.env.run(
+                self.vnf_operations_instance.get_vnf_details,
+                GetVnfDetailsInput(vnfr_uuid=sample_vnfr["id"]),
+            )
+            self.assertEqual(activity_result, None)
+        self.assertEqual(
+            str(err.exception), "database exception Can not connect to Database."
+        )
+
+    async def test_activity_failed_key_error(self):
+        vnfr = deepcopy(sample_vnfr)
+        vnfr.pop("vnfd-id")
+        self.db.get_one.side_effect = [vnfr, sample_vnfd]
+        with self.assertRaises(KeyError) as err:
+            activity_result = await self.env.run(
+                self.vnf_operations_instance.get_vnf_details,
+                GetVnfDetailsInput(vnfr_uuid=sample_vnfr["id"]),
+            )
+            self.assertEqual(activity_result, None)
+        self.assertEqual(str(err.exception.args[0]), "vnfd-id")
+
+    async def test_activity_cancelled(self):
+        self.env._cancelled = True
+        self.db.get_one.side_effect = [sample_vnfr, sample_vnfd]
+        with self.assertRaises(CancelledError):
+            activity_result = await self.env.run(
+                self.vnf_operations_instance.get_vnf_details,
+                GetVnfDetailsInput(vnfr_uuid=sample_vnfr["id"]),
+            )
+            self.assertEqual(activity_result, None)
+
+
+if __name__ == "__main__":
+    asynctest.main()