1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
30 from kafka
import KafkaProducer
, KafkaConsumer
31 from kafka
.errors
import KafkaError
33 from osm_policy_module
.core
.agent
import PolicyModuleAgent
34 from osm_policy_module
.core
.config
import Config
# Configure the root logger to emit INFO-and-above records to stdout.
log = logging.getLogger()
# Use setLevel() instead of assigning ``log.level`` directly: setLevel()
# validates the level and resets the logger's cached isEnabledFor() results,
# both of which a raw attribute assignment bypasses.
log.setLevel(logging.INFO)
stream_handler = logging.StreamHandler(sys.stdout)
log.addHandler(stream_handler)
class KafkaMessagesTest(unittest.TestCase):
    """Integration test that sends a message through Kafka and reads it back.

    A reachable Kafka broker is required. When the Kafka clients cannot be set
    up because of a ``KafkaError`` the test is skipped rather than failed.
    """

    def setUp(self):
        """Create the Kafka producer and consumer used by the test.

        The broker address is built from ``OSMPOL_MESSAGE_HOST`` and
        ``OSMPOL_MESSAGE_PORT`` of the ``Config`` singleton, and the consumer
        is subscribed to the ``ns`` topic.
        """
        try:
            cfg = Config.instance()
            kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
                                          cfg.OSMPOL_MESSAGE_PORT)
            self.producer = KafkaProducer(bootstrap_servers=kafka_server,
                                          key_serializer=str.encode,
                                          value_serializer=str.encode)
            # Register the close immediately: cleanups still run when setUp
            # raises or skips later on, whereas tearDown() would not, so the
            # producer is not leaked if creating the consumer fails.
            self.addCleanup(self.producer.close)
            self.consumer = KafkaConsumer(bootstrap_servers=kafka_server,
                                          key_deserializer=bytes.decode,
                                          value_deserializer=bytes.decode,
                                          auto_offset_reset='earliest',
                                          consumer_timeout_ms=5000)
            self.addCleanup(self.consumer.close)
            self.consumer.subscribe(['ns'])
        except KafkaError:
            self.skipTest('Kafka server not present.')

    def test_send_instantiated_msg(self):
        """Publish the example 'instantiated' payload and expect to consume it.

        The payload is loaded from ``../examples/instantiated.json`` relative
        to this file and sent to the ``ns`` topic with key ``instantiated``.
        The test passes as soon as a message with that key is consumed and
        fails if the consumer stops iterating without having seen one.
        """
        payload_path = os.path.join(os.path.dirname(__file__),
                                    '../examples/instantiated.json')
        # Only the JSON load needs the file open; release the handle before
        # doing any network I/O.
        with open(payload_path) as payload_file:
            payload = json.load(payload_file)

        self.producer.send('ns', json.dumps(payload), key="instantiated")
        # send() only queues the record; flush so it is actually transmitted
        # before the consumer starts polling.
        self.producer.flush()

        # Iteration ends on its own once consumer_timeout_ms elapses without
        # new messages, at which point the test falls through to fail().
        for message in self.consumer:
            if message.key == 'instantiated':
                self.assertIsNotNone(message.value)
                return
        self.fail("No message received in consumer")
78 if __name__
== '__main__':