X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_policy_module%2Ftests%2Fintegration%2Ftest_kafka_messages.py;h=725cc3fcc2ba8e79bb68fe9fc06c48fa9811ef38;hb=c72b9d5f574d51608e4810294004414c7a9c02fe;hp=29d88e2db36989d3c5e9b7603714c6da6f75f888;hpb=16256cbbf6cdfde8debc3254bf55ce0b8fa51b08;p=osm%2FPOL.git diff --git a/osm_policy_module/tests/integration/test_kafka_messages.py b/osm_policy_module/tests/integration/test_kafka_messages.py index 29d88e2..725cc3f 100644 --- a/osm_policy_module/tests/integration/test_kafka_messages.py +++ b/osm_policy_module/tests/integration/test_kafka_messages.py @@ -42,41 +42,49 @@ log.addHandler(stream_handler) class KafkaMessagesTest(unittest.TestCase): def setUp(self): super() - cfg = Config.instance() - self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST, - cfg.OSMPOL_MESSAGE_PORT) + cfg = Config() + self.kafka_server = "{}:{}".format( + cfg.get("message", "host"), cfg.get("message", "port") + ) self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(None) def tearDown(self): super() def test_send_instantiated_msg(self): async def test_send_instantiated_msg(): - producer = AIOKafkaProducer(loop=self.loop, - bootstrap_servers=self.kafka_server, - key_serializer=str.encode, - value_serializer=str.encode) + producer = AIOKafkaProducer( + loop=self.loop, + bootstrap_servers=self.kafka_server, + key_serializer=str.encode, + value_serializer=str.encode, + ) await producer.start() consumer = AIOKafkaConsumer( "ns", loop=self.loop, bootstrap_servers=self.kafka_server, consumer_timeout_ms=10000, - auto_offset_reset='earliest', + auto_offset_reset="earliest", value_deserializer=bytes.decode, - key_deserializer=bytes.decode) + key_deserializer=bytes.decode, + ) await consumer.start() try: with open( - os.path.join(os.path.dirname(__file__), '../examples/instantiated.json')) as file: + os.path.join( + os.path.dirname(__file__), "../examples/instantiated.json" + ) + ) as file: payload = json.load(file) - await producer.send_and_wait("ns", key="instantiated", value=json.dumps(payload)) + await producer.send_and_wait( + "ns", key="instantiated", value=json.dumps(payload) + ) finally: await producer.stop() try: async for message in consumer: - if message.key == 'instantiated': + if message.key == "instantiated": self.assertIsNotNone(message.value) return finally: @@ -85,8 +93,8 @@ class KafkaMessagesTest(unittest.TestCase): try: self.loop.run_until_complete(test_send_instantiated_msg()) except KafkaError: - self.skipTest('Kafka server not present.') + self.skipTest("Kafka server not present.") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()