self.harness.begin()
self.config = {
"log_level": "INFO",
+ "mongodb_uri": "",
}
self.harness.update_config(self.config)
# Assertions
self.assertIsInstance(self.harness.charm.unit.status, ActiveStatus)
+ def test_with_relations_and_mongodb_config(
+ self,
+ ) -> NoReturn:
+ "Test with relations and mongodb config (internal)"
+ self.initialize_mysql_relation()
+ self.initialize_kafka_relation()
+ self.initialize_mongo_config()
+ # Verifying status
+ self.assertNotIsInstance(self.harness.charm.unit.status, BlockedStatus)
+
def test_with_relations(
self,
) -> NoReturn:
"Test with relations (internal)"
self.initialize_kafka_relation()
self.initialize_mongo_relation()
+ self.initialize_mysql_relation()
+ # Verifying status
+ self.assertNotIsInstance(self.harness.charm.unit.status, BlockedStatus)
+
+ def test_exception_mongodb_relation_and_config(
+ self,
+ ) -> NoReturn:
+ "Test with relation and config for Mongodb. Must fail"
+ self.initialize_mongo_relation()
+ self.initialize_mongo_config()
+ # Verifying status
+ self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)
+
+ def test_mysql_config_success(self):
+ self.initialize_kafka_relation()
+ self.initialize_mongo_relation()
+ self.initialize_mysql_config()
# Verifying status
self.assertNotIsInstance(self.harness.charm.unit.status, BlockedStatus)
+ def test_mysql_config_wrong_value(self):
+ self.initialize_kafka_relation()
+ self.initialize_mongo_relation()
+ self.initialize_mysql_config(uri="wrong_uri")
+ # Verifying status
+ self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)
+ self.assertIn(
+ "mysql_uri is not properly formed",
+ self.harness.charm.unit.status.message,
+ )
+
+ def test_mysql_config_and_relation(self):
+ self.initialize_mysql_relation()
+ self.initialize_mysql_config()
+ # Verifying status
+ self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)
+ # import pdb; pdb.set_trace()
+ self.assertIn(
+ "Mysql data cannot be provided via config and relation",
+ self.harness.charm.unit.status.message,
+ )
+
def initialize_kafka_relation(self):
kafka_relation_id = self.harness.add_relation("kafka", "kafka")
self.harness.add_relation_unit(kafka_relation_id, "kafka/0")
kafka_relation_id, "kafka/0", {"host": "kafka", "port": 9092}
)
+ def initialize_mongo_config(self):
+ self.harness.update_config({"mongodb_uri": "mongodb://mongo:27017"})
+
def initialize_mongo_relation(self):
mongodb_relation_id = self.harness.add_relation("mongodb", "mongodb")
self.harness.add_relation_unit(mongodb_relation_id, "mongodb/0")
{"connection_string": "mongodb://mongo:27017"},
)
+ def initialize_mysql_config(self, uri=None):
+ self.harness.update_config(
+ {"mysql_uri": uri or "mysql://user:pass@mysql-host:3306/database"}
+ )
+
+ def initialize_mysql_relation(self):
+ mongodb_relation_id = self.harness.add_relation("mysql", "mysql")
+ self.harness.add_relation_unit(mongodb_relation_id, "mysql/0")
+ self.harness.update_relation_data(
+ mongodb_relation_id,
+ "mysql/0",
+ {
+ "user": "user",
+ "password": "pass",
+ "host": "host",
+ "port": "1234",
+ "database": "pol",
+ "root_password": "root_password",
+ },
+ )
+
if __name__ == "__main__":
unittest.main()