Pin black version in tox.ini to 23.12.1
[osm/N2VC.git] / n2vc / tests / unit / test_juju_watcher.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import json
16 import os
17 from time import sleep
18 import asynctest
19 import asyncio
20
21 from n2vc.juju_watcher import JujuModelWatcher, entity_ready, status
22 from n2vc.exceptions import EntityInvalidException
23 from .utils import FakeN2VC, AsyncMock, Deltas, FakeWatcher
24 from juju.application import Application
25 from juju.action import Action
26 from juju.annotation import Annotation
27 from juju.client._definitions import AllWatcherNextResults
28 from juju.machine import Machine
29 from juju.model import Model
30 from juju.unit import Unit
31 from unittest import mock, TestCase
32 from unittest.mock import Mock
33
34
class JujuWatcherTest(asynctest.TestCase):
    """Unit tests for the static helpers exposed by ``JujuModelWatcher``."""

    def setUp(self):
        """Create a fake N2VC connector, a mocked model and a fresh event loop."""
        self.n2vc = FakeN2VC()
        self.model = Mock()
        self.loop = asyncio.new_event_loop()

    def test_get_status(self):
        """``get_status`` yields the expected triple for every canned delta."""
        for test in Deltas:
            # The local is not called ``status`` so that it does not shadow the
            # ``status`` function imported from n2vc.juju_watcher.
            (
                entity_status,
                message,
                vca_status,
            ) = JujuModelWatcher.get_status(test.delta)
            self.assertEqual(entity_status, test.entity_status.status)
            self.assertEqual(message, test.entity_status.message)
            self.assertEqual(vca_status, test.entity_status.vca_status)

    @mock.patch("n2vc.juju_watcher.client.AllWatcherFacade.from_connection")
    def test_model_watcher(self, allwatcher):
        """``model_watcher`` times out and reports the status through n2vc."""
        allwatcher.return_value = FakeWatcher()
        n2vc = AsyncMock()
        for test in Deltas:
            # Fixture setup stays outside assertRaises so that only the call
            # under test can satisfy the expected TimeoutError.
            allwatcher.return_value.delta_to_return = [test.delta]
            with self.assertRaises(asyncio.TimeoutError):
                self.loop.run_until_complete(
                    JujuModelWatcher.model_watcher(
                        self.model,
                        test.filter.entity_id,
                        test.filter.entity_type,
                        timeout=0,
                        # A real dict: the original passed the set literal
                        # {"something"} to a parameter named db_dict.
                        db_dict={"something": "something"},
                        n2vc=n2vc,
                        vca_id=None,
                    )
                )

            n2vc.write_app_status_to_db.assert_called()

    @mock.patch("n2vc.juju_watcher.asyncio.wait")
    def test_wait_for(self, wait):
        """``wait_for`` completes when the patched ``wait`` is already resolved."""
        wait.return_value = asyncio.Future()
        wait.return_value.set_result(None)

        machine = AsyncMock()
        self.loop.run_until_complete(JujuModelWatcher.wait_for(self.model, machine))

    @mock.patch("n2vc.juju_watcher.asyncio.wait")
    def test_wait_for_exception(self, wait):
        """An exception raised by the patched ``wait`` propagates to the caller."""
        # side_effect takes precedence over return_value, so no resolved
        # future needs to be configured for this scenario.
        wait.side_effect = Exception("error")

        machine = AsyncMock()
        with self.assertRaises(Exception):
            self.loop.run_until_complete(JujuModelWatcher.wait_for(self.model, machine))

    def test_wait_for_invalid_entity_exception(self):
        """``wait_for`` rejects an ``Annotation`` with EntityInvalidException."""
        with self.assertRaises(EntityInvalidException):
            self.loop.run_until_complete(
                JujuModelWatcher.wait_for(
                    self.model,
                    Annotation(0, self.model),
                    total_timeout=None,
                    progress_timeout=None,
                )
            )
99
100
class EntityReadyTest(TestCase):
    """Tests for ``entity_ready`` across the supported juju entity types."""

    @mock.patch("juju.application.Application.units")
    def setUp(self, mock_units):
        """Build a ``Model`` whose connector is replaced by a MagicMock."""
        self.model = Model()
        self.model._connector = mock.MagicMock()

    def test_invalid_entity(self):
        """An ``Annotation`` is rejected with EntityInvalidException."""
        with self.assertRaises(EntityInvalidException):
            entity_ready(Annotation(0, self.model))

    @mock.patch("juju.machine.Machine.agent_status")
    def test_machine_entity(self, mock_machine_agent_status):
        """``entity_ready`` returns a bool for a ``Machine``."""
        entity = Machine(0, self.model)
        self.assertEqual(entity.entity_type, "machine")
        # assertIsInstance reports the offending value and type on failure,
        # unlike assertTrue(isinstance(...)).
        self.assertIsInstance(entity_ready(entity), bool)

    @mock.patch("juju.action.Action.status")
    def test_action_entity(self, mock_action_status):
        """``entity_ready`` returns a bool for an ``Action``."""
        entity = Action(0, self.model)
        self.assertEqual(entity.entity_type, "action")
        self.assertIsInstance(entity_ready(entity), bool)

    @mock.patch("juju.application.Application.status")
    def test_application_entity(self, mock_application_status):
        """``entity_ready`` returns a bool for an ``Application``."""
        entity = Application(0, self.model)
        self.assertEqual(entity.entity_type, "application")
        self.assertIsInstance(entity_ready(entity), bool)
128
129
@mock.patch("n2vc.juju_watcher.client.AllWatcherFacade.from_connection")
class EntityStateTest(TestCase):
    """Replay recorded watcher deltas through the unit-idle helpers.

    Each upgrade scenario feeds the JSON lines of a file under ``testdata``
    to ``JujuModelWatcher.ensure_units_idle`` through a mocked all-watcher
    and then checks that the whole file was consumed.
    """

    def setUp(self):
        """Create the model, event loop and mocked application for a test."""
        self.model = Model()
        self.model._connector = mock.MagicMock()
        self.loop = asyncio.new_event_loop()
        # Close the loop once the test is over so it does not leak.
        self.addCleanup(self.loop.close)
        self.application = Mock(Application)
        self.upgrade_file = None
        self.line_number = 1

    def _fetch_next_delta(self):
        """Return the next recorded delta wrapped in ``AllWatcherNextResults``.

        Lines that are not valid JSON are skipped. For "unit" deltas the
        mocked application's unit list is kept in step with the log: a unit
        whose life is "dead" is removed, an unknown unit is appended.

        Raises:
            EOFError: when the log file has no more lines.
        """
        delta = None
        while delta is None:
            raw_data = self.upgrade_file.readline()
            if not raw_data:
                raise EOFError("Log file is out of events")
            try:
                delta = json.loads(raw_data)
            except ValueError:
                continue

        if delta[0] == "unit":
            unit_info = delta[2]
            units = self.application.units
            if unit_info["life"] == "dead":
                # Iterate over a snapshot: removing from the list that is
                # being iterated would silently skip the following element.
                for unit in list(units):
                    if unit.entity_id == unit_info["name"]:
                        units.remove(unit)
            elif not any(unit.entity_id == unit_info["name"] for unit in units):
                print("Application gets a new unit: {}".format(unit_info["name"]))
                unit = Mock(Unit)
                unit.entity_id = unit_info["name"]
                unit.entity_type = "unit"
                units.append(unit)

        print("{} {}".format(self.line_number, delta))
        self.line_number = self.line_number + 1

        return AllWatcherNextResults(
            deltas=[
                delta,
            ]
        )

    def _ensure_state(self, filename, mock_all_watcher):
        """Run ``ensure_units_idle`` against the deltas recorded in *filename*.

        Args:
            filename: name of the log file inside the ``testdata`` directory
                next to this module.
            mock_all_watcher: the patched ``AllWatcherFacade.from_connection``.
        """
        log_path = os.path.join(os.path.dirname(__file__), "testdata", filename)
        with open(log_path, "r") as self.upgrade_file:
            all_changes = AsyncMock()
            all_changes.Next.side_effect = self._fetch_next_delta
            mock_all_watcher.return_value = all_changes

            self.loop.run_until_complete(
                JujuModelWatcher.ensure_units_idle(
                    model=self.model, application=self.application
                )
            )

            # The file must still be open here: one more read has to hit the
            # end of the log, proving that every event was consumed.
            with self.assertRaises(EOFError, msg="Not all events consumed"):
                change = self._fetch_next_delta()
                print(change.deltas[0].deltas)

    def _slow_changes(self):
        """Block for 0.1 s, then return one fixed active/idle unit delta."""
        sleep(0.1)
        return AllWatcherNextResults(
            deltas=[
                json.loads(
                    """["unit","change",
                    {
                        "name": "app-vnf-7a49ace2b6-z0/2",
                        "application": "app-vnf-7a49ace2b6-z0",
                        "workload-status": {
                            "current": "active",
                            "message": "",
                            "since": "2022-04-26T18:50:27.579802723Z"},
                        "agent-status": {
                            "current": "idle",
                            "message": "",
                            "since": "2022-04-26T18:50:28.592142816Z"}
                    }]"""
                ),
            ]
        )

    def test_timeout(self, mock_all_watcher):
        """``wait_for_units_idle`` raises TimeoutError when polls are too slow."""
        unit1 = Mock(Unit)
        unit1.entity_id = "app-vnf-7a49ace2b6-z0/0"
        unit1.entity_type = "unit"
        self.application.units = [
            unit1,
        ]

        all_changes = AsyncMock()
        all_changes.Next.side_effect = self._slow_changes
        mock_all_watcher.return_value = all_changes

        with self.assertRaises(TimeoutError):
            self.loop.run_until_complete(
                JujuModelWatcher.wait_for_units_idle(
                    model=self.model, application=self.application, timeout=0.01
                )
            )

    def test_machine_unit_upgrade(self, mock_all_watcher):
        """Replay ``upgrade-machine.log`` for an application with three units."""
        unit1 = Mock(Unit)
        unit1.entity_id = "app-vnf-7a49ace2b6-z0/0"
        unit1.entity_type = "unit"
        unit2 = Mock(Unit)
        unit2.entity_id = "app-vnf-7a49ace2b6-z0/1"
        unit2.entity_type = "unit"
        unit3 = Mock(Unit)
        unit3.entity_id = "app-vnf-7a49ace2b6-z0/2"
        unit3.entity_type = "unit"

        self.application.units = [unit1, unit2, unit3]

        self._ensure_state("upgrade-machine.log", mock_all_watcher)

    def test_operator_upgrade(self, mock_all_watcher):
        """Replay ``upgrade-operator.log`` for the single unit ``sshproxy/0``."""
        unit1 = Mock(Unit)
        unit1.entity_id = "sshproxy/0"
        unit1.entity_type = "unit"
        self.application.units = [
            unit1,
        ]
        self._ensure_state("upgrade-operator.log", mock_all_watcher)

    def test_podspec_stateful_upgrade(self, mock_all_watcher):
        """Replay ``upgrade-podspec-stateful.log`` for the unit ``mongodb/0``."""
        unit1 = Mock(Unit)
        unit1.entity_id = "mongodb/0"
        unit1.entity_type = "unit"
        self.application.units = [
            unit1,
        ]
        self._ensure_state("upgrade-podspec-stateful.log", mock_all_watcher)

    def test_podspec_stateless_upgrade(self, mock_all_watcher):
        """Replay ``upgrade-podspec-stateless.log`` for the unit ``lcm/9``."""
        unit1 = Mock(Unit)
        unit1.entity_id = "lcm/9"
        unit1.entity_type = "unit"
        self.application.units = [
            unit1,
        ]
        self._ensure_state("upgrade-podspec-stateless.log", mock_all_watcher)

    def test_sidecar_upgrade(self, mock_all_watcher):
        """Replay ``upgrade-sidecar.log`` for the single unit ``kafka/0``."""
        unit1 = Mock(Unit)
        unit1.entity_id = "kafka/0"
        unit1.entity_type = "unit"
        self.application.units = [
            unit1,
        ]
        self._ensure_state("upgrade-sidecar.log", mock_all_watcher)
289
290
class StatusTest(TestCase):
    """Tests for the module-level ``status`` helper of n2vc.juju_watcher."""

    def setUp(self):
        """Build a ``Model`` whose connector is replaced by a MagicMock."""
        self.model = Model()
        self.model._connector = mock.MagicMock()

    @mock.patch("n2vc.juju_watcher.derive_status")
    def test_invalid_entity(self, mock_derive_status):
        """``status`` calls ``derive_status`` once and returns a string."""
        application = mock.MagicMock()
        mock_derive_status.return_value = "active"

        class FakeUnit:
            """Minimal stand-in exposing only ``workload_status``."""

            @property
            def workload_status(self):
                return "active"

        application.units = [FakeUnit()]
        value = status(application)
        mock_derive_status.assert_called_once()
        # assertIsInstance reports the offending value and type on failure,
        # unlike assertTrue(isinstance(...)).
        self.assertIsInstance(value, str)
310
311
@asynctest.mock.patch("asyncio.sleep")
class WaitForModelTest(asynctest.TestCase):
    """Tests for ``JujuModelWatcher.wait_for_model`` with ``asyncio.sleep`` patched."""

    @asynctest.mock.patch("juju.client.connector.Connector.connect")
    def setUp(self, mock_connect=None):
        """Create a new event loop and a ``Model`` while ``connect`` is patched."""
        self.loop = asyncio.new_event_loop()
        self.model = Model()

    @asynctest.mock.patch("juju.model.Model.block_until")
    def test_wait_for_model(self, mock_block_until, mock_sleep):
        """Waiting for the model ends up calling ``Model.block_until``."""
        waiting = JujuModelWatcher.wait_for_model(self.model, timeout=None)
        self.loop.run_until_complete(waiting)
        mock_block_until.assert_called()

    @asynctest.mock.patch("asyncio.ensure_future")
    @asynctest.mock.patch("asyncio.wait")
    def test_wait_for_model_exception(self, mock_wait, mock_ensure_future, mock_sleep):
        """When ``asyncio.wait`` raises, the scheduled task gets cancelled."""
        scheduled = Mock()
        mock_ensure_future.return_value = scheduled
        mock_wait.side_effect = Exception
        with self.assertRaises(Exception):
            waiting = JujuModelWatcher.wait_for_model(self.model, timeout=None)
            self.loop.run_until_complete(waiting)
        scheduled.cancel.assert_called()