Policy Module first commit
[osm/MON.git] / policy_module / osm_policy_module / tests / integration / test_scaling_config_kafka_msg.py
1 import json
2 import logging
3 import os
4 import unittest
5
6 from kafka import KafkaProducer
7
8 log = logging.getLogger(__name__)
9
10
11 # logging.basicConfig(stream=sys.stdout,
12 # format='%(asctime)s %(message)s',
13 # datefmt='%m/%d/%Y %I:%M:%S %p',
14 # level=logging.DEBUG)
15
class ScalingConfigTest(unittest.TestCase):
    """Integration test that publishes a scaling configuration message to Kafka."""

    def test_send_scaling_config_msg(self):
        """Send the example scaling configuration to the ``lcm_pm`` topic.

        The payload is read from ``../examples/configure_scaling_full_example.json``
        (relative to this file) and published with the key ``configure_scaling``.
        The broker address is taken from the ``KAFKA_SERVER_HOST`` and
        ``KAFKA_SERVER_PORT`` environment variables, defaulting to
        ``localhost:9092``.

        The test passes when the send future resolves to a non-None result
        within 60 seconds; any exception raised along the way is reported as
        a test failure.
        """
        try:
            example_path = os.path.join(
                os.path.dirname(__file__),
                '../examples/configure_scaling_full_example.json')
            # Only parsing needs the file handle, so release it before any
            # network activity starts.
            with open(example_path) as file:
                payload = json.load(file)

            kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"),
                                          os.getenv("KAFKA_SERVER_PORT", "9092"))
            producer = KafkaProducer(bootstrap_servers=kafka_server,
                                     key_serializer=str.encode,
                                     value_serializer=str.encode)
            # Close the producer when the test finishes, including when the
            # send below raises or times out, so its connections do not leak
            # into other tests.
            self.addCleanup(producer.close)

            future = producer.send('lcm_pm', json.dumps(payload), key="configure_scaling")
            # Block until the broker acknowledges the record (or 60 s elapse).
            result = future.get(timeout=60)
            log.info('Result: %s', result)

            producer.flush()
            self.assertIsNotNone(result)
        except Exception as e:
            # Report every problem (missing file, broker unreachable, timeout)
            # as a test failure rather than an error.
            self.fail(e)
35
36
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()