From: Daniel Arndt Date: Tue, 18 Apr 2023 14:26:16 +0000 (-0300) Subject: Implement instantiate NS workflow X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=c56d33668a12b0071343f3f210a8eb93889cfba8;p=osm%2FLCM.git Implement instantiate NS workflow Change-Id: I61defdc64865396cd6af4a20ffb67443450bd742 Signed-off-by: Daniel Arndt Signed-off-by: Mark Beierl --- diff --git a/.gitignore b/.gitignore index 4517ab1..9df0136 100644 --- a/.gitignore +++ b/.gitignore @@ -32,4 +32,5 @@ nosetests.xml .pydevproject pyvenv.cfg venv/ -.idea \ No newline at end of file +.idea +deb_dist diff --git a/osm_lcm/nglcm.py b/osm_lcm/nglcm.py index a3c57b3..5ddae3f 100644 --- a/osm_lcm/nglcm.py +++ b/osm_lcm/nglcm.py @@ -22,34 +22,35 @@ import logging import logging.handlers import os import sys -import yaml +from os import path +import yaml from osm_common.dbbase import DbException from osm_common.temporal_constants import LCM_TASK_QUEUE +from temporalio.client import Client +from temporalio.worker import Worker + from osm_lcm.data_utils.database.database import Database from osm_lcm.data_utils.lcm_config import LcmCfg from osm_lcm.lcm_utils import LcmException -from os import path +from osm_lcm.temporal.juju_paas_activities import JujuPaasConnector from osm_lcm.temporal.lcm_activities import NsLcmActivity from osm_lcm.temporal.lcm_workflows import NsNoOpWorkflow +from osm_lcm.temporal.ns_activities import NsDbActivity, NsOperations +from osm_lcm.temporal.ns_workflows import NsInstantiateWorkflow +from osm_lcm.temporal.vdu_workflows import VduInstantiateWorkflow from osm_lcm.temporal.vim_activities import VimDbActivity -from osm_lcm.temporal.juju_paas_activities import JujuPaasConnector from osm_lcm.temporal.vim_workflows import ( VimCreateWorkflow, VimDeleteWorkflow, VimUpdateWorkflow, ) -from osm_lcm.temporal.vdu_workflows import VduInstantiateWorkflow -from osm_lcm.temporal.vnf_workflows import VnfInstantiateWorkflow, VnfPrepareWorkflow from osm_lcm.temporal.vnf_activities import ( VnfDbActivity, VnfOperations, VnfSendNotifications, ) -from osm_lcm.temporal.ns_workflows import NsInstantiateWorkflow -from osm_lcm.temporal.ns_activities import NsOperations, NsDbActivity -from temporalio.client import Client -from temporalio.worker import Worker +from osm_lcm.temporal.vnf_workflows import VnfInstantiateWorkflow, VnfPrepareWorkflow class NGLcm: @@ -159,10 +160,8 @@ class NGLcm: VnfPrepareWorkflow, ] activities = [ - ns_data_activity_instance.get_model_info, ns_data_activity_instance.update_ns_state, - ns_operation_instance.check_ns_instantiate_finished, - ns_operation_instance.deploy_ns, + ns_operation_instance.get_vnf_records, nslcm_activity_instance.update_ns_lcm_operation_state, nslcm_activity_instance.no_op, paas_connector_instance.create_model, diff --git a/osm_lcm/temporal/lcm_workflows.py b/osm_lcm/temporal/lcm_workflows.py index 837e586..9ab8e5d 100644 --- a/osm_lcm/temporal/lcm_workflows.py +++ b/osm_lcm/temporal/lcm_workflows.py @@ -14,23 +14,24 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging +import traceback from abc import ABC, abstractmethod from datetime import timedelta -import logging -from temporalio import workflow -from temporalio.common import RetryPolicy -from temporalio.exceptions import ActivityError + from osm_common.dataclasses.temporal_dataclasses import ( + LcmOperationState, NsLcmOperationInput, UpdateLcmOperationStateInput, - LcmOperationState, ) from osm_common.temporal_constants import ( + ACTIVITY_NSLCM_NO_OP, ACTIVITY_UPDATE_LCM_OPERATION_STATE, WORKFLOW_NSLCM_NO_OP, - ACTIVITY_NSLCM_NO_OP, ) -import traceback +from temporalio import workflow +from temporalio.common import RetryPolicy +from temporalio.exceptions import ActivityError class LcmOperationWorkflow(ABC): @@ -60,14 +61,8 @@ class LcmOperationWorkflow(ABC): def __init__(self): self.logger = logging.getLogger(f"lcm.wfl.{self.__class__.__name__}") - self.op_state = self.op_state = UpdateLcmOperationStateInput( - op_id=None, - op_state=None, - detailed_status="", - error_message="", - stage="", - ) self.op_id = None + self.stage = "" @abstractmethod async def workflow(self, input: NsLcmOperationInput): @@ -75,36 +70,47 @@ class LcmOperationWorkflow(ABC): async def wrap_nslcmop(self, input: NsLcmOperationInput): self.op_id = input.nslcmop["_id"] - self.op_state.op_id = self.op_id - self.op_state.op_state = LcmOperationState.PROCESSING - await self.update_operation_state() - + await self.update_operation_state(LcmOperationState.PROCESSING) try: await self.workflow(input=input) - self.op_state.op_state = LcmOperationState.COMPLETED - except ActivityError as e: # TODO: Deteremine the best content for Activity Errors OSM-994 self.logger.exception(e) - self.op_state.op_state = LcmOperationState.FAILED - self.op_state.detailed_status = str(e.cause.with_traceback(e.__traceback__)) - self.op_state.error_message = str(e.cause.message) + await self.update_operation_state( + LcmOperationState.FAILED, + error_message=str(e.cause.message), + detailed_status=str(e.cause.with_traceback(e.__traceback__)), + ) raise e - except Exception as e: self.logger.exception(e) - self.op_state.op_state = LcmOperationState.FAILED - self.op_state.detailed_status = traceback.format_exc() - self.op_state.error_message = str(e) + self.update_operation_state( + LcmOperationState.FAILED, + error_message=str(e), + detailed_status=traceback.format_exc(), + ) raise e - - finally: - await self.update_operation_state() - - async def update_operation_state(self) -> None: + await self.update_operation_state(LcmOperationState.COMPLETED) + + async def update_operation_state( + self, + op_state: LcmOperationState, + stage: str = None, + error_message: str = "", + detailed_status: str = "", + ) -> None: + if stage is not None: + self.stage = stage + input = UpdateLcmOperationStateInput( + op_id=self.op_id, + op_state=op_state, + stage=self.stage, + error_message=error_message, + detailed_status=detailed_status, + ) await workflow.execute_activity( activity=ACTIVITY_UPDATE_LCM_OPERATION_STATE, - arg=self.op_state, + arg=input, activity_id=f"{ACTIVITY_UPDATE_LCM_OPERATION_STATE}-{self.op_id}", schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout, retry_policy=LcmOperationWorkflow.retry_policy, diff --git a/osm_lcm/temporal/ns_activities.py b/osm_lcm/temporal/ns_activities.py index eee4325..3ea7dcf 100644 --- a/osm_lcm/temporal/ns_activities.py +++ b/osm_lcm/temporal/ns_activities.py @@ -16,20 +16,17 @@ import logging from time import time -from temporalio import activity -from osm_common.temporal_constants import ( - ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED, - ACTIVITY_DEPLOY_NS, - ACTIVITY_GET_MODEL_INFO, - ACTIVITY_UPDATE_NS_STATE, -) + from osm_common.dataclasses.temporal_dataclasses import ( - ModelInfo, - NsInstantiateInput, + GetVnfRecordIdsInput, + GetVnfRecordIdsOutput, UpdateNsStateInput, - VduInstantiateInput, ) -from osm_lcm.temporal.juju_paas_activities import CharmInfoUtils +from osm_common.temporal_constants import ( + ACTIVITY_GET_VNF_RECORD_IDS, + ACTIVITY_UPDATE_NS_STATE, +) +from temporalio import activity class NsOperations: @@ -37,38 +34,12 @@ class NsOperations: self.db = db self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}") - # TODO: Change to a workflow OSM-990 - @activity.defn(name=ACTIVITY_DEPLOY_NS) - async def deploy_ns(self, ns_instantiate_input: NsInstantiateInput) -> None: - vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": ns_instantiate_input.ns_uuid}) - for vnfr in vnfrs: - await self.deploy_vnf(vnfr) - - async def deploy_vnf(self, vnfr: dict): - vim_id = vnfr.get("vim-account-id") - model_name = "model-name" - vnfd_id = vnfr["vnfd-id"] - vnfd = self.db.get_one("vnfds", {"_id": vnfd_id}) - sw_image_descs = vnfd.get("sw-image-desc") - for vdu in vnfd.get("vdu"): - await self.deploy_vdu(vdu, sw_image_descs, vim_id, model_name) - - async def deploy_vdu( - self, vdu: dict, sw_image_descs: str, vim_id: str, model_name: str - ) -> None: - vdu_info = CharmInfoUtils.get_charm_info(vdu, sw_image_descs) - vdu_instantiate_input = VduInstantiateInput(vim_id, model_name, vdu_info) - workflow_id = ( - vdu_instantiate_input.model_name + vdu_instantiate_input.charm_info.app_name - ) - self.logger.info(f"TODO: Start VDU Workflow {workflow_id}") - - @activity.defn(name=ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED) - async def check_ns_instantiate_finished( - self, ns_instantiate: NsInstantiateInput - ) -> None: - # TODO: Implement OSM-993 - pass + @activity.defn(name=ACTIVITY_GET_VNF_RECORD_IDS) + async def get_vnf_records( + self, get_vnf_records_input: GetVnfRecordIdsInput + ) -> GetVnfRecordIdsOutput: + # TODO: [OSMENG-1043] Implement Get VNF Records + return GetVnfRecordIdsOutput(vnfr_ids=[""]) class NsDbActivity: @@ -83,37 +54,6 @@ class NsDbActivity: self.db = db self.logger = logging.getLogger(f"lcm.act.{self.__class__.__name__}") - @activity.defn(name=ACTIVITY_GET_MODEL_INFO) - async def get_model_info( - self, ns_instantiate_input: NsInstantiateInput - ) -> ModelInfo: - """Returns a ModelInfo. Contains VIM ID and model name. - - Collaborators: - DB Read: nsrs - - Raises (Retryable): - DbException If the target DB record does not exist or DB is not reachable. - - Activity Lifecycle: - This activity will not report a heartbeat due to its - short-running nature. - - As this is a direct DB update, it is not recommended to have - any specific retry policy - - """ - ns_uuid = ns_instantiate_input.ns_uuid - nsr = self.db.get_one("nsrs", {"_id": ns_uuid}) - vim_uuid = nsr.get("datacenter") - model_name = self._get_namespace(ns_uuid, vim_uuid) - return ModelInfo(vim_uuid, model_name) - - @staticmethod - def _get_namespace(ns_id: str, vim_id: str) -> str: - """The NS namespace is the combination if the NS ID and the VIM ID.""" - return ns_id[-12:] + "-" + vim_id[-12:] - @activity.defn(name=ACTIVITY_UPDATE_NS_STATE) async def update_ns_state(self, data: UpdateNsStateInput) -> None: """ diff --git a/osm_lcm/temporal/ns_workflows.py b/osm_lcm/temporal/ns_workflows.py index bef6766..e78392e 100644 --- a/osm_lcm/temporal/ns_workflows.py +++ b/osm_lcm/temporal/ns_workflows.py @@ -14,26 +14,27 @@ # See the License for the specific language governing permissions and # limitations under the License. -from temporalio import workflow -from temporalio.converter import value_to_type -from temporalio.exceptions import ActivityError +import asyncio from osm_common.dataclasses.temporal_dataclasses import ( + GetVnfRecordIdsInput, + GetVnfRecordIdsOutput, ModelInfo, - NsInstantiateInput, NsLcmOperationInput, NsState, UpdateNsStateInput, + VnfInstantiateInput, ) from osm_common.temporal_constants import ( ACTIVITY_CREATE_MODEL, - ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED, - ACTIVITY_DEPLOY_NS, - ACTIVITY_GET_MODEL_INFO, + ACTIVITY_GET_VNF_RECORD_IDS, ACTIVITY_UPDATE_NS_STATE, - LCM_TASK_QUEUE, WORKFLOW_NS_INSTANTIATE, + WORKFLOW_VNF_INSTANTIATE, ) +from temporalio import workflow +from temporalio.converter import value_to_type +from temporalio.exceptions import ActivityError from osm_lcm.temporal.lcm_workflows import LcmOperationWorkflow @@ -47,72 +48,70 @@ class NsInstantiateWorkflow(LcmOperationWorkflow): await super().wrap_nslcmop(input=input) async def workflow(self, input: NsLcmOperationInput) -> None: - # Temporary until we align on what is really needed where. For now, this - # reduced set looks to be all that is needed. - input = NsInstantiateInput( - ns_uuid=input.nslcmop["nsInstanceId"], op_id=input.nslcmop["_id"] - ) + self.logger.info(f"Executing {WORKFLOW_NS_INSTANTIATE} with {input}") - ns_state = UpdateNsStateInput(input.ns_uuid, NsState.INSTANTIATED, "Done") + # TODO: Can we clean up the input? Perhaps this workflow could receive NsInstantiateInput + # directly. + ns_uuid = input.nslcmop["nsInstanceId"] + vim_uuid = input.nslcmop["operationParams"]["vimAccountId"] + model_name = self._get_namespace(ns_uuid, vim_uuid) try: - model_info = value_to_type( - ModelInfo, - await workflow.execute_activity( - activity=ACTIVITY_GET_MODEL_INFO, - arg=input, - activity_id=f"{ACTIVITY_GET_MODEL_INFO}-{input.ns_uuid}", - task_queue=LCM_TASK_QUEUE, - schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout, - retry_policy=LcmOperationWorkflow.no_retry_policy, - ), - ) - await workflow.execute_activity( activity=ACTIVITY_CREATE_MODEL, - arg=model_info, - activity_id=f"{ACTIVITY_CREATE_MODEL}-{input.ns_uuid}", - task_queue=LCM_TASK_QUEUE, + arg=ModelInfo(vim_uuid=vim_uuid, model_name=model_name), + activity_id=f"{ACTIVITY_CREATE_MODEL}-{ns_uuid}", schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout, retry_policy=LcmOperationWorkflow.no_retry_policy, ) - # TODO: Change this to a workflow OSM-990 - await workflow.execute_activity( - activity=ACTIVITY_DEPLOY_NS, - arg=input, - activity_id=f"{ACTIVITY_DEPLOY_NS}-{input.ns_uuid}", - task_queue=LCM_TASK_QUEUE, - schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout, - retry_policy=LcmOperationWorkflow.no_retry_policy, + # TODO: Provide the correct VNFR ids to the VNF instantiate workflow + vnf_record_ids_output: GetVnfRecordIdsOutput = value_to_type( + GetVnfRecordIdsOutput, + await workflow.execute_activity( + activity=ACTIVITY_GET_VNF_RECORD_IDS, + arg=GetVnfRecordIdsInput(ns_uuid=ns_uuid), + activity_id=f"{ACTIVITY_GET_VNF_RECORD_IDS}-{ns_uuid}", + schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout, + retry_policy=LcmOperationWorkflow.no_retry_policy, + ), ) - - await workflow.execute_activity( - activity=ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED, - arg=input, - activity_id=f"{ACTIVITY_CHECK_NS_INSTANTIATION_FINISHED}-{input.ns_uuid}", - task_queue=LCM_TASK_QUEUE, - schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout, - retry_policy=LcmOperationWorkflow.retry_policy, + asyncio.gather( + workflow.execute_child_workflow( + workflow=WORKFLOW_VNF_INSTANTIATE, + arg=VnfInstantiateInput(vnfr_uuid=vnfr_uuid, model_name=model_name), + id=f"{WORKFLOW_VNF_INSTANTIATE}-{vnfr_uuid}", + ) + for vnfr_uuid in vnf_record_ids_output.vnfr_ids ) except ActivityError as e: - ns_state = UpdateNsStateInput(input.ns_uuid, NsState.ERROR, e.cause.message) + await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, e.cause.message) self.logger.error( f"{WORKFLOW_NS_INSTANTIATE} failed with {str(e.cause.message)}" ) raise e except Exception as e: - ns_state = UpdateNsStateInput(input.ns_uuid, NsState.ERROR, str(e)) + await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, str(e)) self.logger.error(f"{WORKFLOW_NS_INSTANTIATE} failed with {str(e)}") raise e + await self.update_ns_state(ns_uuid, NsState.INSTANTIATED, "Done") - finally: - await workflow.execute_activity( - activity=ACTIVITY_UPDATE_NS_STATE, - arg=ns_state, - activity_id=f"{ACTIVITY_UPDATE_NS_STATE}-{input.ns_uuid}", - task_queue=LCM_TASK_QUEUE, - schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout, - retry_policy=LcmOperationWorkflow.retry_policy, - ) + @staticmethod + async def update_ns_state( + ns_uuid: str, + state: NsState, + message: str, + ) -> None: + input = UpdateNsStateInput(ns_uuid, state, message) + await workflow.execute_activity( + activity=ACTIVITY_UPDATE_NS_STATE, + arg=input, + activity_id=f"{ACTIVITY_UPDATE_NS_STATE}-{ns_uuid}", + schedule_to_close_timeout=LcmOperationWorkflow.default_schedule_to_close_timeout, + retry_policy=LcmOperationWorkflow.retry_policy, + ) + + def _get_namespace(self, ns_id: str, vim_id: str) -> str: + """The NS namespace is the combination if the NS ID and the VIM ID.""" + return ns_id[-12:] + "-" + vim_id[-12:] diff --git a/osm_lcm/tests/test_lcm_utils.py b/osm_lcm/tests/test_lcm_utils.py index 32abbf5..3305fa6 100644 --- a/osm_lcm/tests/test_lcm_utils.py +++ b/osm_lcm/tests/test_lcm_utils.py @@ -17,18 +17,18 @@ ## import logging import tempfile -from unittest.mock import Mock, patch, MagicMock, mock_open from unittest import TestCase +from unittest.mock import MagicMock, Mock, mock_open, patch +from zipfile import BadZipfile -from osm_common.msgkafka import MsgKafka +import yaml from osm_common import fslocal +from osm_common.msgkafka import MsgKafka + from osm_lcm.data_utils.database.database import Database from osm_lcm.data_utils.filesystem.filesystem import Filesystem from osm_lcm.lcm_utils import LcmBase, LcmException from osm_lcm.tests import test_db_descriptors as descriptors -import yaml -from zipfile import BadZipfile - tmpdir = tempfile.mkdtemp()[1] tmpfile = tempfile.mkstemp()[1] diff --git a/osm_lcm/tests/test_ns_activities.py b/osm_lcm/tests/test_ns_activities.py deleted file mode 100644 index b12693b..0000000 --- a/osm_lcm/tests/test_ns_activities.py +++ /dev/null @@ -1,59 +0,0 @@ -####################################################################################### -# Copyright ETSI Contributors and Others. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import asynctest -from osm_common.dataclasses.temporal_dataclasses import ModelInfo, NsInstantiateInput -from osm_common.dbbase import DbException -from osm_lcm.temporal.ns_activities import NsDbActivity -from temporalio.testing import ActivityEnvironment -from unittest.mock import Mock - - -vim_id = "some-vim-uuid" -ns_id = "0123456789-9876543210" -nsr = { - "_id": ns_id, - "datacenter": vim_id, -} -input = NsInstantiateInput(ns_id, "op_id") -expected_namespace = "9-9876543210-ome-vim-uuid" -expected_model_info = ModelInfo(vim_id, expected_namespace) - - -class TestGetModelInfo(asynctest.TestCase): - def setUp(self): - self.db = Mock() - self.env = ActivityEnvironment() - self.ns_db_activity = NsDbActivity(self.db) - - async def test_nominal_case(self): - self.db.get_one.return_value = nsr - model_info = await self.env.run(self.ns_db_activity.get_model_info, input) - self.assertEqual(model_info, expected_model_info) - - async def test_db_raises_exception(self): - self.db.get_one.side_effect = DbException("not found") - with self.assertRaises(DbException): - model_info = await self.env.run(self.ns_db_activity.get_model_info, input) - self.assertIsNone(model_info) - - async def test_no_datacenter_raises_exception(self): - nsr = {"_id": ns_id} - self.db.get_one.return_value = nsr - with self.assertRaises(TypeError): - model_info = await self.env.run(self.ns_db_activity.get_model_info, input) - self.assertIsNone(model_info) diff --git a/osm_lcm/tests/test_ns_workflows.py b/osm_lcm/tests/test_ns_workflows.py new file mode 100644 index 0000000..9885a16 --- /dev/null +++ b/osm_lcm/tests/test_ns_workflows.py @@ -0,0 +1,199 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from unittest.mock import Mock + +import asynctest +from osm_common.dataclasses.temporal_dataclasses import ( + GetVnfRecordIdsInput, + GetVnfRecordIdsOutput, + LcmOperationState, + ModelInfo, + NsLcmOperationInput, + NsState, + UpdateLcmOperationStateInput, + UpdateNsStateInput, + VnfInstantiateInput, +) +from osm_common.temporal_constants import ( + ACTIVITY_CREATE_MODEL, + ACTIVITY_GET_VNF_RECORD_IDS, + ACTIVITY_UPDATE_LCM_OPERATION_STATE, + ACTIVITY_UPDATE_NS_STATE, + LCM_TASK_QUEUE, + WORKFLOW_VNF_INSTANTIATE, +) +from temporalio import activity, workflow +from temporalio.client import WorkflowFailureError +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from osm_lcm.temporal.ns_workflows import NsInstantiateWorkflow + + +class TestException(Exception): + pass + + +@activity.defn(name=ACTIVITY_CREATE_MODEL) +async def mock_create_model(create_model_input: ModelInfo) -> None: + pass + + +@activity.defn(name=ACTIVITY_GET_VNF_RECORD_IDS) +async def mock_get_vnf_record_ids( + get_vnf_record_ids_input: GetVnfRecordIdsInput, +) -> None: + return GetVnfRecordIdsOutput( + vnfr_ids=[ + "828d91ee-fa04-43bb-8471-f66ea74597e7", + "c4bbeb41-df7e-4daa-863d-c8fd29fac96d", + ] + ) + + +@activity.defn(name=ACTIVITY_CREATE_MODEL) +async def mock_create_model_raises(create_model_input: ModelInfo) -> None: + raise TestException("Test exception") + + +@workflow.defn(name=WORKFLOW_VNF_INSTANTIATE, sandboxed=False) +class MockVnfInstantiateWorkflow: + @workflow.run + async def run(self, input: VnfInstantiateInput) -> None: + pass + + +class TestNsInstantiateWorkflow(asynctest.TestCase): + input = NsLcmOperationInput( + nslcmop={ + "_id": "1234", + "nsInstanceId": "5678", + "operationParams": {"vimAccountId": "9876"}, + } + ) + + @activity.defn(name=ACTIVITY_UPDATE_LCM_OPERATION_STATE) + async def mock_update_lcm_operation_state( + self, + data: UpdateLcmOperationStateInput, + ) -> None: + self.mock_update_lcm_operation_state_tracker(data) + + @activity.defn(name=ACTIVITY_UPDATE_NS_STATE) + async def mock_update_ns_state(self, data: UpdateNsStateInput) -> None: + self.mock_update_ns_state_tracker(data) + + async def setUp(self): + self.env = await WorkflowEnvironment.start_time_skipping() + self.mock_update_lcm_operation_state_tracker = Mock() + self.mock_update_ns_state_tracker = Mock() + + async def test_instantiate_workflow(self): + async with self.env as env: + async with Worker( + env.client, + task_queue=LCM_TASK_QUEUE, + workflows=[NsInstantiateWorkflow, MockVnfInstantiateWorkflow], + activities=[ + mock_create_model, + mock_get_vnf_record_ids, + self.mock_update_ns_state, + self.mock_update_lcm_operation_state, + ], + debug_mode=True, + ): + await env.client.execute_workflow( + NsInstantiateWorkflow, + arg=self.input, + id=self.input.nslcmop["nsInstanceId"], + task_queue=LCM_TASK_QUEUE, + ) + + assert self.mock_update_lcm_operation_state_tracker.call_count == 2 + assert_lcm_op_states( + self.mock_update_lcm_operation_state_tracker.call_args_list, + [LcmOperationState.PROCESSING, LcmOperationState.COMPLETED], + ) + assert_ns_states( + self.mock_update_ns_state_tracker.call_args_list, [NsState.INSTANTIATED] + ) + + async def test_instantiate_workflow_with_exception_updates_lcm_operation(self): + async with self.env as env: + async with Worker( + env.client, + task_queue=LCM_TASK_QUEUE, + workflows=[NsInstantiateWorkflow, MockVnfInstantiateWorkflow], + activities=[ + mock_create_model_raises, + mock_get_vnf_record_ids, + self.mock_update_ns_state, + self.mock_update_lcm_operation_state, + ], + debug_mode=True, + ): + # TODO: Check this WorkflowFailureError is caused by a TestException + with self.assertRaises(WorkflowFailureError): + await env.client.execute_workflow( + NsInstantiateWorkflow, + arg=self.input, + id=self.input.nslcmop["nsInstanceId"], + task_queue=LCM_TASK_QUEUE, + ) + assert self.mock_update_lcm_operation_state_tracker.call_count == 2 + assert_lcm_op_states( + self.mock_update_lcm_operation_state_tracker.call_args_list, + [LcmOperationState.PROCESSING, LcmOperationState.FAILED], + ) + + async def test_instantiate_workflow_with_exception_updates_ns_state(self): + async with self.env as env: + async with Worker( + env.client, + task_queue=LCM_TASK_QUEUE, + workflows=[NsInstantiateWorkflow, MockVnfInstantiateWorkflow], + activities=[ + mock_create_model_raises, + mock_get_vnf_record_ids, + self.mock_update_ns_state, + self.mock_update_lcm_operation_state, + ], + debug_mode=True, + ): + # TODO: Check this WorkflowFailureError is caused by a TestException + with self.assertRaises(WorkflowFailureError): + await env.client.execute_workflow( + NsInstantiateWorkflow, + arg=self.input, + id=self.input.nslcmop["nsInstanceId"], + task_queue=LCM_TASK_QUEUE, + ) + assert self.mock_update_ns_state_tracker.call_count == 1 + assert_ns_states( + self.mock_update_ns_state_tracker.call_args_list, [NsState.INSTANTIATED] + ) + + +def assert_ns_states(call_args_list, expected_states): + """Asserts that the NS state was set to each of the expected states (in order).""" + assert [call.args[0].state for call in call_args_list] == expected_states + + +def assert_lcm_op_states(call_args_list, expected_states): + """Asserts that the LCM operation was set to each of the exepcted states (in order).""" + assert [call.args[0].op_state for call in call_args_list] == expected_states