blob: debc378b491380f0e1e7cd3be15b2bfd65b8b205 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: legal@canonical.com
#
# To get in touch with the maintainers, please contact:
# osm-charmers@lists.launchpad.net
##
import sys
from typing import NoReturn
import unittest
from charm import PlaCharm
from ops.model import ActiveStatus, BlockedStatus
from ops.testing import Harness
class TestCharm(unittest.TestCase):
"""Pla Charm unit tests."""
def setUp(self) -> NoReturn:
"""Test setup"""
self.image_info = sys.modules["oci_image"].OCIImageResource().fetch()
self.harness = Harness(PlaCharm)
self.harness.set_leader(is_leader=True)
self.harness.begin()
self.config = {
"log_level": "INFO",
"mongodb_uri": "",
}
self.harness.update_config(self.config)
def test_config_changed_no_relations(
self,
) -> NoReturn:
"""Test ingress resources without HTTP."""
self.harness.charm.on.config_changed.emit()
# Assertions
self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)
self.assertTrue(
all(
relation in self.harness.charm.unit.status.message
for relation in ["mongodb", "kafka"]
)
)
def test_config_changed_non_leader(
self,
) -> NoReturn:
"""Test ingress resources without HTTP."""
self.harness.set_leader(is_leader=False)
self.harness.charm.on.config_changed.emit()
# Assertions
self.assertIsInstance(self.harness.charm.unit.status, ActiveStatus)
def test_with_relations_and_mongodb_config(
self,
) -> NoReturn:
"Test with relations and mongodb config (internal)"
self.initialize_kafka_relation()
self.initialize_mongo_config()
# Verifying status
self.assertNotIsInstance(self.harness.charm.unit.status, BlockedStatus)
def test_with_relations(
self,
) -> NoReturn:
"Test with relations (internal)"
self.initialize_kafka_relation()
self.initialize_mongo_relation()
# Verifying status
self.assertNotIsInstance(self.harness.charm.unit.status, BlockedStatus)
def test_exception_mongodb_relation_and_config(
self,
) -> NoReturn:
"Test with relation and config for Mongodb. Test must fail"
self.initialize_mongo_relation()
self.initialize_mongo_config()
# Verifying status
self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)
def initialize_kafka_relation(self):
kafka_relation_id = self.harness.add_relation("kafka", "kafka")
self.harness.add_relation_unit(kafka_relation_id, "kafka/0")
self.harness.update_relation_data(
kafka_relation_id, "kafka/0", {"host": "kafka", "port": 9092}
)
def initialize_mongo_config(self):
self.harness.update_config({"mongodb_uri": "mongodb://mongo:27017"})
def initialize_mongo_relation(self):
mongodb_relation_id = self.harness.add_relation("mongodb", "mongodb")
self.harness.add_relation_unit(mongodb_relation_id, "mongodb/0")
self.harness.update_relation_data(
mongodb_relation_id,
"mongodb/0",
{"connection_string": "mongodb://mongo:27017"},
)
if __name__ == "__main__":
unittest.main()