.pydevproject
pyvenv.cfg
venv/
-.idea
\ No newline at end of file
+.idea
+deb_dist
import logging.handlers
import os
import sys
-import yaml
+from os import path
+import yaml
from osm_common.dbbase import DbException
from osm_common.temporal_constants import LCM_TASK_QUEUE
+from temporalio.client import Client
+from temporalio.worker import Worker
+
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.lcm_config import LcmCfg
from osm_lcm.lcm_utils import LcmException
-from os import path
+from osm_lcm.temporal.juju_paas_activities import JujuPaasConnector
from osm_lcm.temporal.lcm_activities import NsLcmActivity
from osm_lcm.temporal.lcm_workflows import NsNoOpWorkflow
+from osm_lcm.temporal.ns_activities import NsDbActivity, NsOperations
+from osm_lcm.temporal.ns_workflows import NsInstantiateWorkflow
+from osm_lcm.temporal.vdu_workflows import VduInstantiateWorkflow
from osm_lcm.temporal.vim_activities import VimDbActivity
-from osm_lcm.temporal.juju_paas_activities import JujuPaasConnector
from osm_lcm.temporal.vim_workflows import (
VimCreateWorkflow,
VimDeleteWorkflow,
VimUpdateWorkflow,
)
-from osm_lcm.temporal.vdu_workflows import VduInstantiateWorkflow
-from osm_lcm.temporal.vnf_workflows import VnfInstantiateWorkflow, VnfPrepareWorkflow
from osm_lcm.temporal.vnf_activities import (
VnfDbActivity,
VnfOperations,
VnfSendNotifications,
)
-from osm_lcm.temporal.ns_workflows import NsInstantiateWorkflow
-from osm_lcm.temporal.ns_activities import NsOperations, NsDbActivity
-from temporalio.client import Client
-from temporalio.worker import Worker
+from osm_lcm.temporal.vnf_workflows import VnfInstantiateWorkflow, VnfPrepareWorkflow
class NGLcm:
VnfPrepareWorkflow,
]
activities = [
- ns_data_activity_instance.get_model_info,
ns_data_activity_instance.update_ns_state,
- ns_operation_instance.check_ns_instantiate_finished,
- ns_operation_instance.deploy_ns,
+ ns_operation_instance.get_vnf_records,
nslcm_activity_instance.update_ns_lcm_operation_state,
nslcm_activity_instance.no_op,
paas_connector_instance.create_model,
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import traceback
from abc import ABC, abstractmethod
from datetime import timedelta
-import logging
-from temporalio import workflow
-from temporalio.common import RetryPolicy
-from temporalio.exceptions import ActivityError
+
from osm_common.dataclasses.temporal_dataclasses import (
+ LcmOperationState,
NsLcmOperationInput,
UpdateLcmOperationStateInput,
- LcmOperationState,
)
from osm_common.temporal_constants import (
+ ACTIVITY_NSLCM_NO_OP,
ACTIVITY_UPDATE_LCM_OPERATION_STATE,
WORKFLOW_NSLCM_NO_OP,
- ACTIVITY_NSLCM_NO_OP,
)
-import traceback
+from temporalio import workflow
+from temporalio.common import RetryPolicy
+from temporalio.exceptions import ActivityError
class LcmOperationWorkflow(ABC):
def __init__(self):
self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}")
- self.op_state = self.op_state = UpdateLcmOperationStateInput(
- op_id=None,
- op_state=None,
- detailed_status="",
- error_message="",
- stage="",
- )
self.op_id = None
+ self.stage = ""
@abstractmethod
async def workflow(self, input: NsLcmOperationInput):
async def wrap_nslcmop(self, input: NsLcmOperationInput):
self.op_id = input.nslcmop["_id"]
- self.op_state.op_id = self.op_id
- self.op_state.op_state = LcmOperationState.PROCESSING
- await self.update_operation_state()
-
+ await self.update_operation_state(LcmOperationState.PROCESSING)
try:
await self.workflow(input=input)
- self.op_state.op_state = LcmOperationState.COMPLETED
-
except ActivityError as e:
# TODO: Deteremine the best content for Activity Errors OSM-994
self.logger.exception(e)
- self.op_state.op_state = LcmOperationState.FAILED
- self.op_state.detailed_status = str(e.cause.with_traceback(e.__traceback__))
- self.op_state.error_message = str(e.cause.message)
+ await self.update_operation_state(
+ LcmOperationState.FAILED,
+ error_message=str(e.cause.message),
+ detailed_status=str(e.cause.with_traceback(e.__traceback__)),
+ )
raise e
-
except Exception as e:
self.logger.exception(e)
- self.op_state.op_state = LcmOperationState.FAILED
- self.op_state.detailed_status = traceback.format_exc()
- self.op_state.error_message = str(e)
+ self.update_operation_state(
+ LcmOperationState.FAILED,
+ error_message=str(e),
+ detailed_status=traceback.format_exc(),
+ )
raise e
-
- finally:
- await self.update_operation_state()
-
- async def update_operation_state(self) -> None:
+ await self.update_operation_state(LcmOperationState.COMPLETED)
+
+ async def update_operation_state(
+ self,
+ op_state: LcmOperationState,
+ stage: str = None,
+ error_message: str = "",
+ detailed_status: str = "",
+ ) -> None:
+ if stage is not None:
+ self.stage = stage
+ input = UpdateLcmOperationStateInput(
+ op_id=self.op_id,
+ op_state=op_state,
+ stage=self.stage,
+ error_message=error_message,
+ detailed_status=detailed_status,
+ )
await workflow.execute_activity(
activity=ACTIVITY_UPDATE_LCM_OPERATION_STATE,
- arg=self.op_state,
+ arg=input,
activity_id=f"{ACTIVITY_UPDATE_LCM_OPERATION_STATE}-{self.op_id}",
schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
retry_policy=LcmOperationWorkflow.retry_policy,
import logging
from time import time
-from temporalio import activity
-from osm_common.temporal_constants import (
- ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED,
- ACTIVITY_DEPLOY_NS,
- ACTIVITY_GET_MODEL_INFO,
- ACTIVITY_UPDATE_NS_STATE,
-)
+
from osm_common.dataclasses.temporal_dataclasses import (
- ModelInfo,
- NsInstantiateInput,
+ GetVnfRecordIdsInput,
+ GetVnfRecordIdsOutput,
UpdateNsStateInput,
- VduInstantiateInput,
)
-from osm_lcm.temporal.juju_paas_activities import CharmInfoUtils
+from osm_common.temporal_constants import (
+ ACTIVITY_GET_VNF_RECORD_IDS,
+ ACTIVITY_UPDATE_NS_STATE,
+)
+from temporalio import activity
class NsOperations:
self.db = db
self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
- # TODO: Change to a workflow OSM-990
- @activity.defn(name=ACTIVITY_DEPLOY_NS)
- async def deploy_ns(self, ns_instantiate_input: NsInstantiateInput) -> None:
- vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": ns_instantiate_input.ns_uuid})
- for vnfr in vnfrs:
- await self.deploy_vnf(vnfr)
-
- async def deploy_vnf(self, vnfr: dict):
- vim_id = vnfr.get("vim-account-id")
- model_name = "model-name"
- vnfd_id = vnfr["vnfd-id"]
- vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
- sw_image_descs = vnfd.get("sw-image-desc")
- for vdu in vnfd.get("vdu"):
- await self.deploy_vdu(vdu, sw_image_descs, vim_id, model_name)
-
- async def deploy_vdu(
- self, vdu: dict, sw_image_descs: str, vim_id: str, model_name: str
- ) -> None:
- vdu_info = CharmInfoUtils.get_charm_info(vdu, sw_image_descs)
- vdu_instantiate_input = VduInstantiateInput(vim_id, model_name, vdu_info)
- workflow_id = (
- vdu_instantiate_input.model_name + vdu_instantiate_input.charm_info.app_name
- )
- self.logger.info(f"TODO: Start VDU Workflow {workflow_id}")
-
- @activity.defn(name=ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED)
- async def check_ns_instantiate_finished(
- self, ns_instantiate: NsInstantiateInput
- ) -> None:
- # TODO: Implement OSM-993
- pass
+ @activity.defn(name=ACTIVITY_GET_VNF_RECORD_IDS)
+ async def get_vnf_records(
+ self, get_vnf_records_input: GetVnfRecordIdsInput
+ ) -> GetVnfRecordIdsOutput:
+ # TODO: [OSMENG-1043] Implement Get VNF Records
+ return GetVnfRecordIdsOutput(vnfr_ids=[""])
class NsDbActivity:
self.db = db
self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}")
- @activity.defn(name=ACTIVITY_GET_MODEL_INFO)
- async def get_model_info(
- self, ns_instantiate_input: NsInstantiateInput
- ) -> ModelInfo:
- """Returns a ModelInfo. Contains VIM ID and model name.
-
- Collaborators:
- DB Read: nsrs
-
- Raises (Retryable):
- DbException If the target DB record does not exist or DB is not reachable.
-
- Activity Lifecycle:
- This activity will not report a heartbeat due to its
- short-running nature.
-
- As this is a direct DB update, it is not recommended to have
- any specific retry policy
-
- """
- ns_uuid = ns_instantiate_input.ns_uuid
- nsr = self.db.get_one("nsrs", {"_id": ns_uuid})
- vim_uuid = nsr.get("datacenter")
- model_name = self._get_namespace(ns_uuid, vim_uuid)
- return ModelInfo(vim_uuid, model_name)
-
- @staticmethod
- def _get_namespace(ns_id: str, vim_id: str) -> str:
- """The NS namespace is the combination if the NS ID and the VIM ID."""
- return ns_id[-12:] + "-" + vim_id[-12:]
-
@activity.defn(name=ACTIVITY_UPDATE_NS_STATE)
async def update_ns_state(self, data: UpdateNsStateInput) -> None:
"""
# See the License for the specific language governing permissions and
# limitations under the License.
-from temporalio import workflow
-from temporalio.converter import value_to_type
-from temporalio.exceptions import ActivityError
+import asyncio
from osm_common.dataclasses.temporal_dataclasses import (
+ GetVnfRecordIdsInput,
+ GetVnfRecordIdsOutput,
ModelInfo,
- NsInstantiateInput,
NsLcmOperationInput,
NsState,
UpdateNsStateInput,
+ VnfInstantiateInput,
)
from osm_common.temporal_constants import (
ACTIVITY_CREATE_MODEL,
- ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED,
- ACTIVITY_DEPLOY_NS,
- ACTIVITY_GET_MODEL_INFO,
+ ACTIVITY_GET_VNF_RECORD_IDS,
ACTIVITY_UPDATE_NS_STATE,
- LCM_TASK_QUEUE,
WORKFLOW_NS_INSTANTIATE,
+ WORKFLOW_VNF_INSTANTIATE,
)
+from temporalio import workflow
+from temporalio.converter import value_to_type
+from temporalio.exceptions import ActivityError
from osm_lcm.temporal.lcm_workflows import LcmOperationWorkflow
await super().wrap_nslcmop(input=input)
async def workflow(self, input: NsLcmOperationInput) -> None:
- # Temporary until we align on what is really needed where. For now, this
- # reduced set looks to be all that is needed.
- input = NsInstantiateInput(
- ns_uuid=input.nslcmop["nsInstanceId"], op_id=input.nslcmop["_id"]
- )
+ self.logger.info(f"Executing {WORKFLOW_NS_INSTANTIATE} with {input}")
- ns_state = UpdateNsStateInput(input.ns_uuid, NsState.INSTANTIATED, "Done")
+ # TODO: Can we clean up the input? Perhaps this workflow could receive NsInstantiateInput
+ # directly.
+ ns_uuid = input.nslcmop["nsInstanceId"]
+ vim_uuid = input.nslcmop["operationParams"]["vimAccountId"]
+ model_name = self._get_namespace(ns_uuid, vim_uuid)
try:
- model_info = value_to_type(
- ModelInfo,
- await workflow.execute_activity(
- activity=ACTIVITY_GET_MODEL_INFO,
- arg=input,
- activity_id=f"{ACTIVITY_GET_MODEL_INFO}-{input.ns_uuid}",
- task_queue=LCM_TASK_QUEUE,
- schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
- retry_policy=LcmOperationWorkflow.no_retry_policy,
- ),
- )
-
await workflow.execute_activity(
activity=ACTIVITY_CREATE_MODEL,
- arg=model_info,
- activity_id=f"{ACTIVITY_CREATE_MODEL}-{input.ns_uuid}",
- task_queue=LCM_TASK_QUEUE,
+ arg=ModelInfo(vim_uuid=vim_uuid, model_name=model_name),
+ activity_id=f"{ACTIVITY_CREATE_MODEL}-{ns_uuid}",
schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
retry_policy=LcmOperationWorkflow.no_retry_policy,
)
- # TODO: Change this to a workflow OSM-990
- await workflow.execute_activity(
- activity=ACTIVITY_DEPLOY_NS,
- arg=input,
- activity_id=f"{ACTIVITY_DEPLOY_NS}-{input.ns_uuid}",
- task_queue=LCM_TASK_QUEUE,
- schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
- retry_policy=LcmOperationWorkflow.no_retry_policy,
+ # TODO: Provide the correct VNFR ids to the VNF instantiate workflow
+ vnf_record_ids_output: GetVnfRecordIdsOutput = value_to_type(
+ GetVnfRecordIdsOutput,
+ await workflow.execute_activity(
+ activity=ACTIVITY_GET_VNF_RECORD_IDS,
+ arg=GetVnfRecordIdsInput(ns_uuid=ns_uuid),
+ activity_id=f"{ACTIVITY_GET_VNF_RECORD_IDS}-{ns_uuid}",
+ schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
+ retry_policy=LcmOperationWorkflow.no_retry_policy,
+ ),
)
-
- await workflow.execute_activity(
- activity=ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED,
- arg=input,
- activity_id=f"{ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED}-{input.ns_uuid}",
- task_queue=LCM_TASK_QUEUE,
- schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
- retry_policy=LcmOperationWorkflow.retry_policy,
+ asyncio.gather(
+ workflow.execute_child_workflow(
+ workflow=WORKFLOW_VNF_INSTANTIATE,
+ arg=VnfInstantiateInput(vnfr_uuid=vnfr_uuid, model_name=model_name),
+ id=f"{WORKFLOW_VNF_INSTANTIATE}-{vnfr_uuid}",
+ )
+ for vnfr_uuid in vnf_record_ids_output.vnfr_ids
)
except ActivityError as e:
- ns_state = UpdateNsStateInput(input.ns_uuid, NsState.ERROR, e.cause.message)
+ await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, e.cause.message)
self.logger.error(
f"{WORKFLOW_NS_INSTANTIATE} failed with {str(e.cause.message)}"
)
raise e
except Exception as e:
- ns_state = UpdateNsStateInput(input.ns_uuid, NsState.ERROR, str(e))
+ await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, str(e))
self.logger.error(f"{WORKFLOW_NS_INSTANTIATE} failed with {str(e)}")
raise e
+ await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, "Done")
- finally:
- await workflow.execute_activity(
- activity=ACTIVITY_UPDATE_NS_STATE,
- arg=ns_state,
- activity_id=f"{ACTIVITY_UPDATE_NS_STATE}-{input.ns_uuid}",
- task_queue=LCM_TASK_QUEUE,
- schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
- retry_policy=LcmOperationWorkflow.retry_policy,
- )
+ @staticmethod
+ async def update_ns_state(
+ ns_uuid: str,
+ state: NsState,
+ message: str,
+ ) -> None:
+ input = UpdateNsStateInput(ns_uuid, state, message)
+ await workflow.execute_activity(
+ activity=ACTIVITY_UPDATE_NS_STATE,
+ arg=input,
+ activity_id=f"{ACTIVITY_UPDATE_NS_STATE}-{ns_uuid}",
+ schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout,
+ retry_policy=LcmOperationWorkflow.retry_policy,
+ )
+
+ def _get_namespace(self, ns_id: str, vim_id: str) -> str:
+ """The NS namespace is the combination if the NS ID and the VIM ID."""
+ return ns_id[-12:] + "-" + vim_id[-12:]
##
import logging
import tempfile
-from unittest.mock import Mock, patch, MagicMock, mock_open
from unittest import TestCase
+from unittest.mock import MagicMock, Mock, mock_open, patch
+from zipfile import BadZipfile
-from osm_common.msgkafka import MsgKafka
+import yaml
from osm_common import fslocal
+from osm_common.msgkafka import MsgKafka
+
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
from osm_lcm.lcm_utils import LcmBase, LcmException
from osm_lcm.tests import test_db_descriptors as descriptors
-import yaml
-from zipfile import BadZipfile
-
tmpdir = tempfile.mkdtemp()[1]
tmpfile = tempfile.mkstemp()[1]
+++ /dev/null
-#######################################################################################
-# Copyright ETSI Contributors and Others.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import asynctest
-from osm_common.dataclasses.temporal_dataclasses import ModelInfo, NsInstantiateInput
-from osm_common.dbbase import DbException
-from osm_lcm.temporal.ns_activities import NsDbActivity
-from temporalio.testing import ActivityEnvironment
-from unittest.mock import Mock
-
-
-vim_id = "some-vim-uuid"
-ns_id = "0123456789-9876543210"
-nsr = {
- "_id": ns_id,
- "datacenter": vim_id,
-}
-input = NsInstantiateInput(ns_id, "op_id")
-expected_namespace = "9-9876543210-ome-vim-uuid"
-expected_model_info = ModelInfo(vim_id, expected_namespace)
-
-
-class TestGetModelInfo(asynctest.TestCase):
- def setUp(self):
- self.db = Mock()
- self.env = ActivityEnvironment()
- self.ns_db_activity = NsDbActivity(self.db)
-
- async def test_nominal_case(self):
- self.db.get_one.return_value = nsr
- model_info = await self.env.run(self.ns_db_activity.get_model_info, input)
- self.assertEqual(model_info, expected_model_info)
-
- async def test_db_raises_exception(self):
- self.db.get_one.side_effect = DbException("not found")
- with self.assertRaises(DbException):
- model_info = await self.env.run(self.ns_db_activity.get_model_info, input)
- self.assertIsNone(model_info)
-
- async def test_no_datacenter_raises_exception(self):
- nsr = {"_id": ns_id}
- self.db.get_one.return_value = nsr
- with self.assertRaises(TypeError):
- model_info = await self.env.run(self.ns_db_activity.get_model_info, input)
- self.assertIsNone(model_info)
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from unittest.mock import Mock
+
+import asynctest
+from osm_common.dataclasses.temporal_dataclasses import (
+ GetVnfRecordIdsInput,
+ GetVnfRecordIdsOutput,
+ LcmOperationState,
+ ModelInfo,
+ NsLcmOperationInput,
+ NsState,
+ UpdateLcmOperationStateInput,
+ UpdateNsStateInput,
+ VnfInstantiateInput,
+)
+from osm_common.temporal_constants import (
+ ACTIVITY_CREATE_MODEL,
+ ACTIVITY_GET_VNF_RECORD_IDS,
+ ACTIVITY_UPDATE_LCM_OPERATION_STATE,
+ ACTIVITY_UPDATE_NS_STATE,
+ LCM_TASK_QUEUE,
+ WORKFLOW_VNF_INSTANTIATE,
+)
+from temporalio import activity, workflow
+from temporalio.client import WorkflowFailureError
+from temporalio.testing import WorkflowEnvironment
+from temporalio.worker import Worker
+
+from osm_lcm.temporal.ns_workflows import NsInstantiateWorkflow
+
+
+class TestException(Exception):
+ pass
+
+
+@activity.defn(name=ACTIVITY_CREATE_MODEL)
+async def mock_create_model(create_model_input: ModelInfo) -> None:
+ pass
+
+
+@activity.defn(name=ACTIVITY_GET_VNF_RECORD_IDS)
+async def mock_get_vnf_record_ids(
+ get_vnf_record_ids_input: GetVnfRecordIdsInput,
+) -> None:
+ return GetVnfRecordIdsOutput(
+ vnfr_ids=[
+ "828d91ee-fa04-43bb-8471-f66ea74597e7",
+ "c4bbeb41-df7e-4daa-863d-c8fd29fac96d",
+ ]
+ )
+
+
+@activity.defn(name=ACTIVITY_CREATE_MODEL)
+async def mock_create_model_raises(create_model_input: ModelInfo) -> None:
+ raise TestException("Test exception")
+
+
+@workflow.defn(name=WORKFLOW_VNF_INSTANTIATE, sandboxed=False)
+class MockVnfInstantiateWorkflow:
+ @workflow.run
+ async def run(self, input: VnfInstantiateInput) -> None:
+ pass
+
+
+class TestNsInstantiateWorkflow(asynctest.TestCase):
+ input = NsLcmOperationInput(
+ nslcmop={
+ "_id": "1234",
+ "nsInstanceId": "5678",
+ "operationParams": {"vimAccountId": "9876"},
+ }
+ )
+
+ @activity.defn(name=ACTIVITY_UPDATE_LCM_OPERATION_STATE)
+ async def mock_update_lcm_operation_state(
+ self,
+ data: UpdateLcmOperationStateInput,
+ ) -> None:
+ self.mock_update_lcm_operation_state_tracker(data)
+
+ @activity.defn(name=ACTIVITY_UPDATE_NS_STATE)
+ async def mock_update_ns_state(self, data: UpdateNsStateInput) -> None:
+ self.mock_update_ns_state_tracker(data)
+
+ async def setUp(self):
+ self.env = await WorkflowEnvironment.start_time_skipping()
+ self.mock_update_lcm_operation_state_tracker = Mock()
+ self.mock_update_ns_state_tracker = Mock()
+
+ async def test_instantiate_workflow(self):
+ async with self.env as env:
+ async with Worker(
+ env.client,
+ task_queue=LCM_TASK_QUEUE,
+ workflows=[NsInstantiateWorkflow, MockVnfInstantiateWorkflow],
+ activities=[
+ mock_create_model,
+ mock_get_vnf_record_ids,
+ self.mock_update_ns_state,
+ self.mock_update_lcm_operation_state,
+ ],
+ debug_mode=True,
+ ):
+ await env.client.execute_workflow(
+ NsInstantiateWorkflow,
+ arg=self.input,
+ id=self.input.nslcmop["nsInstanceId"],
+ task_queue=LCM_TASK_QUEUE,
+ )
+
+ assert self.mock_update_lcm_operation_state_tracker.call_count == 2
+ assert_lcm_op_states(
+ self.mock_update_lcm_operation_state_tracker.call_args_list,
+ [LcmOperationState.PROCESSING, LcmOperationState.COMPLETED],
+ )
+ assert_ns_states(
+ self.mock_update_ns_state_tracker.call_args_list, [NsState.INSTANTIATED]
+ )
+
+ async def test_instantiate_workflow_with_exception_updates_lcm_operation(self):
+ async with self.env as env:
+ async with Worker(
+ env.client,
+ task_queue=LCM_TASK_QUEUE,
+ workflows=[NsInstantiateWorkflow, MockVnfInstantiateWorkflow],
+ activities=[
+ mock_create_model_raises,
+ mock_get_vnf_record_ids,
+ self.mock_update_ns_state,
+ self.mock_update_lcm_operation_state,
+ ],
+ debug_mode=True,
+ ):
+ # TODO: Check this WorkflowFailureError is caused by a TestException
+ with self.assertRaises(WorkflowFailureError):
+ await env.client.execute_workflow(
+ NsInstantiateWorkflow,
+ arg=self.input,
+ id=self.input.nslcmop["nsInstanceId"],
+ task_queue=LCM_TASK_QUEUE,
+ )
+ assert self.mock_update_lcm_operation_state_tracker.call_count == 2
+ assert_lcm_op_states(
+ self.mock_update_lcm_operation_state_tracker.call_args_list,
+ [LcmOperationState.PROCESSING, LcmOperationState.FAILED],
+ )
+
+ async def test_instantiate_workflow_with_exception_updates_ns_state(self):
+ async with self.env as env:
+ async with Worker(
+ env.client,
+ task_queue=LCM_TASK_QUEUE,
+ workflows=[NsInstantiateWorkflow, MockVnfInstantiateWorkflow],
+ activities=[
+ mock_create_model_raises,
+ mock_get_vnf_record_ids,
+ self.mock_update_ns_state,
+ self.mock_update_lcm_operation_state,
+ ],
+ debug_mode=True,
+ ):
+ # TODO: Check this WorkflowFailureError is caused by a TestException
+ with self.assertRaises(WorkflowFailureError):
+ await env.client.execute_workflow(
+ NsInstantiateWorkflow,
+ arg=self.input,
+ id=self.input.nslcmop["nsInstanceId"],
+ task_queue=LCM_TASK_QUEUE,
+ )
+ assert self.mock_update_ns_state_tracker.call_count == 1
+ assert_ns_states(
+ self.mock_update_ns_state_tracker.call_args_list, [NsState.INSTANTIATED]
+ )
+
+
+def assert_ns_states(call_args_list, expected_states):
+ """Asserts that the NS state was set to each of the expected states (in order)."""
+ assert [call.args[0].state for call in call_args_list] == expected_states
+
+
+def assert_lcm_op_states(call_args_list, expected_states):
+ """Asserts that the LCM operation was set to each of the exepcted states (in order)."""
+ assert [call.args[0].op_state for call in call_args_list] == expected_states