Adding check charm status input
[osm/common.git] / osm_common / tests / test_wftemporal.py
1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17
18 from contextlib import contextmanager
19 from typing import Generator
20 from unittest import mock
21 from unittest.mock import AsyncMock, Mock
22
23 from osm_common.wftemporal import WFTemporal
24 import pytest
25 from temporalio.client import Client, WorkflowHandle
26
27
@contextmanager
def generate_mock_client() -> Generator[AsyncMock, None, None]:
    """Yield an autospec'd ``Client`` while ``Client.connect`` is patched.

    For the duration of the ``with`` block,
    ``osm_common.wftemporal.Client.connect`` is replaced by a mock configured
    to return the yielded mock client, so a test can set return values and
    make assertions on that same object.
    """
    # The autospec'd instance must exist before the patch is applied,
    # otherwise autospec does not work.
    fake_client = mock.create_autospec(Client)
    fake_client.connect.return_value = fake_client

    connect_patch = mock.patch(
        "osm_common.wftemporal.Client.connect", return_value=fake_client
    )
    with connect_patch:
        yield fake_client
37
38
@pytest.mark.asyncio
@mock.patch("osm_common.wftemporal.uuid")
async def test_start_workflow_no_id(mock_uuid):
    """Start a workflow without an id and expect the patched uuid4 value as id.

    ``uuid`` inside ``osm_common.wftemporal`` is patched so that ``uuid4()``
    returns a fixed string; the client is then expected to be awaited once
    with that string as the workflow ``id``.
    """
    generated_id = "01234567-89abc-def-0123-456789abcdef"
    mock_uuid.uuid4.return_value = generated_id

    # Reset the class-level client (presumably a cache shared between tests).
    WFTemporal._client = None

    with generate_mock_client() as fake_client:
        fake_client.start_workflow.return_value = "handle"

        returned = await WFTemporal().start_workflow(
            task_queue="q", workflow_name="workflow", workflow_data="data"
        )

        assert returned == "handle"
        expected_call = {
            "workflow": "workflow",
            "arg": "data",
            "id": generated_id,
            "task_queue": "q",
        }
        fake_client.start_workflow.assert_awaited_once_with(**expected_call)
59
60
@pytest.mark.asyncio
async def test_start_workflow_with_id():
    """Start a workflow with an explicit id and expect it to reach the client.

    The mocked client's ``start_workflow`` must be awaited exactly once with
    the caller-supplied ``id``, and its return value must be handed back.
    """
    # Reset the class-level client (presumably a cache shared between tests).
    WFTemporal._client = None

    with generate_mock_client() as fake_client:
        fake_client.start_workflow.return_value = "handle"

        wf = WFTemporal()
        returned = await wf.start_workflow(
            task_queue="q",
            id="id",
            workflow_name="workflow",
            workflow_data="data",
        )

        assert returned == "handle"
        expected_call = {
            "workflow": "workflow",
            "arg": "data",
            "id": "id",
            "task_queue": "q",
        }
        fake_client.start_workflow.assert_awaited_once_with(**expected_call)
75
76
@pytest.mark.asyncio
async def test_execute_workflow_with_id():
    """Execute a workflow with an explicit id and expect the handle's result.

    The mocked client returns a ``WorkflowHandle`` mock whose async
    ``result()`` yields ``"success"``; ``execute_workflow`` is expected to
    return that value after awaiting ``start_workflow`` exactly once.
    """
    # Reset the class-level client (presumably a cache shared between tests).
    WFTemporal._client = None

    with generate_mock_client() as fake_client:
        wf_handle = Mock(WorkflowHandle)
        wf_handle.result = AsyncMock(return_value="success")
        fake_client.start_workflow.return_value = wf_handle

        request = {
            "task_queue": "q",
            "id": "another id",
            "workflow_name": "workflow-with-result",
            "workflow_data": "data",
        }
        outcome = await WFTemporal().execute_workflow(**request)

        assert outcome == "success"
        fake_client.start_workflow.assert_awaited_once_with(
            workflow="workflow-with-result",
            arg="data",
            id="another id",
            task_queue="q",
        )
97
98
@pytest.mark.asyncio
async def test_client_cache():
    """Run two workflows through two ``WFTemporal`` instances on one mock client.

    Both executions happen inside the same patched-client context. The first
    call must be the only await so far; the second is checked as the most
    recent await on the same mocked ``start_workflow``.
    """
    # Reset the class-level client (presumably a cache shared between tests).
    WFTemporal._client = None

    with generate_mock_client() as fake_client:
        wf_handle = Mock(WorkflowHandle)
        wf_handle.result = AsyncMock(return_value="success")
        fake_client.start_workflow.return_value = wf_handle

        # First execution, using a fresh WFTemporal instance.
        first_outcome = await WFTemporal().execute_workflow(
            task_queue="q",
            id="another id",
            workflow_name="workflow-with-result",
            workflow_data="data",
        )
        assert first_outcome == "success"
        fake_client.start_workflow.assert_awaited_once_with(
            workflow="workflow-with-result",
            arg="data",
            id="another id",
            task_queue="q",
        )

        # Second execution, again through a newly constructed WFTemporal.
        second_outcome = await WFTemporal().execute_workflow(
            task_queue="q",
            id="yet another id",
            workflow_name="workflow-with-result",
            workflow_data="more data",
        )
        assert second_outcome == "success"
        # Not "once": start_workflow has now been awaited twice on this mock.
        fake_client.start_workflow.assert_awaited_with(
            workflow="workflow-with-result",
            arg="more data",
            id="yet another id",
            task_queue="q",
        )