1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from time
import sleep
21 from n2vc
.juju_watcher
import JujuModelWatcher
, entity_ready
, status
22 from n2vc
.exceptions
import EntityInvalidException
23 from .utils
import FakeN2VC
, AsyncMock
, Deltas
, FakeWatcher
24 from juju
.application
import Application
25 from juju
.action
import Action
26 from juju
.annotation
import Annotation
27 from juju
.client
._definitions
import AllWatcherNextResults
28 from juju
.machine
import Machine
29 from juju
.model
import Model
30 from juju
.unit
import Unit
31 from unittest
import mock
, TestCase
32 from unittest
.mock
import Mock
35 class JujuWatcherTest(asynctest
.TestCase
):
37 self
.n2vc
= FakeN2VC()
39 self
.loop
= asyncio
.new_event_loop()
41 def test_get_status(self
):
44 (status
, message
, vca_status
) = JujuModelWatcher
.get_status(test
.delta
)
45 self
.assertEqual(status
, test
.entity_status
.status
)
46 self
.assertEqual(message
, test
.entity_status
.message
)
47 self
.assertEqual(vca_status
, test
.entity_status
.vca_status
)
49 @mock.patch("n2vc.juju_watcher.client.AllWatcherFacade.from_connection")
50 def test_model_watcher(self
, allwatcher
):
52 allwatcher
.return_value
= FakeWatcher()
55 with self
.assertRaises(asyncio
.TimeoutError
):
56 allwatcher
.return_value
.delta_to_return
= [test
.delta
]
57 self
.loop
.run_until_complete(
58 JujuModelWatcher
.model_watcher(
60 test
.filter.entity_id
,
61 test
.filter.entity_type
,
63 db_dict
={"something"},
69 n2vc
.write_app_status_to_db
.assert_called()
71 @mock.patch("n2vc.juju_watcher.asyncio.wait")
72 def test_wait_for(self
, wait
):
73 wait
.return_value
= asyncio
.Future()
74 wait
.return_value
.set_result(None)
77 self
.loop
.run_until_complete(JujuModelWatcher
.wait_for(self
.model
, machine
))
79 @mock.patch("n2vc.juju_watcher.asyncio.wait")
80 def test_wait_for_exception(self
, wait
):
81 wait
.return_value
= asyncio
.Future()
82 wait
.return_value
.set_result(None)
83 wait
.side_effect
= Exception("error")
86 with self
.assertRaises(Exception):
87 self
.loop
.run_until_complete(JujuModelWatcher
.wait_for(self
.model
, machine
))
89 def test_wait_for_invalid_entity_exception(self
):
90 with self
.assertRaises(EntityInvalidException
):
91 self
.loop
.run_until_complete(
92 JujuModelWatcher
.wait_for(
94 Annotation(0, self
.model
),
96 progress_timeout
=None,
101 class EntityReadyTest(TestCase
):
102 @mock.patch("juju.application.Application.units")
103 def setUp(self
, mock_units
):
105 self
.model
._connector
= mock
.MagicMock()
def test_invalid_entity(self):
    """Check that entity_ready raises EntityInvalidException for an Annotation."""
    with self.assertRaises(EntityInvalidException):
        # Build the entity inside the context so construction stays covered
        # by the same expectation as the entity_ready call.
        unsupported = Annotation(0, self.model)
        entity_ready(unsupported)
@mock.patch("juju.machine.Machine.agent_status")
def test_machine_entity(self, mock_machine_agent_status):
    """Check that entity_ready returns a bool for a Machine entity.

    Machine.agent_status is patched for the duration of the test; the
    injected mock is not configured or inspected here (presumably it only
    keeps the status lookup away from real model data -- confirm).
    """
    entity = Machine(0, self.model)
    self.assertEqual(entity.entity_type, "machine")
    # assertIsInstance reports the offending value and expected type on
    # failure, whereas assertTrue(isinstance(...)) only says "False is not true".
    self.assertIsInstance(entity_ready(entity), bool)
@mock.patch("juju.action.Action.status")
def test_action_entity(self, mock_action_status):
    """Check that entity_ready returns a bool for an Action entity.

    Action.status is patched for the duration of the test; the injected
    mock is not configured or inspected here (presumably it only keeps the
    status lookup away from real model data -- confirm).
    """
    entity = Action(0, self.model)
    self.assertEqual(entity.entity_type, "action")
    # assertIsInstance reports the offending value and expected type on
    # failure, whereas assertTrue(isinstance(...)) only says "False is not true".
    self.assertIsInstance(entity_ready(entity), bool)
@mock.patch("juju.application.Application.status")
def test_application_entity(self, mock_application_status):
    """Check that entity_ready returns a bool for an Application entity.

    Application.status is patched for the duration of the test; the
    injected mock is not configured or inspected here (presumably it only
    keeps the status lookup away from real model data -- confirm).
    """
    entity = Application(0, self.model)
    self.assertEqual(entity.entity_type, "application")
    # assertIsInstance reports the offending value and expected type on
    # failure, whereas assertTrue(isinstance(...)) only says "False is not true".
    self.assertIsInstance(entity_ready(entity), bool)
130 @mock.patch("n2vc.juju_watcher.client.AllWatcherFacade.from_connection")
131 class EntityStateTest(TestCase
):
134 self
.model
._connector
= mock
.MagicMock()
135 self
.loop
= asyncio
.new_event_loop()
136 self
.application
= Mock(Application
)
137 self
.upgrade_file
= None
140 def _fetch_next_delta(self
):
143 raw_data
= self
.upgrade_file
.readline()
145 raise EOFError("Log file is out of events")
147 delta
= json
.loads(raw_data
)
151 if delta
[0] == "unit":
152 if delta
[2]["life"] == "dead":
153 # Remove the unit from the application
154 for unit
in self
.application
.units
:
155 if unit
.entity_id
== delta
[2]["name"]:
156 self
.application
.units
.remove(unit
)
159 for unit
in self
.application
.units
:
160 if unit
.entity_id
== delta
[2]["name"]:
164 print("Application gets a new unit: {}".format(delta
[2]["name"]))
166 unit
.entity_id
= delta
[2]["name"]
167 unit
.entity_type
= "unit"
168 self
.application
.units
.append(unit
)
170 print("{} {}".format(self
.line_number
, delta
))
171 self
.line_number
= self
.line_number
+ 1
173 return AllWatcherNextResults(
179 def _ensure_state(self
, filename
, mock_all_watcher
):
181 os
.path
.join(os
.path
.dirname(__file__
), "testdata", filename
),
183 ) as self
.upgrade_file
:
184 all_changes
= AsyncMock()
185 all_changes
.Next
.side_effect
= self
._fetch
_next
_delta
186 mock_all_watcher
.return_value
= all_changes
188 self
.loop
.run_until_complete(
189 JujuModelWatcher
.ensure_units_idle(
190 model
=self
.model
, application
=self
.application
194 with self
.assertRaises(EOFError, msg
="Not all events consumed"):
195 change
= self
._fetch
_next
_delta
()
196 print(change
.deltas
[0].deltas
)
198 def _slow_changes(self
):
200 return AllWatcherNextResults(
205 "name": "app-vnf-7a49ace2b6-z0/2",
206 "application": "app-vnf-7a49ace2b6-z0",
210 "since": "2022-04-26T18:50:27.579802723Z"},
214 "since": "2022-04-26T18:50:28.592142816Z"}
220 def test_timeout(self
, mock_all_watcher
):
222 unit1
.entity_id
= "app-vnf-7a49ace2b6-z0/0"
223 unit1
.entity_type
= "unit"
224 self
.application
.units
= [
228 all_changes
= AsyncMock()
229 all_changes
.Next
.side_effect
= self
._slow
_changes
230 mock_all_watcher
.return_value
= all_changes
232 with self
.assertRaises(TimeoutError
):
233 self
.loop
.run_until_complete(
234 JujuModelWatcher
.wait_for_units_idle(
235 model
=self
.model
, application
=self
.application
, timeout
=0.01
239 def test_machine_unit_upgrade(self
, mock_all_watcher
):
241 unit1
.entity_id
= "app-vnf-7a49ace2b6-z0/0"
242 unit1
.entity_type
= "unit"
244 unit2
.entity_id
= "app-vnf-7a49ace2b6-z0/1"
245 unit2
.entity_type
= "unit"
247 unit3
.entity_id
= "app-vnf-7a49ace2b6-z0/2"
248 unit3
.entity_type
= "unit"
250 self
.application
.units
= [unit1
, unit2
, unit3
]
252 self
._ensure
_state
("upgrade-machine.log", mock_all_watcher
)
254 def test_operator_upgrade(self
, mock_all_watcher
):
256 unit1
.entity_id
= "sshproxy/0"
257 unit1
.entity_type
= "unit"
258 self
.application
.units
= [
261 self
._ensure
_state
("upgrade-operator.log", mock_all_watcher
)
263 def test_podspec_stateful_upgrade(self
, mock_all_watcher
):
265 unit1
.entity_id
= "mongodb/0"
266 unit1
.entity_type
= "unit"
267 self
.application
.units
= [
270 self
._ensure
_state
("upgrade-podspec-stateful.log", mock_all_watcher
)
272 def test_podspec_stateless_upgrade(self
, mock_all_watcher
):
274 unit1
.entity_id
= "lcm/9"
275 unit1
.entity_type
= "unit"
276 self
.application
.units
= [
279 self
._ensure
_state
("upgrade-podspec-stateless.log", mock_all_watcher
)
281 def test_sidecar_upgrade(self
, mock_all_watcher
):
283 unit1
.entity_id
= "kafka/0"
284 unit1
.entity_type
= "unit"
285 self
.application
.units
= [
288 self
._ensure
_state
("upgrade-sidecar.log", mock_all_watcher
)
291 class StatusTest(TestCase
):
294 self
.model
._connector
= mock
.MagicMock()
296 @mock.patch("n2vc.juju_watcher.derive_status")
297 def test_invalid_entity(self
, mock_derive_status
):
298 application
= mock
.MagicMock()
299 mock_derive_status
.return_value
= "active"
303 def workload_status(self
):
306 application
.units
= [FakeUnit()]
307 value
= status(application
)
308 mock_derive_status
.assert_called_once()
309 self
.assertTrue(isinstance(value
, str))
312 @asynctest.mock
.patch("asyncio.sleep")
313 class WaitForModelTest(asynctest
.TestCase
):
314 @asynctest.mock
.patch("juju.client.connector.Connector.connect")
315 def setUp(self
, mock_connect
=None):
316 self
.loop
= asyncio
.new_event_loop()
@asynctest.mock.patch("juju.model.Model.block_until")
def test_wait_for_model(self, mock_block_until, mock_sleep):
    """Run wait_for_model to completion and check Model.block_until was called.

    mock_block_until is injected by this method's patch decorator;
    mock_sleep comes from the class-level patch of asyncio.sleep.
    """
    waiter = JujuModelWatcher.wait_for_model(self.model, timeout=None)
    self.loop.run_until_complete(waiter)
    mock_block_until.assert_called()
326 @asynctest.mock
.patch("asyncio.ensure_future")
327 @asynctest.mock
.patch("asyncio.wait")
328 def test_wait_for_model_exception(self
, mock_wait
, mock_ensure_future
, mock_sleep
):
330 mock_ensure_future
.return_value
= task
331 mock_wait
.side_effect
= Exception
332 with self
.assertRaises(Exception):
333 self
.loop
.run_until_complete(
334 JujuModelWatcher
.wait_for_model(self
.model
, timeout
=None)
336 task
.cancel
.assert_called()