1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations under the License.
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
31 from aiokafka
import AIOKafkaProducer
, AIOKafkaConsumer
32 from kafka
.errors
import KafkaError
34 from osm_policy_module
.core
.config
import Config
# Root logger for the test module: INFO and above is echoed to stdout so the
# output of the Kafka clients is visible while the tests run.
log = logging.getLogger()
# Use the documented setter instead of assigning ``log.level`` directly:
# setLevel() validates the level and, on Python 3.7+, also resets the
# logger's cached isEnabledFor() results.
log.setLevel(logging.INFO)
stream_handler = logging.StreamHandler(sys.stdout)
log.addHandler(stream_handler)
class KafkaMessagesTest(unittest.TestCase):
    """Round-trip the example ``instantiated`` message through a Kafka broker.

    The broker address is taken from the ``Config`` singleton. Every test
    runs on its own event loop, which is handed explicitly to the aiokafka
    clients because no global event loop is installed.
    """

    # Upper bound, in seconds, on how long a test waits for the message it
    # just produced to come back from the broker.
    RECEIVE_TIMEOUT_S = 10

    def setUp(self):
        """Compute the broker address and create a private event loop."""
        super().setUp()
        cfg = Config.instance()
        self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
                                           cfg.OSMPOL_MESSAGE_PORT)
        self.loop = asyncio.new_event_loop()
        # Deliberately leave no global loop set, so any code that forgets to
        # use ``self.loop`` fails loudly instead of using a stray loop.
        asyncio.set_event_loop(None)

    def tearDown(self):
        """Close the per-test event loop so it is not leaked between tests."""
        self.loop.close()
        super().tearDown()

    async def _wait_for_instantiated(self, consumer):
        """Consume from ``consumer`` until a message keyed 'instantiated' arrives.

        Asserts that the matching message carries a value, then returns.
        Messages with any other key are ignored.
        """
        async for message in consumer:
            if message.key == 'instantiated':
                self.assertIsNotNone(message.value)
                return

    def test_send_instantiated_msg(self):
        """Publish the example payload to topic ``ns`` and read it back.

        The test is skipped when a ``KafkaError`` is raised (broker not
        reachable). It errors with ``asyncio.TimeoutError`` if the message is
        not received within ``RECEIVE_TIMEOUT_S`` seconds.
        """
        async def test_send_instantiated_msg():
            # NOTE(review): newer aiokafka releases deprecate the ``loop``
            # argument -- confirm against the version this project pins.
            producer = AIOKafkaProducer(loop=self.loop,
                                        bootstrap_servers=self.kafka_server,
                                        key_serializer=str.encode,
                                        value_serializer=str.encode)
            await producer.start()
            try:
                # The consumer subscribes to the same topic the producer
                # writes to below, and reads from the earliest offset.
                # NOTE(review): consumer_timeout_ms is kept as-is but is not
                # relied on to end the iteration -- confirm its semantics.
                consumer = AIOKafkaConsumer(
                    "ns",
                    loop=self.loop,
                    bootstrap_servers=self.kafka_server,
                    consumer_timeout_ms=10000,
                    auto_offset_reset='earliest',
                    value_deserializer=bytes.decode,
                    key_deserializer=bytes.decode)
                await consumer.start()
                try:
                    example_path = os.path.join(
                        os.path.dirname(__file__),
                        '../examples/instantiated.json')
                    with open(example_path) as file:
                        payload = json.load(file)
                    await producer.send_and_wait("ns",
                                                 key="instantiated",
                                                 value=json.dumps(payload))
                    # Bound the wait so a lost message fails the test
                    # instead of blocking the run indefinitely.
                    await asyncio.wait_for(
                        self._wait_for_instantiated(consumer),
                        timeout=self.RECEIVE_TIMEOUT_S)
                finally:
                    # Always release the consumer, even on failure/timeout.
                    await consumer.stop()
            finally:
                # Always release the producer, even if the consumer failed
                # to start.
                await producer.stop()

        try:
            self.loop.run_until_complete(test_send_instantiated_msg())
        except KafkaError:
            # Only skip when Kafka itself is the problem; assertion failures
            # and timeouts still surface as test failures/errors.
            self.skipTest('Kafka server not present.')
91 if __name__
== '__main__':