Args:
event (EventBase): Kafka relation event.
"""
- data_loc = event.unit if event.unit else event.app
-
- message_host = event.relation.data[data_loc].get("host")
- message_port = event.relation.data[data_loc].get("port")
+ message_host = event.relation.data[event.unit].get("host")
+ message_port = event.relation.data[event.unit].get("port")
if (
message_host
Args:
event (EventBase): DB relation event.
"""
- data_loc = event.unit if event.unit else event.app
-
- database_uri = event.relation.data[data_loc].get("connection_string")
+ database_uri = event.relation.data[event.unit].get("connection_string")
if database_uri and self.state.database_uri != database_uri:
self.state.database_uri = database_uri
Args:
event (EventBase): Keystone relation event.
"""
- data_loc = event.unit if event.unit else event.app
-
- keystone_host = event.relation.data[data_loc].get("host")
- keystone_port = event.relation.data[data_loc].get("port")
- keystone_user_domain_name = event.relation.data[data_loc].get(
+ keystone_host = event.relation.data[event.unit].get("host")
+ keystone_port = event.relation.data[event.unit].get("port")
+ keystone_user_domain_name = event.relation.data[event.unit].get(
"user_domain_name"
)
- keystone_project_domain_name = event.relation.data[data_loc].get(
+ keystone_project_domain_name = event.relation.data[event.unit].get(
"project_domain_name"
)
- keystone_username = event.relation.data[data_loc].get("username")
- keystone_password = event.relation.data[data_loc].get("password")
- keystone_service = event.relation.data[data_loc].get("service")
+ keystone_username = event.relation.data[event.unit].get("username")
+ keystone_password = event.relation.data[event.unit].get("password")
+ keystone_service = event.relation.data[event.unit].get("service")
if (
keystone_host
Args:
event (EventBase): Prometheus relation event.
"""
- data_loc = event.unit if event.unit else event.app
-
- prometheus_host = event.relation.data[data_loc].get("hostname")
- prometheus_port = event.relation.data[data_loc].get("port")
+ prometheus_host = event.relation.data[event.unit].get("hostname")
+ prometheus_port = event.relation.data[event.unit].get("port")
if (
prometheus_host
self.assertNotIn("keystone", self.harness.charm.unit.status.message)
self.assertTrue(self.harness.charm.unit.status.message.endswith(" relations"))
- def test_on_kafka_app_relation_changed(self) -> NoReturn:
- """Test to see if kafka relation is updated."""
- self.harness.charm.on.start.emit()
-
- self.assertIsNone(self.harness.charm.state.message_host)
- self.assertIsNone(self.harness.charm.state.message_port)
-
- relation_id = self.harness.add_relation("kafka", "kafka")
- self.harness.add_relation_unit(relation_id, "kafka/0")
- self.harness.update_relation_data(
- relation_id, "kafka", {"host": "kafka", "port": 9092}
- )
-
- self.assertEqual(self.harness.charm.state.message_host, "kafka")
- self.assertEqual(self.harness.charm.state.message_port, 9092)
-
- # Verifying status
- self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)
-
- # Verifying status message
- self.assertGreater(len(self.harness.charm.unit.status.message), 0)
- self.assertTrue(
- self.harness.charm.unit.status.message.startswith("Waiting for ")
- )
- self.assertNotIn("kafka", self.harness.charm.unit.status.message)
- self.assertIn("mongodb", self.harness.charm.unit.status.message)
- self.assertIn("prometheus", self.harness.charm.unit.status.message)
- self.assertNotIn("keystone", self.harness.charm.unit.status.message)
- self.assertTrue(self.harness.charm.unit.status.message.endswith(" relations"))
-
def test_on_mongodb_unit_relation_changed(self) -> NoReturn:
"""Test to see if mongodb relation is updated."""
self.harness.charm.on.start.emit()
self.assertNotIn("keystone", self.harness.charm.unit.status.message)
self.assertTrue(self.harness.charm.unit.status.message.endswith(" relations"))
- def test_on_mongodb_app_relation_changed(self) -> NoReturn:
- """Test to see if mongodb relation is updated."""
- self.harness.charm.on.start.emit()
-
- self.assertIsNone(self.harness.charm.state.database_uri)
-
- relation_id = self.harness.add_relation("mongodb", "mongodb")
- self.harness.add_relation_unit(relation_id, "mongodb/0")
- self.harness.update_relation_data(
- relation_id, "mongodb", {"connection_string": "mongodb://mongo:27017"}
- )
-
- self.assertEqual(self.harness.charm.state.database_uri, "mongodb://mongo:27017")
-
- # Verifying status
- self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)
-
- # Verifying status message
- self.assertGreater(len(self.harness.charm.unit.status.message), 0)
- self.assertTrue(
- self.harness.charm.unit.status.message.startswith("Waiting for ")
- )
- self.assertIn("kafka", self.harness.charm.unit.status.message)
- self.assertNotIn("mongodb", self.harness.charm.unit.status.message)
- self.assertIn("prometheus", self.harness.charm.unit.status.message)
- self.assertNotIn("keystone", self.harness.charm.unit.status.message)
- self.assertTrue(self.harness.charm.unit.status.message.endswith(" relations"))
-
def test_on_prometheus_unit_relation_changed(self) -> NoReturn:
"""Test to see if prometheus relation is updated."""
self.harness.charm.on.start.emit()
self.assertNotIn("keystone", self.harness.charm.unit.status.message)
self.assertTrue(self.harness.charm.unit.status.message.endswith(" relations"))
- def test_on_prometheus_app_relation_changed(self) -> NoReturn:
- """Test to see if prometheus relation is updated."""
- self.harness.charm.on.start.emit()
-
- self.assertIsNone(self.harness.charm.state.prometheus_host)
- self.assertIsNone(self.harness.charm.state.prometheus_port)
-
- relation_id = self.harness.add_relation("prometheus", "prometheus")
- self.harness.add_relation_unit(relation_id, "prometheus/0")
- self.harness.update_relation_data(
- relation_id, "prometheus", {"hostname": "prometheus", "port": 9090}
- )
-
- self.assertEqual(self.harness.charm.state.prometheus_host, "prometheus")
- self.assertEqual(self.harness.charm.state.prometheus_port, 9090)
-
- # Verifying status
- self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)
-
- # Verifying status message
- self.assertGreater(len(self.harness.charm.unit.status.message), 0)
- self.assertTrue(
- self.harness.charm.unit.status.message.startswith("Waiting for ")
- )
- self.assertIn("kafka", self.harness.charm.unit.status.message)
- self.assertIn("mongodb", self.harness.charm.unit.status.message)
- self.assertNotIn("prometheus", self.harness.charm.unit.status.message)
- self.assertNotIn("keystone", self.harness.charm.unit.status.message)
- self.assertTrue(self.harness.charm.unit.status.message.endswith(" relations"))
-
def test_on_keystone_unit_relation_changed(self) -> NoReturn:
"""Test to see if keystone relation is updated."""
self.harness.update_config({"auth_backend": "keystone"})
self.assertNotIn("keystone", self.harness.charm.unit.status.message)
self.assertTrue(self.harness.charm.unit.status.message.endswith(" relations"))
- def test_on_keystone_app_relation_changed(self) -> NoReturn:
- """Test to see if keystone relation is updated."""
- self.harness.update_config({"auth_backend": "keystone"})
-
- self.harness.charm.on.start.emit()
-
- self.assertIsNone(self.harness.charm.state.keystone_host)
- self.assertIsNone(self.harness.charm.state.keystone_port)
- self.assertIsNone(self.harness.charm.state.keystone_user_domain_name)
- self.assertIsNone(self.harness.charm.state.keystone_project_domain_name)
- self.assertIsNone(self.harness.charm.state.keystone_username)
- self.assertIsNone(self.harness.charm.state.keystone_password)
- self.assertIsNone(self.harness.charm.state.keystone_service)
-
- relation_id = self.harness.add_relation("keystone", "keystone")
- self.harness.add_relation_unit(relation_id, "keystone/0")
- self.harness.update_relation_data(
- relation_id,
- "keystone",
- {
- "host": "keystone",
- "port": 5000,
- "user_domain_name": "default",
- "project_domain_name": "default",
- "username": "nbi",
- "password": "nbi",
- "service": "service",
- },
- )
-
- self.assertEqual(self.harness.charm.state.keystone_host, "keystone")
- self.assertEqual(self.harness.charm.state.keystone_port, 5000)
- self.assertEqual(self.harness.charm.state.keystone_user_domain_name, "default")
- self.assertEqual(
- self.harness.charm.state.keystone_project_domain_name, "default"
- )
- self.assertEqual(self.harness.charm.state.keystone_username, "nbi")
- self.assertEqual(self.harness.charm.state.keystone_password, "nbi")
- self.assertEqual(self.harness.charm.state.keystone_service, "service")
-
- # Verifying status
- self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)
-
- # Verifying status message
- self.assertGreater(len(self.harness.charm.unit.status.message), 0)
- self.assertTrue(
- self.harness.charm.unit.status.message.startswith("Waiting for ")
- )
- self.assertIn("kafka", self.harness.charm.unit.status.message)
- self.assertIn("mongodb", self.harness.charm.unit.status.message)
- self.assertIn("prometheus", self.harness.charm.unit.status.message)
- self.assertNotIn("keystone", self.harness.charm.unit.status.message)
- self.assertTrue(self.harness.charm.unit.status.message.endswith(" relations"))
-
def test_publish_nbi_info(self) -> NoReturn:
"""Test to see if nbi relation is updated."""
expected_result = {
relation_id = self.harness.add_relation("nbi", "ng-ui")
self.harness.add_relation_unit(relation_id, "ng-ui/0")
- relation_data = self.harness.get_relation_data(relation_id, "nbi")
+ relation_data = self.harness.get_relation_data(relation_id, "nbi/0")
self.assertDictEqual(expected_result, relation_data)