4 from aiokafka
import AIOKafkaConsumer
5 from aiokafka
import AIOKafkaProducer
6 from aiokafka
.errors
import KafkaError
7 from msgbase
import MsgBase
, MsgException
11 class MsgKafka(MsgBase
):
12 def __init__(self
, logger_name
='msg'):
13 self
.logger
= logging
.getLogger(logger_name
)
18 # create a different file for each topic
21 def connect(self
, config
):
23 if "logger_name" in config
:
24 self
.logger
= logging
.getLogger(config
["logger_name"])
25 self
.host
= config
["host"]
26 self
.port
= config
["port"]
28 self
.loop
= asyncio
.get_event_loop()
29 self
.broker
= str(self
.host
) + ":" + str(self
.port
)
31 except Exception as e
: # TODO refine
32 raise MsgException(str(e
))
34 def write(self
, topic
, key
, msg
):
36 self
.loop
.run_until_complete(self
.aiowrite(topic
=topic
, key
=key
, msg
=yaml
.safe_dump(msg
, default_flow_style
=True)))
38 except Exception as e
:
39 raise MsgException("Error writing {} topic: {}".format(topic
, str(e
)))
41 def read(self
, topic
):
42 #self.topic_lst.append(topic)
44 return self
.loop
.run_until_complete(self
.aioread(topic
))
45 except Exception as e
:
46 raise MsgException("Error reading {} topic: {}".format(topic
, str(e
)))
48 async def aiowrite(self
, topic
, key
, msg
, loop
=None):
52 self
.producer
= AIOKafkaProducer(loop
=loop
, key_serializer
=str.encode
, value_serializer
=str.encode
,
53 bootstrap_servers
=self
.broker
)
54 await self
.producer
.start()
55 await self
.producer
.send(topic
=topic
, key
=key
, value
=msg
)
56 except Exception as e
:
57 raise MsgException("Error publishing to {} topic: {}".format(topic
, str(e
)))
59 await self
.producer
.stop()
61 async def aioread(self
, topic
, loop
=None):
64 self
.consumer
= AIOKafkaConsumer(loop
=loop
, bootstrap_servers
=self
.broker
)
65 await self
.consumer
.start()
66 self
.consumer
.subscribe([topic
])
68 async for message
in self
.consumer
:
69 return yaml
.load(message
.key
), yaml
.load(message
.value
)
70 except KafkaError
as e
:
71 raise MsgException(str(e
))
73 await self
.consumer
.stop()