blob: 68130fcfb7bbe90ad93acab13233908952713885 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: legal@canonical.com
#
# To get in touch with the maintainers, please contact:
# osm-charmers@lists.launchpad.net
##
from typing import NoReturn
import unittest
from ops.model import BlockedStatus
from ops.testing import Harness
from charm import PolCharm
class TestCharm(unittest.TestCase):
    """POL Charm unit tests.

    Every test drives a fresh ``Harness(PolCharm)`` (built in ``setUp``)
    through the ``start`` event and the ``kafka`` / ``mongodb`` relations,
    then checks the stored relation data, the unit status and, once both
    relations are present, the generated pod spec.
    """

    def setUp(self) -> None:
        """Create the harness, make this unit the leader and begin."""
        self.harness = Harness(PolCharm)
        self.harness.set_leader(is_leader=True)
        self.harness.begin()

    def _assert_blocked_waiting_for(self, missing: tuple, satisfied: tuple = ()) -> None:
        """Assert the unit is blocked with a "Waiting for ..." message.

        Args:
            missing: relation names that must appear in the status message.
            satisfied: relation names that must NOT appear in the message.

        The message must end with " relations" when more than one relation
        is missing and with " relation" otherwise.
        """
        status = self.harness.charm.unit.status

        # Verifying status
        self.assertIsInstance(status, BlockedStatus)

        # Verifying status message
        message = status.message
        self.assertGreater(len(message), 0)
        self.assertTrue(message.startswith("Waiting for "))
        for relation_name in missing:
            self.assertIn(relation_name, message)
        for relation_name in satisfied:
            self.assertNotIn(relation_name, message)
        expected_suffix = " relations" if len(missing) > 1 else " relation"
        self.assertTrue(message.endswith(expected_suffix))

    def test_on_start_without_relations(self) -> None:
        """Test installation without any relation."""
        self.harness.charm.on.start.emit()

        self._assert_blocked_waiting_for(missing=("kafka", "mongodb"))

    def test_on_start_with_relations(self) -> None:
        """Test the pod spec produced once kafka and mongodb are related."""
        expected_result = {
            "version": 3,
            "containers": [
                {
                    "name": "pol",
                    "imageDetails": self.harness.charm.image.fetch(),
                    "imagePullPolicy": "Always",
                    "ports": [
                        {
                            "name": "pol",
                            "containerPort": 80,
                            "protocol": "TCP",
                        }
                    ],
                    "envConfig": {
                        "ALLOW_ANONYMOUS_LOGIN": "yes",
                        "OSMPOL_GLOBAL_LOGLEVEL": "INFO",
                        "OSMPOL_MESSAGE_HOST": "kafka",
                        "OSMPOL_MESSAGE_DRIVER": "kafka",
                        "OSMPOL_MESSAGE_PORT": 9092,
                        "OSMPOL_DATABASE_DRIVER": "mongo",
                        "OSMPOL_DATABASE_URI": "mongodb://mongo:27017",
                    },
                }
            ],
            "kubernetesResources": {"ingressResources": []},
        }

        self.harness.charm.on.start.emit()

        # Kafka data must still be unset before the relation exists
        self.assertIsNone(self.harness.charm.state.message_host)
        self.assertIsNone(self.harness.charm.state.message_port)

        # MongoDB data must still be unset before the relation exists
        self.assertIsNone(self.harness.charm.state.database_uri)

        # Initializing the kafka relation
        kafka_relation_id = self.harness.add_relation("kafka", "kafka")
        self.harness.add_relation_unit(kafka_relation_id, "kafka/0")
        self.harness.update_relation_data(
            kafka_relation_id, "kafka/0", {"host": "kafka", "port": 9092}
        )

        # Initializing the mongo relation
        mongodb_relation_id = self.harness.add_relation("mongodb", "mongodb")
        self.harness.add_relation_unit(mongodb_relation_id, "mongodb/0")
        self.harness.update_relation_data(
            mongodb_relation_id,
            "mongodb/0",
            {"connection_string": "mongodb://mongo:27017"},
        )

        # Checking if kafka data is stored
        self.assertEqual(self.harness.charm.state.message_host, "kafka")
        self.assertEqual(self.harness.charm.state.message_port, 9092)

        # Checking if mongodb data is stored
        self.assertEqual(
            self.harness.charm.state.database_uri, "mongodb://mongo:27017"
        )

        # Verifying status
        self.assertNotIsInstance(self.harness.charm.unit.status, BlockedStatus)

        pod_spec, _ = self.harness.get_pod_spec()
        self.assertDictEqual(expected_result, pod_spec)

    def test_on_kafka_app_relation_changed(self) -> None:
        """Test that kafka data set on the remote application is stored."""
        self.harness.charm.on.start.emit()

        self.assertIsNone(self.harness.charm.state.message_host)
        self.assertIsNone(self.harness.charm.state.message_port)

        relation_id = self.harness.add_relation("kafka", "kafka")
        self.harness.add_relation_unit(relation_id, "kafka/0")
        # Data is written under the application name "kafka"
        self.harness.update_relation_data(
            relation_id, "kafka", {"host": "kafka", "port": 9092}
        )

        self.assertEqual(self.harness.charm.state.message_host, "kafka")
        self.assertEqual(self.harness.charm.state.message_port, 9092)

        # Only mongodb should still be reported as missing
        self._assert_blocked_waiting_for(missing=("mongodb",), satisfied=("kafka",))

    def test_on_kafka_unit_relation_changed(self) -> None:
        """Test that kafka data set on the remote unit is stored."""
        self.harness.charm.on.start.emit()

        self.assertIsNone(self.harness.charm.state.message_host)
        self.assertIsNone(self.harness.charm.state.message_port)

        relation_id = self.harness.add_relation("kafka", "kafka")
        self.harness.add_relation_unit(relation_id, "kafka/0")
        # Data is written under the unit name "kafka/0"
        self.harness.update_relation_data(
            relation_id, "kafka/0", {"host": "kafka", "port": 9092}
        )

        self.assertEqual(self.harness.charm.state.message_host, "kafka")
        self.assertEqual(self.harness.charm.state.message_port, 9092)

        # Only mongodb should still be reported as missing
        self._assert_blocked_waiting_for(missing=("mongodb",), satisfied=("kafka",))

    def test_on_mongodb_app_relation_changed(self) -> None:
        """Test that mongodb data set on the remote application is stored."""
        self.harness.charm.on.start.emit()

        self.assertIsNone(self.harness.charm.state.database_uri)

        relation_id = self.harness.add_relation("mongodb", "mongodb")
        self.harness.add_relation_unit(relation_id, "mongodb/0")
        # Data is written under the application name "mongodb"
        self.harness.update_relation_data(
            relation_id, "mongodb", {"connection_string": "mongodb://mongo:27017"}
        )

        self.assertEqual(
            self.harness.charm.state.database_uri, "mongodb://mongo:27017"
        )

        # Only kafka should still be reported as missing
        self._assert_blocked_waiting_for(missing=("kafka",), satisfied=("mongodb",))

    def test_on_mongodb_unit_relation_changed(self) -> None:
        """Test that mongodb data set on the remote unit is stored."""
        self.harness.charm.on.start.emit()

        self.assertIsNone(self.harness.charm.state.database_uri)

        relation_id = self.harness.add_relation("mongodb", "mongodb")
        self.harness.add_relation_unit(relation_id, "mongodb/0")
        # Data is written under the unit name "mongodb/0"
        self.harness.update_relation_data(
            relation_id, "mongodb/0", {"connection_string": "mongodb://mongo:27017"}
        )

        self.assertEqual(
            self.harness.charm.state.database_uri, "mongodb://mongo:27017"
        )

        # Only kafka should still be reported as missing
        self._assert_blocked_waiting_for(missing=("kafka",), satisfied=("mongodb",))
# Run the test suite through unittest's command-line runner when this file
# is executed directly rather than imported.
if __name__ == "__main__":
    unittest.main()