diff --git a/osm_lcm/temporal/ns_workflows.py b/osm_lcm/temporal/ns_workflows.py
index ea08f9d..570e6bb 100644
--- a/osm_lcm/temporal/ns_workflows.py
+++ b/osm_lcm/temporal/ns_workflows.py
@@ -15,24 +15,41 @@
 # limitations under the License.
 
 import asyncio
+from datetime import timedelta
 import traceback
 
 from osm_common.temporal.activities.paas import CreateModel
 from osm_common.temporal.activities.ns import (
+    DeleteNsRecord,
     GetVnfDetails,
     GetNsRecord,
     UpdateNsState,
 )
 from osm_common.temporal.workflows.lcm import LcmOperationWorkflow
-from osm_common.temporal.workflows.ns import NsInstantiateWorkflow
-from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow
+from osm_common.temporal.workflows.ns import (
+    NsInstantiateWorkflow,
+    NsDeleteRecordsWorkflow,
+)
+from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow, VnfDeleteWorkflow
+from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
 from osm_common.temporal.states import NsState
 from temporalio import workflow
+from temporalio.common import RetryPolicy
 from temporalio.converter import value_to_type
-from temporalio.exceptions import ActivityError, ChildWorkflowError
+from temporalio.exceptions import (
+    ActivityError,
+    ApplicationError,
+    ChildWorkflowError,
+    FailureError,
+)
+
+_SANDBOXED = False
+retry_policy = RetryPolicy(maximum_attempts=3)
+no_retry_policy = RetryPolicy(maximum_attempts=1)
+default_schedule_to_close_timeout = timedelta(minutes=10)
 
 
-@workflow.defn(name=NsInstantiateWorkflow.__name__, sandboxed=False)
+@workflow.defn(name=NsInstantiateWorkflow.__name__, sandboxed=_SANDBOXED)
 class NsInstantiateWorkflowImpl(LcmOperationWorkflow):
     @workflow.run
     async def wrap_nslcmop(self, workflow_input: NsInstantiateWorkflow.Input) -> None:
@@ -152,3 +169,71 @@
             if vnf_config.get("member-vnf-index") == vnf_member_index_ref:
                 return vnf_config
         return {}
+
+
+@workflow.defn(name=NsDeleteRecordsWorkflow.__name__, sandboxed=_SANDBOXED)
+class NsDeleteRecordsWorkflowImpl(NsDeleteRecordsWorkflow):
+    @workflow.run
+    async def run(self, workflow_input: NsDeleteRecordsWorkflow.Input) -> None:
+        ns_uuid = workflow_input.ns_uuid
+        try:
+            nsr = value_to_type(
+                GetNsRecord.Output,
+                await workflow.execute_activity(
+                    activity=GetNsRecord.__name__,
+                    arg=GetNsRecord.Input(ns_uuid),
+                    activity_id=f"{GetNsRecord.__name__}-{ns_uuid}",
+                    task_queue=LCM_TASK_QUEUE,
+                    schedule_to_close_timeout=default_schedule_to_close_timeout,
+                    retry_policy=retry_policy,
+                ),
+            ).nsr
+            instantiation_state = nsr.get("nsState")
+            if instantiation_state != NsState.NOT_INSTANTIATED.name:
+                raise ApplicationError(
+                    f"NS must be in {NsState.NOT_INSTANTIATED.name} state",
+                    non_retryable=True,
+                )
+            vnf_details = value_to_type(
+                GetVnfDetails.Output,
+                await workflow.execute_activity(
+                    activity=GetVnfDetails.__name__,
+                    arg=GetVnfDetails.Input(ns_uuid=ns_uuid),
+                    activity_id=f"{GetVnfDetails.__name__}-{ns_uuid}",
+                    schedule_to_close_timeout=default_schedule_to_close_timeout,
+                    retry_policy=retry_policy,
+                ),
+            )
+
+            await asyncio.gather(
+                *(
+                    workflow.execute_child_workflow(
+                        workflow=VnfDeleteWorkflow.__name__,
+                        arg=VnfDeleteWorkflow.Input(
+                            vnfr_uuid=vnfr_uuid,
+                        ),
+                        id=f"{VnfDeleteWorkflow.__name__}-{vnfr_uuid}",
+                    )
+                    for vnfr_uuid, _ in vnf_details.vnf_details
+                )
+            )
+            await workflow.execute_activity(
+                activity=DeleteNsRecord.__name__,
+                arg=DeleteNsRecord.Input(ns_uuid),
+                activity_id=f"{DeleteNsRecord.__name__}-{ns_uuid}",
+                task_queue=LCM_TASK_QUEUE,
+                schedule_to_close_timeout=default_schedule_to_close_timeout,
+                retry_policy=retry_policy,
+            )
+
+        except FailureError as e:
+            if not hasattr(e, "cause") or e.cause is None:
+                self.logger.error(
+                    f"{NsDeleteRecordsWorkflow.__name__} failed with {str(e)}"
+                )
+                raise e
+            err_details = str(e.cause.with_traceback(e.__traceback__))
+            self.logger.error(
+                f"{NsDeleteRecordsWorkflow.__name__} failed with {err_details}"
+            )
+            raise e
diff --git a/osm_lcm/tests/test_ns_workflows.py b/osm_lcm/tests/test_ns_workflows.py
index 56852df..77a05bf 100644
--- a/osm_lcm/tests/test_ns_workflows.py
+++ b/osm_lcm/tests/test_ns_workflows.py
@@ -16,7 +16,9 @@
 import asynctest
 import copy
 from datetime import timedelta
+from typing import Any
 from unittest.mock import Mock, patch
+import uuid
 
 from osm_common.temporal_task_queues.task_queues_mappings import LCM_TASK_QUEUE
 from osm_common.temporal.activities.lcm import (
@@ -26,14 +28,17 @@
     GetNsRecord,
     GetVnfDetails,
     UpdateNsState,
+    DeleteNsRecord,
 )
 from osm_common.temporal.activities.paas import (
     CreateModel,
 )
-from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow
+from osm_common.temporal.workflows.vnf import VnfInstantiateWorkflow, VnfDeleteWorkflow
 from osm_common.temporal.workflows.lcm import LcmOperationWorkflow
 from osm_common.temporal.states import LcmOperationState, NsState
 from osm_lcm.temporal.ns_workflows import (
+    NsDeleteRecordsWorkflow,
+    NsDeleteRecordsWorkflowImpl,
     NsInstantiateWorkflow,
     NsInstantiateWorkflowImpl,
 )
@@ -42,6 +47,7 @@
 from temporalio.common import RetryPolicy
 from temporalio.exceptions import (
     ActivityError,
+    ApplicationError,
     ChildWorkflowError,
     RetryState,
     TimeoutError,
@@ -106,6 +112,20 @@
     pass
 
 
+class MockActivity:
+    def __init__(self, *args: Any, **kwargs: Any):
+        self.tracker = Mock(return_value=None)
+
+    async def __call__(self, *args: Any, **kwargs: Any) -> Any:
+        return self.tracker(*args, **kwargs)
+
+    def __setattr__(self, __name: str, __value: Any) -> None:
+        if __name in ("return_value", "side_effect"):
+            self.tracker.__setattr__(__name, __value)
+        else:
+            super().__setattr__(__name, __value)
+
+
 @activity.defn(name=CreateModel.__name__)
 async def mock_create_model(create_model_input: CreateModel.Input) -> None:
     pass
@@ -117,17 +137,17 @@
 
 
 @activity.defn(name=GetVnfDetails.__name__)
-async def mock_get_vnf_details(
-    get_vnf_details_input: GetVnfDetails.Input,
-) -> GetVnfDetails.Output:
-    return GetVnfDetails.Output(vnf_details=vnf_details)
+class MockGetVnfDetails(MockActivity):
+    def __init__(self, *args: Any, **kwargs: Any):
+        super().__init__(*args, **kwargs)
+        self.tracker.return_value = GetVnfDetails.Output(vnf_details=vnf_details)
 
 
 @activity.defn(name=GetVnfDetails.__name__)
-async def mock_get_vnf_details_raises(
-    et_vnf_details_input: GetVnfDetails.Input,
-) -> GetVnfDetails.Output:
-    raise TestException(f"{GetVnfDetails.__name__} failed.")
+class MockGetVnfDetailsRaises(MockActivity):
+    def __init__(self, *args: Any, **kwargs: Any):
+        super().__init__(*args, **kwargs)
+        self.tracker.side_effect = TestException(f"{GetVnfDetails.__name__} failed.")
 
 
 @activity.defn(name=GetNsRecord.__name__)
@@ -144,6 +164,24 @@
     raise TestException(f"{GetNsRecord.__name__} failed.")
 
 
+@activity.defn(name=GetNsRecord.__name__)
+async def mock_get_ns_record_not_instantiated_state(
+    get_ns_record_input: GetNsRecord.Input,
+) -> GetNsRecord.Output:
+    nsr = copy.deepcopy(sample_nsr)
+    nsr["nsState"] = NsState.NOT_INSTANTIATED.name
+    return GetNsRecord.Output(nsr=nsr)
+
+
+@activity.defn(name=GetNsRecord.__name__)
+async def mock_get_ns_record_instantiated_state(
+    get_ns_record_input: GetNsRecord.Input,
+) -> GetNsRecord.Output:
+    nsr = copy.deepcopy(sample_nsr)
+    nsr["nsState"] = NsState.INSTANTIATED.name
+    return GetNsRecord.Output(nsr=nsr)
+
+
 @workflow.defn(name=VnfInstantiateWorkflow.__name__, sandboxed=SANDBOXED)
 class MockVnfInstantiateWorkflow:
     @workflow.run
@@ -167,6 +205,29 @@
         )
 
 
+@workflow.defn(name=VnfDeleteWorkflow.__name__, sandboxed=SANDBOXED)
+class MockVnfDeleteWorkflow:
+    @workflow.run
+    async def run(self, input: VnfDeleteWorkflow.Input) -> None:
+        pass
+
+
+@workflow.defn(name=VnfDeleteWorkflow.__name__, sandboxed=SANDBOXED)
+class MockVnfDeleteWorkflowFailed:
+    @workflow.run
+    async def run(self, input: VnfDeleteWorkflow.Input) -> None:
+        raise ChildWorkflowError(
+            message=f"{VnfDeleteWorkflow.__name__} child workflow failed.",
+            namespace="default",
+            workflow_id="123",
+            run_id="1",
+            workflow_type=VnfDeleteWorkflow.__name__,
+            initiated_event_id=0,
+            started_event_id=0,
+            retry_state=RetryState.NON_RETRYABLE_FAILURE,
+        )
+
+
 @patch(
     "osm_lcm.temporal.ns_workflows.NsInstantiateWorkflowImpl._get_namespace",
     new=mock_get_namespace,
@@ -222,7 +283,7 @@
     async def test_instantiate_workflow__successful__update_lcm_op_state(self):
         activities = [
             mock_create_model,
-            mock_get_vnf_details,
+            MockGetVnfDetails(),
             mock_get_ns_record,
             self.mock_update_ns_state,
             self.mock_update_lcm_operation_state,
@@ -239,7 +300,7 @@
     async def test_instantiate_workflow__successful__update_ns_state(self):
         activities = [
             mock_create_model,
-            mock_get_vnf_details,
+            MockGetVnfDetails(),
             mock_get_ns_record,
             self.mock_update_ns_state,
             self.mock_update_lcm_operation_state,
@@ -257,7 +318,7 @@
     ):
         activities = [
             mock_create_model_raises,
-            mock_get_vnf_details,
+            MockGetVnfDetails(),
             mock_get_ns_record,
             self.mock_update_ns_state,
             self.mock_update_lcm_operation_state,
@@ -276,7 +337,7 @@
     ):
         activities = [
             mock_create_model_raises,
-            mock_get_vnf_details,
+            MockGetVnfDetails(),
             mock_get_ns_record,
             self.mock_update_ns_state,
             self.mock_update_lcm_operation_state,
@@ -296,7 +357,7 @@
     ):
         activities = [
             mock_create_model_raises,
-            mock_get_vnf_details,
+            MockGetVnfDetails(),
             mock_get_ns_record,
             self.mock_update_ns_state,
             self.mock_update_lcm_operation_state,
@@ -313,7 +374,7 @@
     ):
         activities = [
             mock_create_model_raises,
-            mock_get_vnf_details,
+            MockGetVnfDetails(),
             mock_get_ns_record,
             self.mock_update_ns_state,
             self.mock_update_lcm_operation_state,
@@ -333,7 +394,7 @@
     ):
         activities = [
             mock_create_model,
-            mock_get_vnf_details,
+            MockGetVnfDetails(),
             mock_get_ns_record_raise_exception,
             self.mock_update_ns_state,
             self.mock_update_lcm_operation_state,
@@ -353,7 +414,7 @@
     ):
         activities = [
             mock_create_model,
-            mock_get_vnf_details,
+            MockGetVnfDetails(),
             mock_get_ns_record_raise_exception,
             self.mock_update_ns_state,
             self.mock_update_lcm_operation_state,
@@ -370,7 +431,7 @@
     ):
         activities = [
             mock_create_model,
-            mock_get_vnf_details_raises,
+            MockGetVnfDetailsRaises(),
             mock_get_ns_record,
             self.mock_update_ns_state,
             self.mock_update_lcm_operation_state,
@@ -390,7 +451,7 @@
     ):
         activities = [
             mock_create_model,
-            mock_get_vnf_details_raises,
+            MockGetVnfDetailsRaises(),
             mock_get_ns_record,
             self.mock_update_ns_state,
             self.mock_update_lcm_operation_state,
@@ -409,7 +470,7 @@
         # Because of workflow task timeout policy, workflow times out
         activities = [
             mock_create_model,
-            mock_get_vnf_details,
+            MockGetVnfDetails(),
             mock_get_ns_record,
             self.mock_update_ns_state,
             self.mock_update_lcm_operation_state,
@@ -429,7 +490,7 @@
         # Workflow task failure
         activities = [
             mock_create_model,
-            mock_get_vnf_details,
+            MockGetVnfDetails(),
             mock_get_ns_record,
             self.mock_update_ns_state,
             self.mock_update_lcm_operation_state,
@@ -451,7 +512,7 @@
         # Workflow task failure
         activities = [
             mock_create_model,
-            mock_get_vnf_details,
+            MockGetVnfDetails(),
             mock_get_ns_record,
             self.mock_update_ns_state,
             self.mock_update_lcm_operation_state,
@@ -472,7 +533,7 @@
     ):
         activities = [
             mock_create_model,
-            mock_get_vnf_details,
+            MockGetVnfDetails(),
             mock_get_ns_record,
             self.mock_update_ns_state,
             self.mock_update_lcm_operation_state,
@@ -491,7 +552,7 @@
     ):
         activities = [
             mock_create_model,
-            mock_get_vnf_details,
+            MockGetVnfDetails(),
             mock_get_ns_record,
             self.mock_update_ns_state,
             self.mock_update_lcm_operation_state,
@@ -513,7 +574,7 @@
     ):
         activities = [
             mock_create_model,
-            mock_get_vnf_details,
+            MockGetVnfDetails(),
             mock_get_ns_record,
             self.mock_update_ns_state,
             self.mock_update_lcm_operation_state,
@@ -534,7 +595,7 @@
     ):
         activities = [
             mock_create_model,
-            mock_get_vnf_details,
+            MockGetVnfDetails(),
             mock_get_ns_record,
             self.mock_update_ns_state,
             self.mock_update_lcm_operation_state,
@@ -604,3 +665,188 @@
             vnf_member_index_ref_2, sample_nsr
         )
         self.assertEqual(result, {})
+
+
+class TestNsDeleteRecordsWorkflow(asynctest.TestCase):
+    async def setUp(self):
+        self.env = await WorkflowEnvironment.start_time_skipping()
+        self.client = self.env.client
+        self.task_queue = LCM_TASK_QUEUE
+        self.wf_input = NsDeleteRecordsWorkflow.Input(ns_uuid=ns_uuid)
+        self.mock_delete_ns_record_tracker = Mock()
+
+    def get_worker(self, task_queue: str, workflows: list, activities: list) -> Worker:
+        return Worker(
+            self.client,
+            task_queue=task_queue,
+            workflows=workflows,
+            activities=activities,
+            debug_mode=DEBUG_MODE,
+        )
+
+    async def execute_workflow(self, wf_input):
+        return await self.client.execute_workflow(
+            NsDeleteRecordsWorkflowImpl.run,
+            arg=wf_input,
+            id=uuid.uuid4().hex,
+            task_queue=self.task_queue,
+            task_timeout=TASK_TIMEOUT,
+            execution_timeout=EXECUTION_TIMEOUT,
+        )
+
+    def assert_ns_record_is_called(self):
+        self.assertEqual(
+            self.mock_delete_ns_record_tracker.call_args_list[0].args[0],
+            DeleteNsRecord.Input(ns_uuid=ns_uuid),
+        )
+
+    @activity.defn(name=DeleteNsRecord.__name__)
+    async def mock_delete_ns_record(
+        self,
+        delete_ns_record_input: DeleteNsRecord.Input,
+    ) -> None:
+        self.mock_delete_ns_record_tracker(delete_ns_record_input)
+
+    @activity.defn(name=DeleteNsRecord.__name__)
+    async def mock_delete_ns_record_raises_exception(
+        self,
+        delete_ns_record_input: DeleteNsRecord.Input,
+    ) -> None:
+        self.mock_delete_ns_record_tracker(delete_ns_record_input)
+        raise TestException(f"{DeleteNsRecord.__name__} failed.")
+
+    async def test_delete_ns_records_workflow__successful(self):
+        workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflow]
+        activities = [
+            MockGetVnfDetails(),
+            mock_get_ns_record_not_instantiated_state,
+            self.mock_delete_ns_record,
+        ]
+        async with self.env, self.get_worker(self.task_queue, workflows, activities):
+            await self.execute_workflow(self.wf_input)
+            self.assert_ns_record_is_called()
+
+    @patch(
+        "temporalio.workflow.execute_child_workflow",
+        wraps=workflow.execute_child_workflow,
+    )
+    async def test_ns_terminate_workflow__successful__executes_vnf_delete_child_workflows_concurrently(
+        self,
+        mock_execute_child_workflow: asynctest.CoroutineMock,
+    ):
+        # Choose a number of child workflows that would take longer to execute than the execution timeout
+        num_child_workflows = EXECUTION_TIMEOUT.seconds * 2 + 1
+        mock_get_vnf_details = MockGetVnfDetails()
+        mock_get_vnf_details.return_value = GetVnfDetails.Output(
+            vnf_details=[
+                (f"vnfr-{uuid.uuid4().hex}", vnf_member_index_ref_1)
+                for _ in range(num_child_workflows)
+            ]
+        )
+        workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflow]
+        activities = [
+            mock_get_vnf_details,
+            mock_get_ns_record_not_instantiated_state,
+            self.mock_delete_ns_record,
+        ]
+        async with self.env, self.get_worker(self.task_queue, workflows, activities):
+            await self.execute_workflow(self.wf_input)
+        self.assert_ns_record_is_called()
+        self.assertEqual(mock_execute_child_workflow.call_count, num_child_workflows)
+        # Check that VnfDeleteWorkflow is executed
+        for i in range(0, num_child_workflows):
+            call_args = mock_execute_child_workflow.call_args_list[i].kwargs["workflow"]
+            self.assertEqual(call_args, VnfDeleteWorkflow.__name__)
+
+    async def test_delete_ns_records_workflow__instantiated_state__fails(self):
+        workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflow]
+        activities = [
+            MockGetVnfDetails(),
+            mock_get_ns_record_instantiated_state,
+            self.mock_delete_ns_record,
+        ]
+        async with self.env, self.get_worker(self.task_queue, workflows, activities):
+            with self.assertRaises(WorkflowFailureError) as err:
+                await self.execute_workflow(self.wf_input)
+            self.assertIsInstance(err.exception.cause, ApplicationError)
+            self.assertEqual(
+                str(err.exception.cause),
+                "NS must be in NOT_INSTANTIATED state",
+            )
+            self.mock_delete_ns_record_tracker.assert_not_called()
+
+    async def test_delete_ns_records_workflow__get_vnf_details_fails__workflow_fails(
+        self,
+    ):
+        workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflow]
+        activities = [
+            MockGetVnfDetailsRaises(),
+            mock_get_ns_record_not_instantiated_state,
+            self.mock_delete_ns_record,
+        ]
+        async with self.env, self.get_worker(self.task_queue, workflows, activities):
+            with self.assertRaises(WorkflowFailureError) as err:
+                await self.execute_workflow(self.wf_input)
+            self.assertIsInstance(err.exception.cause, ActivityError)
+            self.assertEqual(
+                str(err.exception.cause.cause),
+                f"TestException: {GetVnfDetails.__name__} failed.",
+            )
+            self.mock_delete_ns_record_tracker.assert_not_called()
+
+    async def test_delete_ns_records_workflow__get_ns_record_fails__workflow_fails(
+        self,
+    ):
+        workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflow]
+        activities = [
+            MockGetVnfDetails(),
+            mock_get_ns_record_raise_exception,
+            self.mock_delete_ns_record,
+        ]
+        async with self.env, self.get_worker(self.task_queue, workflows, activities):
+            with self.assertRaises(WorkflowFailureError) as err:
+                await self.execute_workflow(self.wf_input)
+            self.assertIsInstance(err.exception.cause, ActivityError)
+            self.assertEqual(
+                str(err.exception.cause.cause),
+                f"TestException: {GetNsRecord.__name__} failed.",
+            )
+            self.mock_delete_ns_record_tracker.assert_not_called()
+
+    async def test_delete_ns_records_workflow__delete_ns_record_fails__workflow_fails(
+        self,
+    ):
+        workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflow]
+        activities = [
+            MockGetVnfDetails(),
+            mock_get_ns_record_not_instantiated_state,
+            self.mock_delete_ns_record_raises_exception,
+        ]
+        async with self.env, self.get_worker(self.task_queue, workflows, activities):
+            with self.assertRaises(WorkflowFailureError) as err:
+                await self.execute_workflow(self.wf_input)
+            self.assertIsInstance(err.exception.cause, ActivityError)
+            self.assertEqual(
+                str(err.exception.cause.cause),
+                f"TestException: {DeleteNsRecord.__name__} failed.",
+            )
+            self.mock_delete_ns_record_tracker.assert_called()
+
+    async def test_delete_ns_records_workflow__vnf_delete_workflow_failed__workflow_fails(
+        self,
+    ):
+        workflows = [NsDeleteRecordsWorkflowImpl, MockVnfDeleteWorkflowFailed]
+        activities = [
+            MockGetVnfDetails(),
+            mock_get_ns_record_not_instantiated_state,
+            self.mock_delete_ns_record,
+        ]
+        async with self.env, self.get_worker(self.task_queue, workflows, activities):
+            with self.assertRaises(WorkflowFailureError) as err:
+                await self.execute_workflow(self.wf_input)
+            self.assertIsInstance(err.exception.cause, ChildWorkflowError)
+            self.assertEqual(
+                str(err.exception.cause.cause),
+                "VnfDeleteWorkflow child workflow failed.",
+            )
+            self.mock_delete_ns_record_tracker.assert_not_called()
