Migrates POL code from MON repo
[osm/POL.git] / osm_policy_module / tests / integration / test_kafka_messages.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24 import json
25 import logging
26 import os
27 import sys
28 import unittest
29
30 from kafka import KafkaProducer, KafkaConsumer
31 from kafka.errors import KafkaError
32
33 from osm_policy_module.core.agent import PolicyModuleAgent
34 from osm_policy_module.core.config import Config
35
36 log = logging.getLogger()
37 log.level = logging.INFO
38 stream_handler = logging.StreamHandler(sys.stdout)
39 log.addHandler(stream_handler)
40
41
42 class KafkaMessagesTest(unittest.TestCase):
43 def setUp(self):
44 try:
45 cfg = Config.instance()
46 kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
47 cfg.OSMPOL_MESSAGE_PORT)
48 self.producer = KafkaProducer(bootstrap_servers=kafka_server,
49 key_serializer=str.encode,
50 value_serializer=str.encode)
51 self.consumer = KafkaConsumer(bootstrap_servers=kafka_server,
52 key_deserializer=bytes.decode,
53 value_deserializer=bytes.decode,
54 auto_offset_reset='earliest',
55 consumer_timeout_ms=5000)
56 self.consumer.subscribe(['ns'])
57 except KafkaError:
58 self.skipTest('Kafka server not present.')
59
60 def tearDown(self):
61 self.producer.close()
62 self.consumer.close()
63
64 def test_send_instantiated_msg(self):
65 with open(
66 os.path.join(os.path.dirname(__file__), '../examples/instantiated.json')) as file:
67 payload = json.load(file)
68 self.producer.send('ns', json.dumps(payload), key="instantiated")
69 self.producer.flush()
70
71 for message in self.consumer:
72 if message.key == 'instantiated':
73 self.assertIsNotNone(message.value)
74 return
75 self.fail("No message received in consumer")
76
77
78 if __name__ == '__main__':
79 unittest.main()