Fixes bugs for integration with MON module
[osm/MON.git] / policy_module / osm_policy_module / tests / integration / test_scaling_config_kafka_msg.py
1 import json
2 import logging
3 import os
4 import unittest
5
6 from kafka import KafkaProducer, KafkaConsumer
7 from kafka.errors import KafkaError
8
9 log = logging.getLogger(__name__)
10
11
class ScalingConfigTest(unittest.TestCase):
    """Integration test that publishes a scaling configuration message to Kafka.

    The broker address is built from the ``KAFKA_SERVER_HOST`` and
    ``KAFKA_SERVER_PORT`` environment variables, defaulting to
    ``localhost:9092``. The test is skipped when creating the Kafka clients
    raises ``KafkaError``.
    """

    def setUp(self):
        """Create the Kafka producer and consumer, skipping the test on KafkaError."""
        kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"),
                                      os.getenv("KAFKA_SERVER_PORT", "9092"))
        try:
            self.producer = KafkaProducer(bootstrap_servers=kafka_server,
                                          key_serializer=str.encode,
                                          value_serializer=str.encode)
            # Register cleanup right away: cleanups still run when setUp is
            # interrupted, so the producer is closed even if creating the
            # consumer below fails.
            self.addCleanup(self.producer.close)
            # Use the same broker address as the producer instead of a
            # hard-coded 'localhost:9092', so both clients honour the
            # environment configuration.
            self.consumer = KafkaConsumer(bootstrap_servers=kafka_server,
                                          group_id='osm_mon')
            self.addCleanup(self.consumer.close)
            self.consumer.subscribe(['lcm_pm'])
        except KafkaError:
            self.skipTest('Kafka server not present.')

    def test_send_scaling_config_msg(self):
        """Send the example ``configure_scaling`` payload to ``lcm_pm`` and expect a result."""
        example_path = os.path.join(os.path.dirname(__file__),
                                    '../examples/configure_scaling_full_example.json')
        try:
            with open(example_path) as example_file:
                payload = json.load(example_file)
            future = self.producer.send('lcm_pm', json.dumps(payload), key="configure_scaling")
            # Block until the send completes (or 60 seconds elapse).
            result = future.get(timeout=60)
            log.info('Result: %s', result)

            self.producer.flush()
        except Exception as e:
            # Report any error while loading or sending as a test failure.
            self.fail(e)
        # TODO: Improve assertions
        self.assertIsNotNone(result)
40
41
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()