1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from time
import sleep
21 from n2vc
.juju_watcher
import JujuModelWatcher
, entity_ready
, status
22 from n2vc
.exceptions
import EntityInvalidException
23 from .utils
import FakeN2VC
, AsyncMock
, Deltas
, FakeWatcher
24 from juju
.application
import Application
25 from juju
.action
import Action
26 from juju
.annotation
import Annotation
27 from juju
.client
._definitions
import AllWatcherNextResults
28 from juju
.machine
import Machine
29 from juju
.model
import Model
30 from juju
.unit
import Unit
31 from unittest
import mock
, TestCase
32 from unittest
.mock
import Mock
35 class JujuWatcherTest(asynctest
.TestCase
):
37 self
.n2vc
= FakeN2VC()
39 self
.loop
= asyncio
.new_event_loop()
41 def test_get_status(self
):
44 (status
, message
, vca_status
) = JujuModelWatcher
.get_status(test
.delta
)
45 self
.assertEqual(status
, test
.entity_status
.status
)
46 self
.assertEqual(message
, test
.entity_status
.message
)
47 self
.assertEqual(vca_status
, test
.entity_status
.vca_status
)
49 @mock.patch("n2vc.juju_watcher.client.AllWatcherFacade.from_connection")
50 def test_model_watcher(self
, allwatcher
):
52 allwatcher
.return_value
= FakeWatcher()
55 with self
.assertRaises(asyncio
.TimeoutError
):
56 allwatcher
.return_value
.delta_to_return
= [test
.delta
]
57 self
.loop
.run_until_complete(
58 JujuModelWatcher
.model_watcher(
60 test
.filter.entity_id
,
61 test
.filter.entity_type
,
63 db_dict
={"something"},
69 n2vc
.write_app_status_to_db
.assert_called()
71 @mock.patch("n2vc.juju_watcher.asyncio.wait")
72 def test_wait_for(self
, wait
):
73 wait
.return_value
= asyncio
.Future()
74 wait
.return_value
.set_result(None)
77 self
.loop
.run_until_complete(JujuModelWatcher
.wait_for(self
.model
, machine
))
79 @mock.patch("n2vc.juju_watcher.asyncio.wait")
80 def test_wait_for_exception(self
, wait
):
81 wait
.return_value
= asyncio
.Future()
82 wait
.return_value
.set_result(None)
83 wait
.side_effect
= Exception("error")
86 with self
.assertRaises(Exception):
87 self
.loop
.run_until_complete(JujuModelWatcher
.wait_for(self
.model
, machine
))
89 def test_wait_for_invalid_entity_exception(self
):
90 with self
.assertRaises(EntityInvalidException
):
91 self
.loop
.run_until_complete(
92 JujuModelWatcher
.wait_for(
94 Annotation(0, self
.model
),
96 progress_timeout
=None,
101 class EntityReadyTest(TestCase
):
102 @mock.patch("juju.application.Application.units")
103 def setUp(self
, mock_units
):
105 self
.model
._connector
= mock
.MagicMock()
107 def test_invalid_entity(self
):
108 with self
.assertRaises(EntityInvalidException
):
109 entity_ready(Annotation(0, self
.model
))
111 @mock.patch("juju.machine.Machine.agent_status")
112 def test_machine_entity(self
, mock_machine_agent_status
):
113 entity
= Machine(0, self
.model
)
114 self
.assertEqual(entity
.entity_type
, "machine")
115 self
.assertTrue(isinstance(entity_ready(entity
), bool))
117 @mock.patch("juju.action.Action.status")
118 def test_action_entity(self
, mock_action_status
):
119 entity
= Action(0, self
.model
)
120 self
.assertEqual(entity
.entity_type
, "action")
121 self
.assertTrue(isinstance(entity_ready(entity
), bool))
123 @mock.patch("juju.application.Application.status")
124 def test_application_entity(self
, mock_application_status
):
125 entity
= Application(0, self
.model
)
126 self
.assertEqual(entity
.entity_type
, "application")
127 self
.assertTrue(isinstance(entity_ready(entity
), bool))
130 @mock.patch("n2vc.juju_watcher.client.AllWatcherFacade.from_connection")
131 class EntityStateTest(TestCase
):
134 self
.model
._connector
= mock
.MagicMock()
135 self
.loop
= asyncio
.new_event_loop()
136 self
.application
= Mock(Application
)
137 self
.upgrade_file
= None
140 def _fetch_next_delta(self
):
143 raw_data
= self
.upgrade_file
.readline()
145 raise EOFError("Log file is out of events")
147 delta
= json
.loads(raw_data
)
151 if delta
[0] == "unit":
152 if delta
[2]["life"] == "dead":
153 # Remove the unit from the application
154 for unit
in self
.application
.units
:
155 if unit
.entity_id
== delta
[2]["name"]:
156 self
.application
.units
.remove(unit
)
159 for unit
in self
.application
.units
:
160 if unit
.entity_id
== delta
[2]["name"]:
164 print("Application gets a new unit: {}".format(delta
[2]["name"]))
166 unit
.entity_id
= delta
[2]["name"]
167 unit
.entity_type
= "unit"
168 self
.application
.units
.append(unit
)
170 print("{} {}".format(self
.line_number
, delta
))
171 self
.line_number
= self
.line_number
+ 1
173 return AllWatcherNextResults(
179 def _ensure_state(self
, filename
, mock_all_watcher
):
181 os
.path
.join(os
.path
.dirname(__file__
), "testdata", filename
),
183 ) as self
.upgrade_file
:
185 all_changes
= AsyncMock()
186 all_changes
.Next
.side_effect
= self
._fetch
_next
_delta
187 mock_all_watcher
.return_value
= all_changes
189 self
.loop
.run_until_complete(
190 JujuModelWatcher
.ensure_units_idle(
191 model
=self
.model
, application
=self
.application
195 with self
.assertRaises(EOFError, msg
="Not all events consumed"):
196 change
= self
._fetch
_next
_delta
()
197 print(change
.deltas
[0].deltas
)
199 def _slow_changes(self
):
201 return AllWatcherNextResults(
206 "name": "app-vnf-7a49ace2b6-z0/2",
207 "application": "app-vnf-7a49ace2b6-z0",
211 "since": "2022-04-26T18:50:27.579802723Z"},
215 "since": "2022-04-26T18:50:28.592142816Z"}
221 def test_timeout(self
, mock_all_watcher
):
223 unit1
.entity_id
= "app-vnf-7a49ace2b6-z0/0"
224 unit1
.entity_type
= "unit"
225 self
.application
.units
= [
229 all_changes
= AsyncMock()
230 all_changes
.Next
.side_effect
= self
._slow
_changes
231 mock_all_watcher
.return_value
= all_changes
233 with self
.assertRaises(TimeoutError
):
234 self
.loop
.run_until_complete(
235 JujuModelWatcher
.wait_for_units_idle(
236 model
=self
.model
, application
=self
.application
, timeout
=0.01
240 def test_machine_unit_upgrade(self
, mock_all_watcher
):
242 unit1
.entity_id
= "app-vnf-7a49ace2b6-z0/0"
243 unit1
.entity_type
= "unit"
245 unit2
.entity_id
= "app-vnf-7a49ace2b6-z0/1"
246 unit2
.entity_type
= "unit"
248 unit3
.entity_id
= "app-vnf-7a49ace2b6-z0/2"
249 unit3
.entity_type
= "unit"
251 self
.application
.units
= [unit1
, unit2
, unit3
]
253 self
._ensure
_state
("upgrade-machine.log", mock_all_watcher
)
255 def test_operator_upgrade(self
, mock_all_watcher
):
257 unit1
.entity_id
= "sshproxy/0"
258 unit1
.entity_type
= "unit"
259 self
.application
.units
= [
262 self
._ensure
_state
("upgrade-operator.log", mock_all_watcher
)
264 def test_podspec_stateful_upgrade(self
, mock_all_watcher
):
266 unit1
.entity_id
= "mongodb/0"
267 unit1
.entity_type
= "unit"
268 self
.application
.units
= [
271 self
._ensure
_state
("upgrade-podspec-stateful.log", mock_all_watcher
)
273 def test_podspec_stateless_upgrade(self
, mock_all_watcher
):
275 unit1
.entity_id
= "lcm/9"
276 unit1
.entity_type
= "unit"
277 self
.application
.units
= [
280 self
._ensure
_state
("upgrade-podspec-stateless.log", mock_all_watcher
)
282 def test_sidecar_upgrade(self
, mock_all_watcher
):
284 unit1
.entity_id
= "kafka/0"
285 unit1
.entity_type
= "unit"
286 self
.application
.units
= [
289 self
._ensure
_state
("upgrade-sidecar.log", mock_all_watcher
)
292 class StatusTest(TestCase
):
295 self
.model
._connector
= mock
.MagicMock()
297 @mock.patch("n2vc.juju_watcher.derive_status")
298 def test_invalid_entity(self
, mock_derive_status
):
299 application
= mock
.MagicMock()
300 mock_derive_status
.return_value
= "active"
304 def workload_status(self
):
307 application
.units
= [FakeUnit()]
308 value
= status(application
)
309 mock_derive_status
.assert_called_once()
310 self
.assertTrue(isinstance(value
, str))
313 @asynctest.mock
.patch("asyncio.sleep")
314 class WaitForModelTest(asynctest
.TestCase
):
315 @asynctest.mock
.patch("juju.client.connector.Connector.connect")
316 def setUp(self
, mock_connect
=None):
317 self
.loop
= asyncio
.new_event_loop()
320 @asynctest.mock
.patch("juju.model.Model.block_until")
321 def test_wait_for_model(self
, mock_block_until
, mock_sleep
):
322 self
.loop
.run_until_complete(
323 JujuModelWatcher
.wait_for_model(self
.model
, timeout
=None)
325 mock_block_until
.assert_called()
327 @asynctest.mock
.patch("asyncio.ensure_future")
328 @asynctest.mock
.patch("asyncio.wait")
329 def test_wait_for_model_exception(self
, mock_wait
, mock_ensure_future
, mock_sleep
):
331 mock_ensure_future
.return_value
= task
332 mock_wait
.side_effect
= Exception
333 with self
.assertRaises(Exception):
334 self
.loop
.run_until_complete(
335 JujuModelWatcher
.wait_for_model(self
.model
, timeout
=None)
337 task
.cancel
.assert_called()