28b2e0f51ee9dc240a07a272ad07ab55d379a52f
1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
31 from aiokafka
import AIOKafkaProducer
, AIOKafkaConsumer
32 from kafka
.errors
import KafkaError
34 from osm_policy_module
.core
.config
import Config
36 log
= logging
.getLogger()
37 log
.level
= logging
.INFO
38 stream_handler
= logging
.StreamHandler(sys
.stdout
)
39 log
.addHandler(stream_handler
)
42 class KafkaMessagesTest(unittest
.TestCase
):
46 self
.kafka_server
= '{}:{}'.format(cfg
.get('message', 'host'),
47 cfg
.get('message', 'port'))
48 self
.loop
= asyncio
.new_event_loop()
53 def test_send_instantiated_msg(self
):
54 async def test_send_instantiated_msg():
55 producer
= AIOKafkaProducer(loop
=self
.loop
,
56 bootstrap_servers
=self
.kafka_server
,
57 key_serializer
=str.encode
,
58 value_serializer
=str.encode
)
59 await producer
.start()
60 consumer
= AIOKafkaConsumer(
63 bootstrap_servers
=self
.kafka_server
,
64 consumer_timeout_ms
=10000,
65 auto_offset_reset
='earliest',
66 value_deserializer
=bytes
.decode
,
67 key_deserializer
=bytes
.decode
)
68 await consumer
.start()
71 os
.path
.join(os
.path
.dirname(__file__
), '../examples/instantiated.json')) as file:
72 payload
= json
.load(file)
73 await producer
.send_and_wait("ns", key
="instantiated", value
=json
.dumps(payload
))
77 async for message
in consumer
:
78 if message
.key
== 'instantiated':
79 self
.assertIsNotNone(message
.value
)
85 self
.loop
.run_until_complete(test_send_instantiated_msg())
87 self
.skipTest('Kafka server not present.')
90 if __name__
== '__main__':