+@mock.patch("n2vc.juju_watcher.client.AllWatcherFacade.from_connection")
+class EntityStateTest(TestCase):
+ def setUp(self):
+ self.model = Model()
+ self.model._connector = mock.MagicMock()
+ self.loop = asyncio.new_event_loop()
+ self.application = Mock(Application)
+ self.upgrade_file = None
+ self.line_number = 1
+
+ def _fetch_next_delta(self):
+ delta = None
+ while delta is None:
+ raw_data = self.upgrade_file.readline()
+ if not raw_data:
+ raise EOFError("Log file is out of events")
+ try:
+ delta = json.loads(raw_data)
+ except ValueError:
+ continue
+
+ if delta[0] == "unit":
+ if delta[2]["life"] == "dead":
+ # Remove the unit from the application
+ for unit in self.application.units:
+ if unit.entity_id == delta[2]["name"]:
+ self.application.units.remove(unit)
+ else:
+ unit_present = False
+ for unit in self.application.units:
+ if unit.entity_id == delta[2]["name"]:
+ unit_present = True
+
+ if not unit_present:
+ print("Application gets a new unit: {}".format(delta[2]["name"]))
+ unit = Mock(Unit)
+ unit.entity_id = delta[2]["name"]
+ unit.entity_type = "unit"
+ self.application.units.append(unit)
+
+ print("{} {}".format(self.line_number, delta))
+ self.line_number = self.line_number + 1
+
+ return AllWatcherNextResults(
+ deltas=[
+ delta,
+ ]
+ )
+
+ def _ensure_state(self, filename, mock_all_watcher):
+ with open(
+ os.path.join(os.path.dirname(__file__), "testdata", filename),
+ "r",
+ ) as self.upgrade_file:
+
+ all_changes = AsyncMock()
+ all_changes.Next.side_effect = self._fetch_next_delta
+ mock_all_watcher.return_value = all_changes
+
+ self.loop.run_until_complete(
+ JujuModelWatcher.ensure_units_idle(
+ model=self.model, application=self.application
+ )
+ )
+
+ with self.assertRaises(EOFError, msg="Not all events consumed"):
+ change = self._fetch_next_delta()
+ print(change.deltas[0].deltas)
+
+ def _slow_changes(self):
+ sleep(0.1)
+ return AllWatcherNextResults(
+ deltas=[
+ json.loads(
+ """["unit","change",
+ {
+ "name": "app-vnf-7a49ace2b6-z0/2",
+ "application": "app-vnf-7a49ace2b6-z0",
+ "workload-status": {
+ "current": "active",
+ "message": "",
+ "since": "2022-04-26T18:50:27.579802723Z"},
+ "agent-status": {
+ "current": "idle",
+ "message": "",
+ "since": "2022-04-26T18:50:28.592142816Z"}
+ }]"""
+ ),
+ ]
+ )
+
+ def test_timeout(self, mock_all_watcher):
+ unit1 = Mock(Unit)
+ unit1.entity_id = "app-vnf-7a49ace2b6-z0/0"
+ unit1.entity_type = "unit"
+ self.application.units = [
+ unit1,
+ ]
+
+ all_changes = AsyncMock()
+ all_changes.Next.side_effect = self._slow_changes
+ mock_all_watcher.return_value = all_changes
+
+ with self.assertRaises(TimeoutError):
+ self.loop.run_until_complete(
+ JujuModelWatcher.wait_for_units_idle(
+ model=self.model, application=self.application, timeout=0.01
+ )
+ )
+
+ def test_machine_unit_upgrade(self, mock_all_watcher):
+ unit1 = Mock(Unit)
+ unit1.entity_id = "app-vnf-7a49ace2b6-z0/0"
+ unit1.entity_type = "unit"
+ unit2 = Mock(Unit)
+ unit2.entity_id = "app-vnf-7a49ace2b6-z0/1"
+ unit2.entity_type = "unit"
+ unit3 = Mock(Unit)
+ unit3.entity_id = "app-vnf-7a49ace2b6-z0/2"
+ unit3.entity_type = "unit"
+
+ self.application.units = [unit1, unit2, unit3]
+
+ self._ensure_state("upgrade-machine.log", mock_all_watcher)
+
+ def test_operator_upgrade(self, mock_all_watcher):
+ unit1 = Mock(Unit)
+ unit1.entity_id = "sshproxy/0"
+ unit1.entity_type = "unit"
+ self.application.units = [
+ unit1,
+ ]
+ self._ensure_state("upgrade-operator.log", mock_all_watcher)
+
+ def test_podspec_stateful_upgrade(self, mock_all_watcher):
+ unit1 = Mock(Unit)
+ unit1.entity_id = "mongodb/0"
+ unit1.entity_type = "unit"
+ self.application.units = [
+ unit1,
+ ]
+ self._ensure_state("upgrade-podspec-stateful.log", mock_all_watcher)
+
+ def test_podspec_stateless_upgrade(self, mock_all_watcher):
+ unit1 = Mock(Unit)
+ unit1.entity_id = "lcm/9"
+ unit1.entity_type = "unit"
+ self.application.units = [
+ unit1,
+ ]
+ self._ensure_state("upgrade-podspec-stateless.log", mock_all_watcher)
+
+ def test_sidecar_upgrade(self, mock_all_watcher):
+ unit1 = Mock(Unit)
+ unit1.entity_id = "kafka/0"
+ unit1.entity_type = "unit"
+ self.application.units = [
+ unit1,
+ ]
+ self._ensure_state("upgrade-sidecar.log", mock_all_watcher)
+
+