6 from kafka
import KafkaProducer
8 log
= logging
.getLogger(__name__
)
11 # logging.basicConfig(stream=sys.stdout,
12 # format='%(asctime)s %(message)s',
13 # datefmt='%m/%d/%Y %I:%M:%S %p',
14 # level=logging.DEBUG)
16 class ScalingConfigTest(unittest
.TestCase
):
17 def test_send_scaling_config_msg(self
):
20 os
.path
.join(os
.path
.dirname(__file__
), '../examples/configure_scaling_full_example.json')) as file:
21 payload
= json
.load(file)
22 kafka_server
= '{}:{}'.format(os
.getenv("KAFKA_SERVER_HOST", "localhost"),
23 os
.getenv("KAFKA_SERVER_PORT", "9092"))
24 producer
= KafkaProducer(bootstrap_servers
=kafka_server
,
25 key_serializer
=str.encode
,
26 value_serializer
=str.encode
)
27 future
= producer
.send('lcm_pm', json
.dumps(payload
), key
="configure_scaling")
28 result
= future
.get(timeout
=60)
29 log
.info('Result: %s', result
)
32 self
.assertIsNotNone(result
)
33 except Exception as e
:
37 if __name__
== '__main__':