Reformat POL to standardized format
[osm/POL.git] / osm_policy_module / tests / integration / test_kafka_messages.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24 import asyncio
25 import json
26 import logging
27 import os
28 import sys
29 import unittest
30
31 from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
32 from kafka.errors import KafkaError
33
34 from osm_policy_module.core.config import Config
35
36 log = logging.getLogger()
37 log.level = logging.INFO
38 stream_handler = logging.StreamHandler(sys.stdout)
39 log.addHandler(stream_handler)
40
41
42 class KafkaMessagesTest(unittest.TestCase):
43 def setUp(self):
44 super()
45 cfg = Config()
46 self.kafka_server = "{}:{}".format(
47 cfg.get("message", "host"), cfg.get("message", "port")
48 )
49 self.loop = asyncio.new_event_loop()
50
51 def tearDown(self):
52 super()
53
54 def test_send_instantiated_msg(self):
55 async def test_send_instantiated_msg():
56 producer = AIOKafkaProducer(
57 loop=self.loop,
58 bootstrap_servers=self.kafka_server,
59 key_serializer=str.encode,
60 value_serializer=str.encode,
61 )
62 await producer.start()
63 consumer = AIOKafkaConsumer(
64 "ns",
65 loop=self.loop,
66 bootstrap_servers=self.kafka_server,
67 consumer_timeout_ms=10000,
68 auto_offset_reset="earliest",
69 value_deserializer=bytes.decode,
70 key_deserializer=bytes.decode,
71 )
72 await consumer.start()
73 try:
74 with open(
75 os.path.join(
76 os.path.dirname(__file__), "../examples/instantiated.json"
77 )
78 ) as file:
79 payload = json.load(file)
80 await producer.send_and_wait(
81 "ns", key="instantiated", value=json.dumps(payload)
82 )
83 finally:
84 await producer.stop()
85 try:
86 async for message in consumer:
87 if message.key == "instantiated":
88 self.assertIsNotNone(message.value)
89 return
90 finally:
91 await consumer.stop()
92
93 try:
94 self.loop.run_until_complete(test_send_instantiated_msg())
95 except KafkaError:
96 self.skipTest("Kafka server not present.")
97
98
99 if __name__ == "__main__":
100 unittest.main()