07b21276dce87622e0ca6945fbe05a3b8985b177
[osm/N2VC.git] / n2vc / tests / unit / test_juju_watcher.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import json
16 import os
17 from time import sleep
18 import asynctest
19 import asyncio
20
21 from n2vc.juju_watcher import JujuModelWatcher, entity_ready, status
22 from n2vc.exceptions import EntityInvalidException
23 from .utils import FakeN2VC, AsyncMock, Deltas, FakeWatcher
24 from juju.application import Application
25 from juju.action import Action
26 from juju.annotation import Annotation
27 from juju.client._definitions import AllWatcherNextResults
28 from juju.machine import Machine
29 from juju.model import Model
30 from juju.unit import Unit
31 from unittest import mock, TestCase
32 from unittest.mock import Mock
33
34
35 class JujuWatcherTest(asynctest.TestCase):
36 def setUp(self):
37 self.n2vc = FakeN2VC()
38 self.model = Mock()
39 self.loop = asyncio.new_event_loop()
40
41 def test_get_status(self):
42 tests = Deltas
43 for test in tests:
44 (status, message, vca_status) = JujuModelWatcher.get_status(test.delta)
45 self.assertEqual(status, test.entity_status.status)
46 self.assertEqual(message, test.entity_status.message)
47 self.assertEqual(vca_status, test.entity_status.vca_status)
48
49 @mock.patch("n2vc.juju_watcher.client.AllWatcherFacade.from_connection")
50 def test_model_watcher(self, allwatcher):
51 tests = Deltas
52 allwatcher.return_value = FakeWatcher()
53 n2vc = AsyncMock()
54 for test in tests:
55 with self.assertRaises(asyncio.TimeoutError):
56 allwatcher.return_value.delta_to_return = [test.delta]
57 self.loop.run_until_complete(
58 JujuModelWatcher.model_watcher(
59 self.model,
60 test.filter.entity_id,
61 test.filter.entity_type,
62 timeout=0,
63 db_dict={"something"},
64 n2vc=n2vc,
65 vca_id=None,
66 )
67 )
68
69 n2vc.write_app_status_to_db.assert_called()
70
71 @mock.patch("n2vc.juju_watcher.asyncio.wait")
72 def test_wait_for(self, wait):
73 wait.return_value = asyncio.Future()
74 wait.return_value.set_result(None)
75
76 machine = AsyncMock()
77 self.loop.run_until_complete(JujuModelWatcher.wait_for(self.model, machine))
78
79 @mock.patch("n2vc.juju_watcher.asyncio.wait")
80 def test_wait_for_exception(self, wait):
81 wait.return_value = asyncio.Future()
82 wait.return_value.set_result(None)
83 wait.side_effect = Exception("error")
84
85 machine = AsyncMock()
86 with self.assertRaises(Exception):
87 self.loop.run_until_complete(JujuModelWatcher.wait_for(self.model, machine))
88
89 def test_wait_for_invalid_entity_exception(self):
90 with self.assertRaises(EntityInvalidException):
91 self.loop.run_until_complete(
92 JujuModelWatcher.wait_for(
93 self.model,
94 Annotation(0, self.model),
95 total_timeout=None,
96 progress_timeout=None,
97 )
98 )
99
100
101 class EntityReadyTest(TestCase):
102 @mock.patch("juju.application.Application.units")
103 def setUp(self, mock_units):
104 self.model = Model()
105 self.model._connector = mock.MagicMock()
106
107 def test_invalid_entity(self):
108 with self.assertRaises(EntityInvalidException):
109 entity_ready(Annotation(0, self.model))
110
111 @mock.patch("juju.machine.Machine.agent_status")
112 def test_machine_entity(self, mock_machine_agent_status):
113 entity = Machine(0, self.model)
114 self.assertEqual(entity.entity_type, "machine")
115 self.assertTrue(isinstance(entity_ready(entity), bool))
116
117 @mock.patch("juju.action.Action.status")
118 def test_action_entity(self, mock_action_status):
119 entity = Action(0, self.model)
120 self.assertEqual(entity.entity_type, "action")
121 self.assertTrue(isinstance(entity_ready(entity), bool))
122
123 @mock.patch("juju.application.Application.status")
124 def test_application_entity(self, mock_application_status):
125 entity = Application(0, self.model)
126 self.assertEqual(entity.entity_type, "application")
127 self.assertTrue(isinstance(entity_ready(entity), bool))
128
129
130 @mock.patch("n2vc.juju_watcher.client.AllWatcherFacade.from_connection")
131 class EntityStateTest(TestCase):
132 def setUp(self):
133 self.model = Model()
134 self.model._connector = mock.MagicMock()
135 self.loop = asyncio.new_event_loop()
136 self.application = Mock(Application)
137 self.upgrade_file = None
138 self.line_number = 1
139
140 def _fetch_next_delta(self):
141 delta = None
142 while delta is None:
143 raw_data = self.upgrade_file.readline()
144 if not raw_data:
145 raise EOFError("Log file is out of events")
146 try:
147 delta = json.loads(raw_data)
148 except ValueError:
149 continue
150
151 if delta[0] == "unit":
152 if delta[2]["life"] == "dead":
153 # Remove the unit from the application
154 for unit in self.application.units:
155 if unit.entity_id == delta[2]["name"]:
156 self.application.units.remove(unit)
157 else:
158 unit_present = False
159 for unit in self.application.units:
160 if unit.entity_id == delta[2]["name"]:
161 unit_present = True
162
163 if not unit_present:
164 print("Application gets a new unit: {}".format(delta[2]["name"]))
165 unit = Mock(Unit)
166 unit.entity_id = delta[2]["name"]
167 unit.entity_type = "unit"
168 self.application.units.append(unit)
169
170 print("{} {}".format(self.line_number, delta))
171 self.line_number = self.line_number + 1
172
173 return AllWatcherNextResults(
174 deltas=[
175 delta,
176 ]
177 )
178
179 def _ensure_state(self, filename, mock_all_watcher):
180 with open(
181 os.path.join(os.path.dirname(__file__), "testdata", filename),
182 "r",
183 ) as self.upgrade_file:
184
185 all_changes = AsyncMock()
186 all_changes.Next.side_effect = self._fetch_next_delta
187 mock_all_watcher.return_value = all_changes
188
189 self.loop.run_until_complete(
190 JujuModelWatcher.ensure_units_idle(
191 model=self.model, application=self.application
192 )
193 )
194
195 with self.assertRaises(EOFError, msg="Not all events consumed"):
196 change = self._fetch_next_delta()
197 print(change.deltas[0].deltas)
198
199 def _slow_changes(self):
200 sleep(0.1)
201 return AllWatcherNextResults(
202 deltas=[
203 json.loads(
204 """["unit","change",
205 {
206 "name": "app-vnf-7a49ace2b6-z0/2",
207 "application": "app-vnf-7a49ace2b6-z0",
208 "workload-status": {
209 "current": "active",
210 "message": "",
211 "since": "2022-04-26T18:50:27.579802723Z"},
212 "agent-status": {
213 "current": "idle",
214 "message": "",
215 "since": "2022-04-26T18:50:28.592142816Z"}
216 }]"""
217 ),
218 ]
219 )
220
221 def test_timeout(self, mock_all_watcher):
222 unit1 = Mock(Unit)
223 unit1.entity_id = "app-vnf-7a49ace2b6-z0/0"
224 unit1.entity_type = "unit"
225 self.application.units = [
226 unit1,
227 ]
228
229 all_changes = AsyncMock()
230 all_changes.Next.side_effect = self._slow_changes
231 mock_all_watcher.return_value = all_changes
232
233 with self.assertRaises(TimeoutError):
234 self.loop.run_until_complete(
235 JujuModelWatcher.wait_for_units_idle(
236 model=self.model, application=self.application, timeout=0.01
237 )
238 )
239
240 def test_machine_unit_upgrade(self, mock_all_watcher):
241 unit1 = Mock(Unit)
242 unit1.entity_id = "app-vnf-7a49ace2b6-z0/0"
243 unit1.entity_type = "unit"
244 unit2 = Mock(Unit)
245 unit2.entity_id = "app-vnf-7a49ace2b6-z0/1"
246 unit2.entity_type = "unit"
247 unit3 = Mock(Unit)
248 unit3.entity_id = "app-vnf-7a49ace2b6-z0/2"
249 unit3.entity_type = "unit"
250
251 self.application.units = [unit1, unit2, unit3]
252
253 self._ensure_state("upgrade-machine.log", mock_all_watcher)
254
255 def test_operator_upgrade(self, mock_all_watcher):
256 unit1 = Mock(Unit)
257 unit1.entity_id = "sshproxy/0"
258 unit1.entity_type = "unit"
259 self.application.units = [
260 unit1,
261 ]
262 self._ensure_state("upgrade-operator.log", mock_all_watcher)
263
264 def test_podspec_stateful_upgrade(self, mock_all_watcher):
265 unit1 = Mock(Unit)
266 unit1.entity_id = "mongodb/0"
267 unit1.entity_type = "unit"
268 self.application.units = [
269 unit1,
270 ]
271 self._ensure_state("upgrade-podspec-stateful.log", mock_all_watcher)
272
273 def test_podspec_stateless_upgrade(self, mock_all_watcher):
274 unit1 = Mock(Unit)
275 unit1.entity_id = "lcm/9"
276 unit1.entity_type = "unit"
277 self.application.units = [
278 unit1,
279 ]
280 self._ensure_state("upgrade-podspec-stateless.log", mock_all_watcher)
281
282 def test_sidecar_upgrade(self, mock_all_watcher):
283 unit1 = Mock(Unit)
284 unit1.entity_id = "kafka/0"
285 unit1.entity_type = "unit"
286 self.application.units = [
287 unit1,
288 ]
289 self._ensure_state("upgrade-sidecar.log", mock_all_watcher)
290
291
292 class StatusTest(TestCase):
293 def setUp(self):
294 self.model = Model()
295 self.model._connector = mock.MagicMock()
296
297 @mock.patch("n2vc.juju_watcher.derive_status")
298 def test_invalid_entity(self, mock_derive_status):
299 application = mock.MagicMock()
300 mock_derive_status.return_value = "active"
301
302 class FakeUnit:
303 @property
304 def workload_status(self):
305 return "active"
306
307 application.units = [FakeUnit()]
308 value = status(application)
309 mock_derive_status.assert_called_once()
310 self.assertTrue(isinstance(value, str))
311
312
313 @asynctest.mock.patch("asyncio.sleep")
314 class WaitForModelTest(asynctest.TestCase):
315 @asynctest.mock.patch("juju.client.connector.Connector.connect")
316 def setUp(self, mock_connect=None):
317 self.loop = asyncio.new_event_loop()
318 self.model = Model()
319
320 @asynctest.mock.patch("juju.model.Model.block_until")
321 def test_wait_for_model(self, mock_block_until, mock_sleep):
322 self.loop.run_until_complete(
323 JujuModelWatcher.wait_for_model(self.model, timeout=None)
324 )
325 mock_block_until.assert_called()
326
327 @asynctest.mock.patch("asyncio.ensure_future")
328 @asynctest.mock.patch("asyncio.wait")
329 def test_wait_for_model_exception(self, mock_wait, mock_ensure_future, mock_sleep):
330 task = Mock()
331 mock_ensure_future.return_value = task
332 mock_wait.side_effect = Exception
333 with self.assertRaises(Exception):
334 self.loop.run_until_complete(
335 JujuModelWatcher.wait_for_model(self.model, timeout=None)
336 )
337 task.cancel.assert_called()