# blob: b9e9e3679fce5b48a2a6621343b63e4eb7ac2f6b
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
from time import sleep
import asynctest
import asyncio
from n2vc.juju_watcher import JujuModelWatcher, entity_ready, status
from n2vc.exceptions import EntityInvalidException
from .utils import FakeN2VC, AsyncMock, Deltas, FakeWatcher
from juju.application import Application
from juju.action import Action
from juju.annotation import Annotation
from juju.client._definitions import AllWatcherNextResults
from juju.machine import Machine
from juju.model import Model
from juju.unit import Unit
from unittest import mock, TestCase
from unittest.mock import Mock
class JujuWatcherTest(asynctest.TestCase):
    """Tests for JujuModelWatcher.get_status, model_watcher and wait_for."""

    def setUp(self):
        """Prepare a fake N2VC, a mocked model and a dedicated event loop."""
        self.n2vc = FakeN2VC()
        self.model = Mock()
        self.loop = asyncio.new_event_loop()

    def test_get_status(self):
        """get_status() must match the expected triple for every sample delta."""
        for case in Deltas:
            entity_status, message, vca_status = JujuModelWatcher.get_status(
                case.delta
            )
            expected = case.entity_status
            self.assertEqual(entity_status, expected.status)
            self.assertEqual(message, expected.message)
            self.assertEqual(vca_status, expected.vca_status)

    @mock.patch("n2vc.juju_watcher.client.AllWatcherFacade.from_connection")
    def test_model_watcher(self, allwatcher):
        """model_watcher() with timeout=0 times out and writes the app status."""
        fake_watcher = FakeWatcher()
        allwatcher.return_value = fake_watcher
        n2vc = AsyncMock()
        for case in Deltas:
            # Feed exactly one delta to the watcher for this case.
            fake_watcher.delta_to_return = [case.delta]
            with self.assertRaises(asyncio.TimeoutError):
                self.loop.run_until_complete(
                    JujuModelWatcher.model_watcher(
                        self.model,
                        case.filter.entity_id,
                        case.filter.entity_type,
                        timeout=0,
                        db_dict={"something"},
                        n2vc=n2vc,
                        vca_id=None,
                    )
                )

            n2vc.write_app_status_to_db.assert_called()

    @mock.patch("n2vc.juju_watcher.asyncio.wait")
    def test_wait_for(self, wait):
        """wait_for() completes when the patched asyncio.wait resolves."""
        finished = asyncio.Future()
        finished.set_result(None)
        wait.return_value = finished
        entity = AsyncMock()
        self.loop.run_until_complete(JujuModelWatcher.wait_for(self.model, entity))

    @mock.patch("n2vc.juju_watcher.asyncio.wait")
    def test_wait_for_exception(self, wait):
        """An exception raised by asyncio.wait propagates out of wait_for()."""
        finished = asyncio.Future()
        finished.set_result(None)
        wait.return_value = finished
        wait.side_effect = Exception("error")
        entity = AsyncMock()
        with self.assertRaises(Exception):
            self.loop.run_until_complete(
                JujuModelWatcher.wait_for(self.model, entity)
            )

    def test_wait_for_invalid_entity_exception(self):
        """wait_for() rejects an Annotation with EntityInvalidException."""
        unsupported = Annotation(0, self.model)
        with self.assertRaises(EntityInvalidException):
            self.loop.run_until_complete(
                JujuModelWatcher.wait_for(
                    self.model,
                    unsupported,
                    total_timeout=None,
                    progress_timeout=None,
                )
            )
class EntityReadyTest(TestCase):
    """Tests for entity_ready() across the juju entity types used here."""

    @mock.patch("juju.application.Application.units")
    def setUp(self, mock_units):
        """Build a Model whose connector is replaced by a MagicMock."""
        model = Model()
        model._connector = mock.MagicMock()
        self.model = model

    def _assert_ready_is_bool(self, entity, expected_type):
        """Check the entity type and that entity_ready() yields a bool."""
        self.assertEqual(entity.entity_type, expected_type)
        self.assertIsInstance(entity_ready(entity), bool)

    def test_invalid_entity(self):
        """An Annotation is rejected with EntityInvalidException."""
        with self.assertRaises(EntityInvalidException):
            entity_ready(Annotation(0, self.model))

    @mock.patch("juju.machine.Machine.agent_status")
    def test_machine_entity(self, mock_machine_agent_status):
        """entity_ready() returns a bool for a Machine."""
        self._assert_ready_is_bool(Machine(0, self.model), "machine")

    @mock.patch("juju.action.Action.status")
    def test_action_entity(self, mock_action_status):
        """entity_ready() returns a bool for an Action."""
        self._assert_ready_is_bool(Action(0, self.model), "action")

    @mock.patch("juju.application.Application.status")
    def test_application_entity(self, mock_application_status):
        """entity_ready() returns a bool for an Application."""
        self._assert_ready_is_bool(Application(0, self.model), "application")
@mock.patch("n2vc.juju_watcher.client.AllWatcherFacade.from_connection")
class EntityStateTest(TestCase):
    """Replay recorded watcher deltas through the unit-idle helpers.

    The class-level patch replaces ``AllWatcherFacade.from_connection`` for
    every ``test*`` method, so each of them receives the mock as its last
    positional argument (``mock_all_watcher``).
    """

    def setUp(self):
        """Create the model, event loop and mocked application for a test."""
        self.model = Model()
        self.model._connector = mock.MagicMock()
        self.loop = asyncio.new_event_loop()
        # Close the per-test loop whatever the outcome, so it is not leaked.
        self.addCleanup(self.loop.close)
        self.application = Mock(Application)
        self.upgrade_file = None
        self.line_number = 1

    @staticmethod
    def _make_unit(entity_id):
        """Return a Mock(Unit) carrying the given entity id and type "unit"."""
        unit = Mock(Unit)
        unit.entity_id = entity_id
        unit.entity_type = "unit"
        return unit

    def _fetch_next_delta(self):
        """Read the next JSON delta from ``self.upgrade_file``.

        Lines that are not valid JSON are skipped. For "unit" deltas the
        mocked application's unit list is kept in step with the log: a unit
        whose life is "dead" is removed, and a unit not yet known is added.

        Returns:
            AllWatcherNextResults wrapping the single decoded delta.

        Raises:
            EOFError: when the file has no more lines.
        """
        delta = None
        while delta is None:
            raw_data = self.upgrade_file.readline()
            if not raw_data:
                raise EOFError("Log file is out of events")
            try:
                delta = json.loads(raw_data)
            except ValueError:
                continue

        if delta[0] == "unit":
            details = delta[2]
            units = self.application.units
            is_dead = details["life"] == "dead"
            unit_name = details["name"]
            if is_dead:
                # Remove the unit from the application. Slice assignment
                # mutates the same list object without removing items from
                # a list that is being iterated.
                units[:] = [unit for unit in units if unit.entity_id != unit_name]
            elif not any(unit.entity_id == unit_name for unit in units):
                print("Application gets a new unit: {}".format(unit_name))
                units.append(self._make_unit(unit_name))

        print("{} {}".format(self.line_number, delta))
        self.line_number = self.line_number + 1

        return AllWatcherNextResults(
            deltas=[
                delta,
            ]
        )

    def _ensure_state(self, filename, mock_all_watcher):
        """Run ensure_units_idle() against deltas replayed from a log file.

        Args:
            filename: name of a file in the ``testdata`` directory next to
                this module.
            mock_all_watcher: the patched ``AllWatcherFacade.from_connection``.

        After ensure_units_idle() returns, the next fetch must raise
        EOFError, i.e. every line of the file has been read.
        """
        with open(
            os.path.join(os.path.dirname(__file__), "testdata", filename),
            "r",
        ) as self.upgrade_file:
            all_changes = AsyncMock()
            all_changes.Next.side_effect = self._fetch_next_delta
            mock_all_watcher.return_value = all_changes

            self.loop.run_until_complete(
                JujuModelWatcher.ensure_units_idle(
                    model=self.model, application=self.application
                )
            )

            with self.assertRaises(EOFError, msg="Not all events consumed"):
                change = self._fetch_next_delta()
                # Only reached when an event was left over; print it to help
                # diagnose the failure.
                print(change.deltas[0].deltas)

    def _slow_changes(self):
        """Block for 0.1 s, then return one fixed "unit change" delta."""
        sleep(0.1)
        return AllWatcherNextResults(
            deltas=[
                json.loads(
                    """["unit","change",
                {
                    "name": "app-vnf-7a49ace2b6-z0/2",
                    "application": "app-vnf-7a49ace2b6-z0",
                    "workload-status": {
                        "current": "active",
                        "message": "",
                        "since": "2022-04-26T18:50:27.579802723Z"},
                    "agent-status": {
                        "current": "idle",
                        "message": "",
                        "since": "2022-04-26T18:50:28.592142816Z"}
                }]"""
                ),
            ]
        )

    def test_timeout(self, mock_all_watcher):
        """wait_for_units_idle() raises TimeoutError with a 0.01 s timeout."""
        self.application.units = [
            self._make_unit("app-vnf-7a49ace2b6-z0/0"),
        ]
        all_changes = AsyncMock()
        all_changes.Next.side_effect = self._slow_changes
        mock_all_watcher.return_value = all_changes

        with self.assertRaises(TimeoutError):
            self.loop.run_until_complete(
                JujuModelWatcher.wait_for_units_idle(
                    model=self.model, application=self.application, timeout=0.01
                )
            )

    def test_machine_unit_upgrade(self, mock_all_watcher):
        """Replay upgrade-machine.log for an application with three units."""
        self.application.units = [
            self._make_unit("app-vnf-7a49ace2b6-z0/0"),
            self._make_unit("app-vnf-7a49ace2b6-z0/1"),
            self._make_unit("app-vnf-7a49ace2b6-z0/2"),
        ]
        self._ensure_state("upgrade-machine.log", mock_all_watcher)

    def test_operator_upgrade(self, mock_all_watcher):
        """Replay upgrade-operator.log for the single unit sshproxy/0."""
        self.application.units = [
            self._make_unit("sshproxy/0"),
        ]
        self._ensure_state("upgrade-operator.log", mock_all_watcher)

    def test_podspec_stateful_upgrade(self, mock_all_watcher):
        """Replay upgrade-podspec-stateful.log for the single unit mongodb/0."""
        self.application.units = [
            self._make_unit("mongodb/0"),
        ]
        self._ensure_state("upgrade-podspec-stateful.log", mock_all_watcher)

    def test_podspec_stateless_upgrade(self, mock_all_watcher):
        """Replay upgrade-podspec-stateless.log for the single unit lcm/9."""
        self.application.units = [
            self._make_unit("lcm/9"),
        ]
        self._ensure_state("upgrade-podspec-stateless.log", mock_all_watcher)

    def test_sidecar_upgrade(self, mock_all_watcher):
        """Replay upgrade-sidecar.log for the single unit kafka/0."""
        self.application.units = [
            self._make_unit("kafka/0"),
        ]
        self._ensure_state("upgrade-sidecar.log", mock_all_watcher)
class StatusTest(TestCase):
    """Tests for the module-level status() helper."""

    def setUp(self):
        """Build a Model whose connector is replaced by a MagicMock."""
        model = Model()
        model._connector = mock.MagicMock()
        self.model = model

    @mock.patch("n2vc.juju_watcher.derive_status")
    def test_invalid_entity(self, mock_derive_status):
        """status() calls derive_status exactly once and returns a string."""

        class FakeUnit:
            """Minimal stand-in exposing only a workload_status property."""

            workload_status = property(lambda self: "active")

        mock_derive_status.return_value = "active"
        application = mock.MagicMock()
        application.units = [FakeUnit()]

        result = status(application)

        mock_derive_status.assert_called_once()
        self.assertIsInstance(result, str)
@asynctest.mock.patch("asyncio.sleep")
class WaitForModelTest(asynctest.TestCase):
    """Tests for JujuModelWatcher.wait_for_model with asyncio.sleep patched."""

    @asynctest.mock.patch("juju.client.connector.Connector.connect")
    def setUp(self, mock_connect=None):
        """Create a dedicated event loop and a Model for each test."""
        self.loop = asyncio.new_event_loop()
        self.model = Model()

    @asynctest.mock.patch("juju.model.Model.block_until")
    def test_wait_for_model(self, mock_block_until, mock_sleep):
        """wait_for_model() must call Model.block_until."""
        waiter = JujuModelWatcher.wait_for_model(self.model, timeout=None)
        self.loop.run_until_complete(waiter)
        mock_block_until.assert_called()

    @asynctest.mock.patch("asyncio.ensure_future")
    @asynctest.mock.patch("asyncio.wait")
    def test_wait_for_model_exception(self, mock_wait, mock_ensure_future, mock_sleep):
        """If asyncio.wait raises, the task from ensure_future is cancelled."""
        pending_task = Mock()
        mock_ensure_future.return_value = pending_task
        mock_wait.side_effect = Exception

        with self.assertRaises(Exception):
            self.loop.run_until_complete(
                JujuModelWatcher.wait_for_model(self.model, timeout=None)
            )

        pending_task.cancel.assert_called()