Reformat POL to standardized format
[osm/POL.git] / osm_policy_module / tests / integration / test_kafka_messages.py
index 28b2e0f..725cc3f 100644 (file)
@@ -43,8 +43,9 @@ class KafkaMessagesTest(unittest.TestCase):
     def setUp(self):
         super()
         cfg = Config()
-        self.kafka_server = '{}:{}'.format(cfg.get('message', 'host'),
-                                           cfg.get('message', 'port'))
+        self.kafka_server = "{}:{}".format(
+            cfg.get("message", "host"), cfg.get("message", "port")
+        )
         self.loop = asyncio.new_event_loop()
 
     def tearDown(self):
@@ -52,30 +53,38 @@ class KafkaMessagesTest(unittest.TestCase):
 
     def test_send_instantiated_msg(self):
         async def test_send_instantiated_msg():
-            producer = AIOKafkaProducer(loop=self.loop,
-                                        bootstrap_servers=self.kafka_server,
-                                        key_serializer=str.encode,
-                                        value_serializer=str.encode)
+            producer = AIOKafkaProducer(
+                loop=self.loop,
+                bootstrap_servers=self.kafka_server,
+                key_serializer=str.encode,
+                value_serializer=str.encode,
+            )
             await producer.start()
             consumer = AIOKafkaConsumer(
                 "ns",
                 loop=self.loop,
                 bootstrap_servers=self.kafka_server,
                 consumer_timeout_ms=10000,
-                auto_offset_reset='earliest',
+                auto_offset_reset="earliest",
                 value_deserializer=bytes.decode,
-                key_deserializer=bytes.decode)
+                key_deserializer=bytes.decode,
+            )
             await consumer.start()
             try:
                 with open(
-                        os.path.join(os.path.dirname(__file__), '../examples/instantiated.json')) as file:
+                    os.path.join(
+                        os.path.dirname(__file__), "../examples/instantiated.json"
+                    )
+                ) as file:
                     payload = json.load(file)
-                    await producer.send_and_wait("ns", key="instantiated", value=json.dumps(payload))
+                    await producer.send_and_wait(
+                        "ns", key="instantiated", value=json.dumps(payload)
+                    )
             finally:
                 await producer.stop()
             try:
                 async for message in consumer:
-                    if message.key == 'instantiated':
+                    if message.key == "instantiated":
                         self.assertIsNotNone(message.value)
                         return
             finally:
@@ -84,8 +93,8 @@ class KafkaMessagesTest(unittest.TestCase):
         try:
             self.loop.run_until_complete(test_send_instantiated_msg())
         except KafkaError:
-            self.skipTest('Kafka server not present.')
+            self.skipTest("Kafka server not present.")
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     unittest.main()