1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
31 from aiokafka
import AIOKafkaProducer
, AIOKafkaConsumer
32 from kafka
.errors
import KafkaError
34 from osm_policy_module
.core
.config
import Config
36 log
= logging
.getLogger()
37 log
.level
= logging
.INFO
38 stream_handler
= logging
.StreamHandler(sys
.stdout
)
39 log
.addHandler(stream_handler
)
42 class KafkaMessagesTest(unittest
.TestCase
):
46 self
.kafka_server
= "{}:{}".format(
47 cfg
.get("message", "host"), cfg
.get("message", "port")
49 self
.loop
= asyncio
.new_event_loop()
54 def test_send_instantiated_msg(self
):
55 async def test_send_instantiated_msg():
56 producer
= AIOKafkaProducer(
58 bootstrap_servers
=self
.kafka_server
,
59 key_serializer
=str.encode
,
60 value_serializer
=str.encode
,
62 await producer
.start()
63 consumer
= AIOKafkaConsumer(
66 bootstrap_servers
=self
.kafka_server
,
67 consumer_timeout_ms
=10000,
68 auto_offset_reset
="earliest",
69 value_deserializer
=bytes
.decode
,
70 key_deserializer
=bytes
.decode
,
72 await consumer
.start()
76 os
.path
.dirname(__file__
), "../examples/instantiated.json"
79 payload
= json
.load(file)
80 await producer
.send_and_wait(
81 "ns", key
="instantiated", value
=json
.dumps(payload
)
86 async for message
in consumer
:
87 if message
.key
== "instantiated":
88 self
.assertIsNotNone(message
.value
)
94 self
.loop
.run_until_complete(test_send_instantiated_msg())
96 self
.skipTest("Kafka server not present.")
99 if __name__
== "__main__":