6 from kafka
import KafkaProducer
, KafkaConsumer
7 from kafka
.errors
import KafkaError
# Module-level logger, named after this module so its records can be
# filtered per test file.
log = logging.getLogger(__name__)
class ScalingConfigTest(unittest.TestCase):
    """Integration test that publishes a scaling configuration over Kafka.

    The broker address is built from the ``KAFKA_SERVER_HOST`` and
    ``KAFKA_SERVER_PORT`` environment variables, defaulting to
    ``localhost:9092``.  Both the producer and the consumer connect to that
    same address.
    """

    def setUp(self):
        """Create the Kafka producer and consumer and subscribe to ``lcm_pm``.

        The test is skipped with 'Kafka server not present.' when the clients
        cannot be set up.
        """
        kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"),
                                      os.getenv("KAFKA_SERVER_PORT", "9092"))
        # NOTE(review): the try/except KafkaError framing is inferred from the
        # KafkaError import and the skipTest call -- confirm against the file.
        try:
            self.producer = KafkaProducer(bootstrap_servers=kafka_server,
                                          key_serializer=str.encode,
                                          value_serializer=str.encode)
            # Registered immediately so the producer is released even if the
            # consumer below fails and the test ends up skipped.
            self.addCleanup(self.producer.close)

            # Use the configured broker rather than a hard-coded
            # 'localhost:9092', so producer and consumer always talk to the
            # same server.
            # NOTE(review): any further KafkaConsumer keyword arguments were
            # not visible in the reviewed excerpt -- restore them if present.
            self.consumer = KafkaConsumer(bootstrap_servers=kafka_server)
            self.addCleanup(self.consumer.close)
            self.consumer.subscribe(['lcm_pm'])
        except KafkaError:
            self.skipTest('Kafka server not present.')

    def test_send_scaling_config_msg(self):
        """Send the example scaling configuration to ``lcm_pm`` and await the result.

        The payload is loaded from
        ``../examples/configure_scaling_full_example.json`` (relative to this
        file), serialized to JSON and sent with the key ``configure_scaling``.
        The send result is awaited for up to 60 seconds.
        """
        example_path = os.path.join(os.path.dirname(__file__),
                                    '../examples/configure_scaling_full_example.json')
        try:
            # Keep the file open only while parsing it, not during the
            # network round-trip.
            with open(example_path) as file:
                payload = json.load(file)

            future = self.producer.send('lcm_pm', json.dumps(payload),
                                        key="configure_scaling")
            result = future.get(timeout=60)
            log.info('Result: %s', result)

            # TODO: Improve assertions
            self.assertIsNotNone(result)
        except Exception as e:
            # NOTE(review): the handler body was not visible in the reviewed
            # excerpt; reporting the error as a test failure is assumed.
            self.fail(e)
42 if __name__
== '__main__':