Implement instantiate NS workflow 20/13220/20
authorDaniel Arndt <daniel.arndt@canonical.com>
Tue, 18 Apr 2023 14:26:16 +0000 (11:26 -0300)
committerMark Beierl <mark.beierl@canonical.com>
Thu, 20 Apr 2023 16:35:12 +0000 (16:35 +0000)
Change-Id: I61defdc64865396cd6af4a20ffb67443450bd742
Signed-off-by: Daniel Arndt <daniel.arndt@canonical.com>
Signed-off-by: Mark Beierl <mark.beierl@canonical.com>
.gitignore
osm_lcm/nglcm.py
osm_lcm/temporal/lcm_workflows.py
osm_lcm/temporal/ns_activities.py
osm_lcm/temporal/ns_workflows.py
osm_lcm/tests/test_lcm_utils.py
osm_lcm/tests/test_ns_activities.py [deleted file]
osm_lcm/tests/test_ns_workflows.py [new file with mode: 0644]

index 4517ab1..9df0136 100644 (file)
@@ -32,4 +32,5 @@ nosetests.xml
 .pydevproject
 pyvenv.cfg
 venv/
-.idea
\ No newline at end of file
+.idea
+deb_dist
index a3c57b3..5ddae3f 100644 (file)
@@ -22,34 +22,35 @@ import logging
 import logging.handlers
 import os
 import sys
-import yaml
+from os import path
 
+import yaml
 from osm_common.dbbase import DbException
 from osm_common.temporal_constants import LCM_TASK_QUEUE
+from temporalio.client import Client
+from temporalio.worker import Worker
+
 from osm_lcm.data_utils.database.database import Database
 from osm_lcm.data_utils.lcm_config import LcmCfg
 from osm_lcm.lcm_utils import LcmException
-from os import path
+from osm_lcm.temporal.juju_paas_activities import JujuPaasConnector
 from osm_lcm.temporal.lcm_activities import NsLcmActivity
 from osm_lcm.temporal.lcm_workflows import NsNoOpWorkflow
+from osm_lcm.temporal.ns_activities import NsDbActivity, NsOperations
+from osm_lcm.temporal.ns_workflows import NsInstantiateWorkflow
+from osm_lcm.temporal.vdu_workflows import VduInstantiateWorkflow
 from osm_lcm.temporal.vim_activities import VimDbActivity
-from osm_lcm.temporal.juju_paas_activities import JujuPaasConnector
 from osm_lcm.temporal.vim_workflows import (
     VimCreateWorkflow,
     VimDeleteWorkflow,
     VimUpdateWorkflow,
 )
-from osm_lcm.temporal.vdu_workflows import VduInstantiateWorkflow
-from osm_lcm.temporal.vnf_workflows import VnfInstantiateWorkflow, VnfPrepareWorkflow
 from osm_lcm.temporal.vnf_activities import (
     VnfDbActivity,
     VnfOperations,
     VnfSendNotifications,
 )
-from osm_lcm.temporal.ns_workflows import NsInstantiateWorkflow
-from osm_lcm.temporal.ns_activities import NsOperations, NsDbActivity
-from temporalio.client import Client
-from temporalio.worker import Worker
+from osm_lcm.temporal.vnf_workflows import VnfInstantiateWorkflow, VnfPrepareWorkflow
 
 
 class NGLcm:
@@ -159,10 +160,8 @@ class NGLcm:
             VnfPrepareWorkflow,
         ]
         activities = [
-            ns_data_activity_instance.get_model_info,
             ns_data_activity_instance.update_ns_state,
-            ns_operation_instance.check_ns_instantiate_finished,
-            ns_operation_instance.deploy_ns,
+            ns_operation_instance.get_vnf_records,
             nslcm_activity_instance.update_ns_lcm_operation_state,
             nslcm_activity_instance.no_op,
             paas_connector_instance.create_model,
index 837e586..9ab8e5d 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+import traceback
 from abc import ABC, abstractmethod
 from datetime import timedelta
-import logging
-from temporalio import workflow
-from temporalio.common import RetryPolicy
-from temporalio.exceptions import ActivityError
+
 from osm_common.dataclasses.temporal_dataclasses import (
+    LcmOperationState,
     NsLcmOperationInput,
     UpdateLcmOperationStateInput,
-    LcmOperationState,
 )
 from osm_common.temporal_constants import (
+    ACTIVITY_NSLCM_NO_OP,
     ACTIVITY_UPDATE_LCM_OPERATION_STATE,
     WORKFLOW_NSLCM_NO_OP,
-    ACTIVITY_NSLCM_NO_OP,
 )
-import traceback
+from temporalio import workflow
+from temporalio.common import RetryPolicy
+from temporalio.exceptions import ActivityError
 
 
 class LcmOperationWorkflow(ABC):
@@ -60,14 +61,8 @@ class LcmOperationWorkflow(ABC):
 
     def __init__(self):
         self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}")
-        self.op_state = self.op_state = UpdateLcmOperationStateInput(
-            op_id=None,
-            op_state=None,
-            detailed_status="",
-            error_message="",
-            stage="",
-        )
         self.op_id = None
+        self.stage = ""
 
     @abstractmethod
     async def workflow(self, input: NsLcmOperationInput):
@@ -75,36 +70,47 @@ class LcmOperationWorkflow(ABC):
 
     async def wrap_nslcmop(self, input: NsLcmOperationInput):
         self.op_id = input.nslcmop["_id"]
-        self.op_state.op_id = self.op_id
-        self.op_state.op_state = LcmOperationState.PROCESSING
-        await self.update_operation_state()
-
+        await self.update_operation_state(LcmOperationState.PROCESSING)
         try:
             await self.workflow(input=input)
-            self.op_state.op_state = LcmOperationState.COMPLETED
-
         except ActivityError as e:
             # TODO: Deteremine the best content for Activity Errors OSM-994
             self.logger.exception(e)
-            self.op_state.op_state = LcmOperationState.FAILED
-            self.op_state.detailed_status = str(e.cause.with_traceback(e.__traceback__))
-            self.op_state.error_message = str(e.cause.message)
+            await self.update_operation_state(
+                LcmOperationState.FAILED,
+                error_message=str(e.cause.message),
+                detailed_status=str(e.cause.with_traceback(e.__traceback__)),
+            )
             raise e
-
         except Exception as e:
             self.logger.exception(e)
-            self.op_state.op_state = LcmOperationState.FAILED
-            self.op_state.detailed_status = traceback.format_exc()
-            self.op_state.error_message = str(e)
+            self.update_operation_state(
+                LcmOperationState.FAILED,
+                error_message=str(e),
+                detailed_status=traceback.format_exc(),
+            )
             raise e
-
-        finally:
-            await self.update_operation_state()
-
-    async def update_operation_state(self) -> None:
+        await self.update_operation_state(LcmOperationState.COMPLETED)
+
+    async def update_operation_state(
+        self,
+        op_state: LcmOperationState,
+        stage: str = None,
+        error_message: str = "",
+        detailed_status: str = "",
+    ) -> None:
+        if stage is not None:
+            self.stage = stage
+        input = UpdateLcmOperationStateInput(
+            op_id=self.op_id,
+            op_state=op_state,
+            stage=self.stage,
+            error_message=error_message,
+            detailed_status=detailed_status,
+        )
         await workflow.execute_activity(
             activity=ACTIVITY_UPDATE_LCM_OPERATION_STATE,
-            arg=self.op_state,
+            arg=input,
             activity_id=f"{ACTIVITY_UPDATE_LCM_OPERATION_STATE}-{self.op_id}",
             schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
             retry_policy=LcmOperationWorkflow.retry_policy,
index eee4325..3ea7dcf 100644 (file)
 
 import logging
 from time import time
-from temporalio import activity
-from osm_common.temporal_constants import (
-    ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED,
-    ACTIVITY_DEPLOY_NS,
-    ACTIVITY_GET_MODEL_INFO,
-    ACTIVITY_UPDATE_NS_STATE,
-)
+
 from osm_common.dataclasses.temporal_dataclasses import (
-    ModelInfo,
-    NsInstantiateInput,
+    GetVnfRecordIdsInput,
+    GetVnfRecordIdsOutput,
     UpdateNsStateInput,
-    VduInstantiateInput,
 )
-from osm_lcm.temporal.juju_paas_activities import CharmInfoUtils
+from osm_common.temporal_constants import (
+    ACTIVITY_GET_VNF_RECORD_IDS,
+    ACTIVITY_UPDATE_NS_STATE,
+)
+from temporalio import activity
 
 
 class NsOperations:
@@ -37,38 +34,12 @@ class NsOperations:
         self.db = db
         self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
 
-    # TODO: Change to a workflow OSM-990
-    @activity.defn(name=ACTIVITY_DEPLOY_NS)
-    async def deploy_ns(self, ns_instantiate_input: NsInstantiateInput) -> None:
-        vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": ns_instantiate_input.ns_uuid})
-        for vnfr in vnfrs:
-            await self.deploy_vnf(vnfr)
-
-    async def deploy_vnf(self, vnfr: dict):
-        vim_id = vnfr.get("vim-account-id")
-        model_name = "model-name"
-        vnfd_id = vnfr["vnfd-id"]
-        vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
-        sw_image_descs = vnfd.get("sw-image-desc")
-        for vdu in vnfd.get("vdu"):
-            await self.deploy_vdu(vdu, sw_image_descs, vim_id, model_name)
-
-    async def deploy_vdu(
-        self, vdu: dict, sw_image_descs: str, vim_id: str, model_name: str
-    ) -> None:
-        vdu_info = CharmInfoUtils.get_charm_info(vdu, sw_image_descs)
-        vdu_instantiate_input = VduInstantiateInput(vim_id, model_name, vdu_info)
-        workflow_id = (
-            vdu_instantiate_input.model_name + vdu_instantiate_input.charm_info.app_name
-        )
-        self.logger.info(f"TODO: Start VDU Workflow {workflow_id}")
-
-    @activity.defn(name=ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED)
-    async def check_ns_instantiate_finished(
-        self, ns_instantiate: NsInstantiateInput
-    ) -> None:
-        # TODO: Implement OSM-993
-        pass
+    @activity.defn(name=ACTIVITY_GET_VNF_RECORD_IDS)
+    async def get_vnf_records(
+        self, get_vnf_records_input: GetVnfRecordIdsInput
+    ) -> GetVnfRecordIdsOutput:
+        # TODO: [OSMENG-1043] Implement Get VNF Records
+        return GetVnfRecordIdsOutput(vnfr_ids=[""])
 
 
 class NsDbActivity:
@@ -83,37 +54,6 @@ class NsDbActivity:
         self.db = db
         self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
 
-    @activity.defn(name=ACTIVITY_GET_MODEL_INFO)
-    async def get_model_info(
-        self, ns_instantiate_input: NsInstantiateInput
-    ) -> ModelInfo:
-        """Returns a ModelInfo. Contains VIM ID and model name.
-
-        Collaborators:
-            DB Read:           nsrs
-
-        Raises  (Retryable):
-            DbException         If the target DB record does not exist or DB is not reachable.
-
-        Activity Lifecycle:
-            This activity will not report a heartbeat due to its
-            short-running nature.
-
-            As this is a direct DB update, it is not recommended to have
-            any specific retry policy
-
-        """
-        ns_uuid = ns_instantiate_input.ns_uuid
-        nsr = self.db.get_one("nsrs", {"_id": ns_uuid})
-        vim_uuid = nsr.get("datacenter")
-        model_name = self._get_namespace(ns_uuid, vim_uuid)
-        return ModelInfo(vim_uuid, model_name)
-
-    @staticmethod
-    def _get_namespace(ns_id: str, vim_id: str) -> str:
-        """The NS namespace is the combination if the NS ID and the VIM ID."""
-        return ns_id[-12:] + "-" + vim_id[-12:]
-
     @activity.defn(name=ACTIVITY_UPDATE_NS_STATE)
     async def update_ns_state(self, data: UpdateNsStateInput) -> None:
         """
index bef6766..e78392e 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from temporalio import workflow
-from temporalio.converter import value_to_type
-from temporalio.exceptions import ActivityError
+import asyncio
 
 from osm_common.dataclasses.temporal_dataclasses import (
+    GetVnfRecordIdsInput,
+    GetVnfRecordIdsOutput,
     ModelInfo,
-    NsInstantiateInput,
     NsLcmOperationInput,
     NsState,
     UpdateNsStateInput,
+    VnfInstantiateInput,
 )
 from osm_common.temporal_constants import (
     ACTIVITY_CREATE_MODEL,
-    ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED,
-    ACTIVITY_DEPLOY_NS,
-    ACTIVITY_GET_MODEL_INFO,
+    ACTIVITY_GET_VNF_RECORD_IDS,
     ACTIVITY_UPDATE_NS_STATE,
-    LCM_TASK_QUEUE,
     WORKFLOW_NS_INSTANTIATE,
+    WORKFLOW_VNF_INSTANTIATE,
 )
+from temporalio import workflow
+from temporalio.converter import value_to_type
+from temporalio.exceptions import ActivityError
 
 from osm_lcm.temporal.lcm_workflows import LcmOperationWorkflow
 
@@ -47,72 +48,70 @@ class NsInstantiateWorkflow(LcmOperationWorkflow):
         await super().wrap_nslcmop(input=input)
 
     async def workflow(self, input: NsLcmOperationInput) -> None:
-        # Temporary until we align on what is really needed where.  For now, this
-        # reduced set looks to be all that is needed.
-        input = NsInstantiateInput(
-            ns_uuid=input.nslcmop["nsInstanceId"], op_id=input.nslcmop["_id"]
-        )
+        self.logger.info(f"Executing {WORKFLOW_NS_INSTANTIATE} with {input}")
 
-        ns_state = UpdateNsStateInput(input.ns_uuid, NsState.INSTANTIATED, "Done")
+        # TODO: Can we clean up the input? Perhaps this workflow could receive NsInstantiateInput
+        # directly.
+        ns_uuid = input.nslcmop["nsInstanceId"]
+        vim_uuid = input.nslcmop["operationParams"]["vimAccountId"]
+        model_name = self._get_namespace(ns_uuid, vim_uuid)
         try:
-            model_info = value_to_type(
-                ModelInfo,
-                await workflow.execute_activity(
-                    activity=ACTIVITY_GET_MODEL_INFO,
-                    arg=input,
-                    activity_id=f"{ACTIVITY_GET_MODEL_INFO}-{input.ns_uuid}",
-                    task_queue=LCM_TASK_QUEUE,
-                    schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
-                    retry_policy=LcmOperationWorkflow.no_retry_policy,
-                ),
-            )
-
             await workflow.execute_activity(
                 activity=ACTIVITY_CREATE_MODEL,
-                arg=model_info,
-                activity_id=f"{ACTIVITY_CREATE_MODEL}-{input.ns_uuid}",
-                task_queue=LCM_TASK_QUEUE,
+                arg=ModelInfo(vim_uuid=vim_uuid, model_name=model_name),
+                activity_id=f"{ACTIVITY_CREATE_MODEL}-{ns_uuid}",
                 schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
                 retry_policy=LcmOperationWorkflow.no_retry_policy,
             )
 
-            # TODO: Change this to a workflow OSM-990
-            await workflow.execute_activity(
-                activity=ACTIVITY_DEPLOY_NS,
-                arg=input,
-                activity_id=f"{ACTIVITY_DEPLOY_NS}-{input.ns_uuid}",
-                task_queue=LCM_TASK_QUEUE,
-                schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
-                retry_policy=LcmOperationWorkflow.no_retry_policy,
+            # TODO: Provide the correct VNFR ids to the VNF instantiate workflow
+            vnf_record_ids_output: GetVnfRecordIdsOutput = value_to_type(
+                GetVnfRecordIdsOutput,
+                await workflow.execute_activity(
+                    activity=ACTIVITY_GET_VNF_RECORD_IDS,
+                    arg=GetVnfRecordIdsInput(ns_uuid=ns_uuid),
+                    activity_id=f"{ACTIVITY_GET_VNF_RECORD_IDS}-{ns_uuid}",
+                    schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
+                    retry_policy=LcmOperationWorkflow.no_retry_policy,
+                ),
             )
-
-            await workflow.execute_activity(
-                activity=ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED,
-                arg=input,
-                activity_id=f"{ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED}-{input.ns_uuid}",
-                task_queue=LCM_TASK_QUEUE,
-                schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
-                retry_policy=LcmOperationWorkflow.retry_policy,
+            asyncio.gather(
+                workflow.execute_child_workflow(
+                    workflow=WORKFLOW_VNF_INSTANTIATE,
+                    arg=VnfInstantiateInput(vnfr_uuid=vnfr_uuid, model_name=model_name),
+                    id=f"{WORKFLOW_VNF_INSTANTIATE}-{vnfr_uuid}",
+                )
+                for vnfr_uuid in vnf_record_ids_output.vnfr_ids
             )
 
         except ActivityError as e:
-            ns_state = UpdateNsStateInput(input.ns_uuid, NsState.ERROR, e.cause.message)
+            await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, e.cause.message)
             self.logger.error(
                 f"{WORKFLOW_NS_INSTANTIATE} failed with {str(e.cause.message)}"
             )
             raise e
 
         except Exception as e:
-            ns_state = UpdateNsStateInput(input.ns_uuid, NsState.ERROR, str(e))
+            await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, str(e))
             self.logger.error(f"{WORKFLOW_NS_INSTANTIATE} failed with {str(e)}")
             raise e
+        await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, "Done")
 
-        finally:
-            await workflow.execute_activity(
-                activity=ACTIVITY_UPDATE_NS_STATE,
-                arg=ns_state,
-                activity_id=f"{ACTIVITY_UPDATE_NS_STATE}-{input.ns_uuid}",
-                task_queue=LCM_TASK_QUEUE,
-                schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
-                retry_policy=LcmOperationWorkflow.retry_policy,
-            )
+    @staticmethod
+    async def update_ns_state(
+        ns_uuid: str,
+        state: NsState,
+        message: str,
+    ) -> None:
+        input = UpdateNsStateInput(ns_uuid, state, message)
+        await workflow.execute_activity(
+            activity=ACTIVITY_UPDATE_NS_STATE,
+            arg=input,
+            activity_id=f"{ACTIVITY_UPDATE_NS_STATE}-{ns_uuid}",
+            schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
+            retry_policy=LcmOperationWorkflow.retry_policy,
+        )
+
+    def _get_namespace(self, ns_id: str, vim_id: str) -> str:
+        """The NS namespace is the combination if the NS ID and the VIM ID."""
+        return ns_id[-12:] + "-" + vim_id[-12:]
index 32abbf5..3305fa6 100644 (file)
 ##
 import logging
 import tempfile
-from unittest.mock import Mock, patch, MagicMock, mock_open
 from unittest import TestCase
+from unittest.mock import MagicMock, Mock, mock_open, patch
+from zipfile import BadZipfile
 
-from osm_common.msgkafka import MsgKafka
+import yaml
 from osm_common import fslocal
+from osm_common.msgkafka import MsgKafka
+
 from osm_lcm.data_utils.database.database import Database
 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
 from osm_lcm.lcm_utils import LcmBase, LcmException
 from osm_lcm.tests import test_db_descriptors as descriptors
-import yaml
-from zipfile import BadZipfile
-
 
 tmpdir = tempfile.mkdtemp()[1]
 tmpfile = tempfile.mkstemp()[1]
diff --git a/osm_lcm/tests/test_ns_activities.py b/osm_lcm/tests/test_ns_activities.py
deleted file mode 100644 (file)
index b12693b..0000000
+++ /dev/null
@@ -1,59 +0,0 @@
-#######################################################################################
-# Copyright ETSI Contributors and Others.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import asynctest
-from osm_common.dataclasses.temporal_dataclasses import ModelInfo, NsInstantiateInput
-from osm_common.dbbase import DbException
-from osm_lcm.temporal.ns_activities import NsDbActivity
-from temporalio.testing import ActivityEnvironment
-from unittest.mock import Mock
-
-
-vim_id = "some-vim-uuid"
-ns_id = "0123456789-9876543210"
-nsr = {
-    "_id": ns_id,
-    "datacenter": vim_id,
-}
-input = NsInstantiateInput(ns_id, "op_id")
-expected_namespace = "9-9876543210-ome-vim-uuid"
-expected_model_info = ModelInfo(vim_id, expected_namespace)
-
-
-class TestGetModelInfo(asynctest.TestCase):
-    def setUp(self):
-        self.db = Mock()
-        self.env = ActivityEnvironment()
-        self.ns_db_activity = NsDbActivity(self.db)
-
-    async def test_nominal_case(self):
-        self.db.get_one.return_value = nsr
-        model_info = await self.env.run(self.ns_db_activity.get_model_info, input)
-        self.assertEqual(model_info, expected_model_info)
-
-    async def test_db_raises_exception(self):
-        self.db.get_one.side_effect = DbException("not found")
-        with self.assertRaises(DbException):
-            model_info = await self.env.run(self.ns_db_activity.get_model_info, input)
-            self.assertIsNone(model_info)
-
-    async def test_no_datacenter_raises_exception(self):
-        nsr = {"_id": ns_id}
-        self.db.get_one.return_value = nsr
-        with self.assertRaises(TypeError):
-            model_info = await self.env.run(self.ns_db_activity.get_model_info, input)
-            self.assertIsNone(model_info)
diff --git a/osm_lcm/tests/test_ns_workflows.py b/osm_lcm/tests/test_ns_workflows.py
new file mode 100644 (file)
index 0000000..9885a16
--- /dev/null
@@ -0,0 +1,199 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from unittest.mock import Mock
+
+import asynctest
+from osm_common.dataclasses.temporal_dataclasses import (
+    GetVnfRecordIdsInput,
+    GetVnfRecordIdsOutput,
+    LcmOperationState,
+    ModelInfo,
+    NsLcmOperationInput,
+    NsState,
+    UpdateLcmOperationStateInput,
+    UpdateNsStateInput,
+    VnfInstantiateInput,
+)
+from osm_common.temporal_constants import (
+    ACTIVITY_CREATE_MODEL,
+    ACTIVITY_GET_VNF_RECORD_IDS,
+    ACTIVITY_UPDATE_LCM_OPERATION_STATE,
+    ACTIVITY_UPDATE_NS_STATE,
+    LCM_TASK_QUEUE,
+    WORKFLOW_VNF_INSTANTIATE,
+)
+from temporalio import activity, workflow
+from temporalio.client import WorkflowFailureError
+from temporalio.testing import WorkflowEnvironment
+from temporalio.worker import Worker
+
+from osm_lcm.temporal.ns_workflows import NsInstantiateWorkflow
+
+
+class TestException(Exception):
+    pass
+
+
+@activity.defn(name=ACTIVITY_CREATE_MODEL)
+async def mock_create_model(create_model_input: ModelInfo) -> None:
+    pass
+
+
+@activity.defn(name=ACTIVITY_GET_VNF_RECORD_IDS)
+async def mock_get_vnf_record_ids(
+    get_vnf_record_ids_input: GetVnfRecordIdsInput,
+) -> None:
+    return GetVnfRecordIdsOutput(
+        vnfr_ids=[
+            "828d91ee-fa04-43bb-8471-f66ea74597e7",
+            "c4bbeb41-df7e-4daa-863d-c8fd29fac96d",
+        ]
+    )
+
+
+@activity.defn(name=ACTIVITY_CREATE_MODEL)
+async def mock_create_model_raises(create_model_input: ModelInfo) -> None:
+    raise TestException("Test exception")
+
+
+@workflow.defn(name=WORKFLOW_VNF_INSTANTIATE, sandboxed=False)
+class MockVnfInstantiateWorkflow:
+    @workflow.run
+    async def run(self, input: VnfInstantiateInput) -> None:
+        pass
+
+
+class TestNsInstantiateWorkflow(asynctest.TestCase):
+    input = NsLcmOperationInput(
+        nslcmop={
+            "_id": "1234",
+            "nsInstanceId": "5678",
+            "operationParams": {"vimAccountId": "9876"},
+        }
+    )
+
+    @activity.defn(name=ACTIVITY_UPDATE_LCM_OPERATION_STATE)
+    async def mock_update_lcm_operation_state(
+        self,
+        data: UpdateLcmOperationStateInput,
+    ) -> None:
+        self.mock_update_lcm_operation_state_tracker(data)
+
+    @activity.defn(name=ACTIVITY_UPDATE_NS_STATE)
+    async def mock_update_ns_state(self, data: UpdateNsStateInput) -> None:
+        self.mock_update_ns_state_tracker(data)
+
+    async def setUp(self):
+        self.env = await WorkflowEnvironment.start_time_skipping()
+        self.mock_update_lcm_operation_state_tracker = Mock()
+        self.mock_update_ns_state_tracker = Mock()
+
+    async def test_instantiate_workflow(self):
+        async with self.env as env:
+            async with Worker(
+                env.client,
+                task_queue=LCM_TASK_QUEUE,
+                workflows=[NsInstantiateWorkflow, MockVnfInstantiateWorkflow],
+                activities=[
+                    mock_create_model,
+                    mock_get_vnf_record_ids,
+                    self.mock_update_ns_state,
+                    self.mock_update_lcm_operation_state,
+                ],
+                debug_mode=True,
+            ):
+                await env.client.execute_workflow(
+                    NsInstantiateWorkflow,
+                    arg=self.input,
+                    id=self.input.nslcmop["nsInstanceId"],
+                    task_queue=LCM_TASK_QUEUE,
+                )
+
+        assert self.mock_update_lcm_operation_state_tracker.call_count == 2
+        assert_lcm_op_states(
+            self.mock_update_lcm_operation_state_tracker.call_args_list,
+            [LcmOperationState.PROCESSING, LcmOperationState.COMPLETED],
+        )
+        assert_ns_states(
+            self.mock_update_ns_state_tracker.call_args_list, [NsState.INSTANTIATED]
+        )
+
+    async def test_instantiate_workflow_with_exception_updates_lcm_operation(self):
+        async with self.env as env:
+            async with Worker(
+                env.client,
+                task_queue=LCM_TASK_QUEUE,
+                workflows=[NsInstantiateWorkflow, MockVnfInstantiateWorkflow],
+                activities=[
+                    mock_create_model_raises,
+                    mock_get_vnf_record_ids,
+                    self.mock_update_ns_state,
+                    self.mock_update_lcm_operation_state,
+                ],
+                debug_mode=True,
+            ):
+                # TODO: Check this WorkflowFailureError is caused by a TestException
+                with self.assertRaises(WorkflowFailureError):
+                    await env.client.execute_workflow(
+                        NsInstantiateWorkflow,
+                        arg=self.input,
+                        id=self.input.nslcmop["nsInstanceId"],
+                        task_queue=LCM_TASK_QUEUE,
+                    )
+        assert self.mock_update_lcm_operation_state_tracker.call_count == 2
+        assert_lcm_op_states(
+            self.mock_update_lcm_operation_state_tracker.call_args_list,
+            [LcmOperationState.PROCESSING, LcmOperationState.FAILED],
+        )
+
+    async def test_instantiate_workflow_with_exception_updates_ns_state(self):
+        async with self.env as env:
+            async with Worker(
+                env.client,
+                task_queue=LCM_TASK_QUEUE,
+                workflows=[NsInstantiateWorkflow, MockVnfInstantiateWorkflow],
+                activities=[
+                    mock_create_model_raises,
+                    mock_get_vnf_record_ids,
+                    self.mock_update_ns_state,
+                    self.mock_update_lcm_operation_state,
+                ],
+                debug_mode=True,
+            ):
+                # TODO: Check this WorkflowFailureError is caused by a TestException
+                with self.assertRaises(WorkflowFailureError):
+                    await env.client.execute_workflow(
+                        NsInstantiateWorkflow,
+                        arg=self.input,
+                        id=self.input.nslcmop["nsInstanceId"],
+                        task_queue=LCM_TASK_QUEUE,
+                    )
+        assert self.mock_update_ns_state_tracker.call_count == 1
+        assert_ns_states(
+            self.mock_update_ns_state_tracker.call_args_list, [NsState.INSTANTIATED]
+        )
+
+
+def assert_ns_states(call_args_list, expected_states):
+    """Asserts that the NS state was set to each of the expected states (in order)."""
+    assert [call.args[0].state for call in call_args_list] == expected_states
+
+
+def assert_lcm_op_states(call_args_list, expected_states):
+    """Asserts that the LCM operation was set to each of the exepcted states (in order)."""
+    assert [call.args[0].op_state for call in call_args_list] == expected_states