# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: legal@canonical.com
#
# To get in touch with the maintainers, please contact:
# osm-charmers@lists.launchpad.net
import sys
import unittest
from typing import NoReturn

from charm import PlaCharm
from ops.model import ActiveStatus, BlockedStatus
from ops.testing import Harness
class TestCharm(unittest.TestCase):
    """Pla Charm unit tests."""

    def setUp(self) -> None:
        """Build a leader-unit harness around PlaCharm and apply the base config."""
        # "oci_image" is fetched from sys.modules instead of being imported;
        # presumably a stub is registered there by the test package -- TODO confirm.
        self.image_info = sys.modules["oci_image"].OCIImageResource().fetch()
        self.harness = Harness(PlaCharm)
        self.harness.set_leader(is_leader=True)
        # Every test below dereferences self.harness.charm, which is only
        # instantiated once begin() has been called.
        self.harness.begin()
        # NOTE(review): the original config literal was not recoverable from
        # the damaged source. An empty "mongodb_uri" matches the tests below
        # (initialize_mongo_config is what supplies a URI); "log_level" is an
        # assumption -- confirm both keys against the charm's config.yaml.
        self.config = {
            "log_level": "INFO",
            "mongodb_uri": "",
        }
        self.harness.update_config(self.config)

    def test_config_changed_no_relations(self) -> None:
        """Without relations, config-changed blocks and names both relations."""
        self.harness.charm.on.config_changed.emit()

        status = self.harness.charm.unit.status
        self.assertIsInstance(status, BlockedStatus)
        # One sub-test per relation so a failure reports which name is absent
        # from the status message instead of a bare "False is not true".
        for relation in ("mongodb", "kafka"):
            with self.subTest(relation=relation):
                self.assertIn(relation, status.message)

    def test_config_changed_non_leader(self) -> None:
        """On a non-leader unit, config-changed yields ActiveStatus."""
        self.harness.set_leader(is_leader=False)
        self.harness.charm.on.config_changed.emit()

        self.assertIsInstance(self.harness.charm.unit.status, ActiveStatus)

    def test_with_relations_and_mongodb_config(self) -> None:
        """Kafka relation plus a MongoDB URI from config must not block."""
        self.initialize_kafka_relation()
        self.initialize_mongo_config()

        self.assertNotIsInstance(self.harness.charm.unit.status, BlockedStatus)

    def test_with_relations(self) -> None:
        """Kafka and MongoDB relations together must not block."""
        self.initialize_kafka_relation()
        self.initialize_mongo_relation()

        self.assertNotIsInstance(self.harness.charm.unit.status, BlockedStatus)

    def test_exception_mongodb_relation_and_config(self) -> None:
        """MongoDB given by both relation and config must block the unit."""
        self.initialize_mongo_relation()
        self.initialize_mongo_config()

        self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)

    def initialize_kafka_relation(self):
        """Add a "kafka" relation with unit kafka/0 and publish its host/port."""
        kafka_relation_id = self.harness.add_relation("kafka", "kafka")
        self.harness.add_relation_unit(kafka_relation_id, "kafka/0")
        # Relation data values are strings in Juju, so the port is passed as
        # "9092" rather than the int 9092 used previously.
        self.harness.update_relation_data(
            kafka_relation_id,
            "kafka/0",
            {"host": "kafka", "port": "9092"},
        )

    def initialize_mongo_config(self):
        """Set the "mongodb_uri" config option to a fixed MongoDB URI."""
        self.harness.update_config({"mongodb_uri": "mongodb://mongo:27017"})

    def initialize_mongo_relation(self):
        """Add a "mongodb" relation with unit mongodb/0 and its connection string."""
        mongodb_relation_id = self.harness.add_relation("mongodb", "mongodb")
        self.harness.add_relation_unit(mongodb_relation_id, "mongodb/0")
        # NOTE(review): the first two arguments were missing from the damaged
        # source; restored to mirror add_relation_unit above and the kafka helper.
        self.harness.update_relation_data(
            mongodb_relation_id,
            "mongodb/0",
            {"connection_string": "mongodb://mongo:27017"},
        )
121 if __name__
== "__main__":