From: Mark Beierl Date: Fri, 24 Feb 2023 21:23:48 +0000 (+0000) Subject: Initial temporal client X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=9468ea357b051daa2b173958ab32a5b2ec12fcd7;p=osm%2Fcommon.git Initial temporal client An initial attempt at defining a temporal workflow client that can be used from any module Change-Id: I6095793617acbc7bd2438a28c07eb1d854f3ff1c Signed-off-by: Mark Beierl --- diff --git a/osm_common/tests/test_wftemporal.py b/osm_common/tests/test_wftemporal.py new file mode 100644 index 0000000..b8ea257 --- /dev/null +++ b/osm_common/tests/test_wftemporal.py @@ -0,0 +1,123 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +####################################################################################### + +from contextlib import contextmanager +from typing import Generator +from unittest import mock +from unittest.mock import AsyncMock, Mock + +from osm_common.wftemporal import WFTemporal +import pytest +from temporalio.client import Client, WorkflowHandle + + +@contextmanager +def generate_mock_client() -> Generator[AsyncMock, None, None]: + # Note the mock instance has to be created before patching for autospec to work + mock_instance = mock.create_autospec(Client) + mock_instance.connect.return_value = mock_instance + + with mock.patch("osm_common.wftemporal.Client.connect") as client: + client.return_value = mock_instance + yield mock_instance + + +@pytest.mark.asyncio +async def test_get_client(): + WFTemporal.clients = {} + with generate_mock_client() as client: + temporal = WFTemporal(temporal_api="localhost") + result = await temporal.get_client() + assert result == client + + +@pytest.mark.asyncio +async def test_get_cached_client(): + WFTemporal.clients = {} + client_1 = None + client_2 = None + with generate_mock_client() as client: + temporal = WFTemporal(temporal_api="localhost") + client_1 = await temporal.get_client() + assert client_1 == client + + with generate_mock_client() as client: + temporal = WFTemporal(temporal_api="localhost") + client_2 = await temporal.get_client() + # We should get the same client as before, not a new one + assert client_2 == client_1 + assert client_2 != client + + +@pytest.mark.asyncio +@mock.patch("osm_common.wftemporal.uuid") +async def test_start_workflow_no_id(mock_uuid): + WFTemporal.clients = {} + + mock_uuid.uuid4.return_value = "01234567-89abc-def-0123-456789abcdef" + + with generate_mock_client() as client: + client.start_workflow.return_value = "handle" + temporal = WFTemporal(temporal_api="localhost") + result = await temporal.start_workflow( + task_queue="q", workflow_name="workflow", workflow_data="data" + ) + assert result == "handle" + client.start_workflow.assert_awaited_once_with( + workflow="workflow", + arg="data", + id="01234567-89abc-def-0123-456789abcdef", + task_queue="q", + ) + + +@pytest.mark.asyncio +async def test_start_workflow_with_id(): + WFTemporal.clients = {} + + with generate_mock_client() as client: + client.start_workflow.return_value = "handle" + temporal = WFTemporal(temporal_api="localhost") + result = await temporal.start_workflow( + task_queue="q", id="id", workflow_name="workflow", workflow_data="data" + ) + assert result == "handle" + client.start_workflow.assert_awaited_once_with( + workflow="workflow", arg="data", id="id", task_queue="q" + ) + + +@pytest.mark.asyncio +async def test_execute_workflow_with_id(): + WFTemporal.clients = {} + + with generate_mock_client() as client: + handle = Mock(WorkflowHandle) + handle.result = AsyncMock(return_value="success") + client.start_workflow.return_value = handle + + temporal = WFTemporal(temporal_api="localhost") + result = await temporal.execute_workflow( + task_queue="q", + id="another id", + workflow_name="workflow-with-result", + workflow_data="data", + ) + assert result == "success" + client.start_workflow.assert_awaited_once_with( + workflow="workflow-with-result", arg="data", id="another id", task_queue="q" + ) diff --git a/osm_common/wftemporal.py b/osm_common/wftemporal.py new file mode 100644 index 0000000..0f7d421 --- /dev/null +++ b/osm_common/wftemporal.py @@ -0,0 +1,67 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +####################################################################################### + +import logging +import uuid + +from temporalio.client import Client + + +class WFTemporal(object): + clients = {} + + def __init__(self, temporal_api=None, logger_name="temporal.client"): + self.logger = logging.getLogger(logger_name) + self.temporal_api = temporal_api + + async def execute_workflow( + self, task_queue: str, workflow_name: str, workflow_data: any, id: str = None + ): + handle = await self.start_workflow( + task_queue=task_queue, + workflow_name=workflow_name, + workflow_data=workflow_data, + id=id, + ) + result = await handle.result() + self.logger.info(f"Completed workflow {workflow_name}, id {id}") + return result + + async def start_workflow( + self, task_queue: str, workflow_name: str, workflow_data: any, id: str = None + ): + client = await self.get_client() + if id is None: + id = str(uuid.uuid4()) + self.logger.info(f"Starting workflow {workflow_name}, id {id}") + handle = await client.start_workflow( + workflow=workflow_name, arg=workflow_data, id=id, task_queue=task_queue + ) + return handle + + async def get_client(self): + if self.temporal_api in WFTemporal.clients: + client = WFTemporal.clients[self.temporal_api] + else: + self.logger.debug( + f"No cached client found, connecting to {self.temporal_api}" + ) + client = await Client.connect(self.temporal_api) + WFTemporal.clients[self.temporal_api] = client + + self.logger.debug(f"Using client {client} for {self.temporal_api}") + return client diff --git a/releasenotes/notes/temporal_client-020c20bce95f3438.yaml b/releasenotes/notes/temporal_client-020c20bce95f3438.yaml new file mode 100644 index 0000000..735ffa8 --- /dev/null +++ b/releasenotes/notes/temporal_client-020c20bce95f3438.yaml @@ -0,0 +1,21 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +####################################################################################### +--- +features: + - | + Temporal client: a library to manage the execution of workflows in a + particular temporal cluster. diff --git a/requirements-test.in b/requirements-test.in index 0378739..86d30f1 100644 --- a/requirements-test.in +++ b/requirements-test.in @@ -15,4 +15,5 @@ coverage nose2 -pytest \ No newline at end of file +pytest +pytest-asyncio \ No newline at end of file diff --git a/requirements-test.txt b/requirements-test.txt index bed91b0..d31c36c 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -16,7 +16,7 @@ ####################################################################################### attrs==22.2.0 # via pytest -coverage==7.1.0 +coverage==7.2.1 # via -r requirements-test.in exceptiongroup==1.1.0 # via pytest @@ -29,6 +29,10 @@ packaging==23.0 pluggy==1.0.0 # via pytest pytest==7.2.1 + # via + # -r requirements-test.in + # pytest-asyncio +pytest-asyncio==0.20.3 # via -r requirements-test.in tomli==2.0.1 # via pytest diff --git a/requirements.in b/requirements.in index 21033b8..6031584 100644 --- a/requirements.in +++ b/requirements.in @@ -15,7 +15,10 @@ pymongo<4 aiokafka +charset-normalizer<4 # Required by aiohttp in LCM pyyaml==5.4.1 pycryptodome dataclasses -motor==1.3.1 \ No newline at end of file +motor==1.3.1 +temporalio +protobuf<4 # Required by Juju diff --git a/requirements.txt b/requirements.txt index fd903ca..c0f467d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,6 +18,8 @@ aiokafka==0.8.0 # via -r requirements.in async-timeout==4.0.2 # via aiokafka +charset-normalizer==3.0.1 + # via -r requirements.in dataclasses==0.6 # via -r requirements.in kafka-python==2.0.2 @@ -26,11 +28,25 @@ motor==1.3.1 # via -r requirements.in packaging==23.0 # via aiokafka +protobuf==3.20.3 + # via + # -r requirements.in + # temporalio pycryptodome==3.17 # via -r requirements.in pymongo==3.13.0 # via # -r requirements.in # motor +python-dateutil==2.8.2 + # via temporalio pyyaml==5.4.1 # via -r requirements.in +six==1.16.0 + # via python-dateutil +temporalio==1.1.0 + # via -r requirements.in +types-protobuf==4.22.0.0 + # via temporalio +typing-extensions==4.5.0 + # via temporalio