581d53f9a07ae8cfbb7ee530a9ea23b2005c7eb5
[osm/POL.git] / osm_policy_module / tests / integration / test_kafka_messages.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24 import json
25 import logging
26 import os
27 import sys
28 import unittest
29
30 from kafka import KafkaProducer, KafkaConsumer
31 from kafka.errors import KafkaError
32
33 from osm_policy_module.core.config import Config
34
# Route INFO-and-above records from the root logger to stdout so that they
# appear alongside the test runner's own output.
log = logging.getLogger()
# Use the documented setLevel() API instead of assigning ``level`` directly:
# it validates the value and lets the logging module keep any state derived
# from the level consistent.
log.setLevel(logging.INFO)
stream_handler = logging.StreamHandler(sys.stdout)
log.addHandler(stream_handler)
39
40
class KafkaMessagesTest(unittest.TestCase):
    """Integration test that publishes a message to Kafka and reads it back.

    A Kafka broker must be reachable at the host/port provided by the policy
    module ``Config``. When the Kafka client raises ``KafkaError`` while the
    fixtures are being created, the test is skipped instead of failing.
    """

    @staticmethod
    def _decode(raw):
        """Decode *raw* bytes to ``str``, passing ``None`` through unchanged.

        Kafka records may carry a null key or value, and the consumer below
        reads the shared 'ns' topic from the earliest offset, so it can see
        records written by other producers. Using ``bytes.decode`` directly
        as the deserializer would raise ``TypeError`` if it were handed
        ``None`` (presumably what the client passes for null fields --
        confirm against the kafka-python version in use).
        """
        return None if raw is None else raw.decode()

    def setUp(self):
        """Create a producer and a consumer subscribed to the 'ns' topic.

        Each client's ``close`` is registered with ``addCleanup`` as soon as
        the client exists. Unlike ``tearDown``, cleanups also run when
        ``setUp`` raises or skips, so a producer created before a failing
        consumer construction is no longer leaked.
        """
        try:
            cfg = Config.instance()
            kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
                                          cfg.OSMPOL_MESSAGE_PORT)
            self.producer = KafkaProducer(bootstrap_servers=kafka_server,
                                          key_serializer=str.encode,
                                          value_serializer=str.encode)
            self.addCleanup(self.producer.close)
            # consumer_timeout_ms bounds how long iterating the consumer
            # blocks without a new message, so the test cannot hang forever.
            self.consumer = KafkaConsumer(bootstrap_servers=kafka_server,
                                          key_deserializer=self._decode,
                                          value_deserializer=self._decode,
                                          auto_offset_reset='earliest',
                                          consumer_timeout_ms=5000)
            self.addCleanup(self.consumer.close)
            self.consumer.subscribe(['ns'])
        except KafkaError:
            self.skipTest('Kafka server not present.')

    def test_send_instantiated_msg(self):
        """Send the example 'instantiated' payload and expect to consume it.

        The test passes as soon as a record keyed 'instantiated' with a
        non-null value is read from the 'ns' topic, and fails if the consumer
        iteration ends without seeing one.
        """
        example_path = os.path.join(os.path.dirname(__file__),
                                    '../examples/instantiated.json')
        with open(example_path) as example_file:
            payload = json.load(example_file)
        self.producer.send('ns', json.dumps(payload), key="instantiated")
        # Make sure the record has actually left the producer's buffer
        # before we start polling for it.
        self.producer.flush()

        for message in self.consumer:
            if message.key == 'instantiated':
                self.assertIsNotNone(message.value)
                return
        self.fail("No message received in consumer")
75
76
# Allow running this test module directly as a script; unittest.main()
# discovers and runs the test cases defined in this module.
if __name__ == '__main__':
    unittest.main()