import asyncio

import yaml
from aiokafka import AIOKafkaConsumer
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError

from msgbase import MsgBase, MsgException
class msgKafka(MsgBase):
    """Message bus implementation backed by Kafka through aiokafka.

    The synchronous entry points (`connect`, `write`, `read`) drive the
    coroutines `aiowrite` / `aioread` to completion on the event loop
    captured in `connect`. Messages are serialized as YAML text.
    """

    # NOTE(review): the original carried the comment "create a different
    # file for each topic" here; nothing in this class uses files, so it
    # looks like a leftover from another backend -- confirm and remove.

    def connect(self, config):
        """Store the broker address and event loop used by later calls.

        :param config: mapping that provides the "host" and "port" entries.
        :raises MsgException: if the configuration cannot be read or the
            event loop cannot be obtained.
        """
        try:
            self.host = config["host"]
            self.port = config["port"]
            # Topics accumulated by read(); created here so that read()
            # never hits a missing attribute.
            self.topic_lst = []
            self.loop = asyncio.get_event_loop()
            self.broker = str(self.host) + ":" + str(self.port)
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    def write(self, topic, msg, key):
        """Publish `msg` under `key` to `topic`, blocking until it is sent.

        :param topic: name of the Kafka topic.
        :param msg: content to publish; dumped to flow-style YAML first.
        :param key: message key (encoded with `str.encode` by the producer).
        :raises MsgException: on any serialization or publishing error.
        """
        try:
            payload = yaml.safe_dump(msg, default_flow_style=True)
            self.loop.run_until_complete(
                self.aiowrite(key, msg=payload, topic=topic)
            )
        except Exception as e:
            raise MsgException(
                "Error writing {} topic: {}".format(topic, str(e))
            ) from e

    def read(self, topic):
        """Block until one message arrives on any topic requested so far.

        `topic` is added to the list of subscribed topics (once); the
        consumer then subscribes to the whole list.

        :param topic: name of the Kafka topic to add to the subscription.
        :return: tuple `(key, value)` of the first message received, both
            parsed from YAML.
        :raises MsgException: on any error while consuming.
        """
        try:
            # Avoid growing the list with duplicates on repeated reads.
            if topic not in self.topic_lst:
                self.topic_lst.append(topic)
            return self.loop.run_until_complete(self.aioread(self.topic_lst))
        except Exception as e:
            raise MsgException(
                "Error reading {} topic: {}".format(topic, str(e))
            ) from e

    async def aiowrite(self, key, msg, topic):
        """Coroutine that publishes one already-serialized message.

        A fresh producer is created for every call and is always stopped,
        whether or not publishing succeeded.

        :param key: message key, a `str`.
        :param msg: message value, a `str` (YAML text when called by write).
        :param topic: name of the Kafka topic.
        :raises MsgException: if the producer cannot start or send.
        """
        producer = None
        try:
            producer = AIOKafkaProducer(
                loop=self.loop,
                key_serializer=str.encode,
                value_serializer=str.encode,
                bootstrap_servers=self.broker,
            )
            self.producer = producer
            await producer.start()
            await producer.send(topic=topic, key=key, value=msg)
        except Exception as e:
            raise MsgException(
                "Error publishing to {} topic: {}".format(topic, str(e))
            ) from e
        finally:
            # Only stop what was actually constructed; stopping also lets
            # the producer flush anything still pending.
            if producer is not None:
                await producer.stop()

    async def aioread(self, topic):
        """Coroutine that returns the first message from the given topics.

        A fresh consumer is created for every call and is always stopped
        before returning or raising.

        :param topic: iterable of topic names to subscribe to (`read`
            passes its accumulated topic list).
        :return: tuple `(key, value)` parsed with `yaml.safe_load`; the key
            is `None` when the message carries no key. Implicitly returns
            `None` if the consumer iteration ends without a message.
        :raises MsgException: on a Kafka error while consuming.
        """
        self.consumer = AIOKafkaConsumer(
            loop=self.loop, bootstrap_servers=self.broker
        )
        await self.consumer.start()
        try:
            self.consumer.subscribe(topic)
            async for message in self.consumer:
                # safe_load: payloads come from the broker and must not be
                # able to construct arbitrary Python objects; it is also the
                # counterpart of the safe_dump used by write().
                if message.key is None:
                    key = None
                else:
                    key = yaml.safe_load(message.key)
                return key, yaml.safe_load(message.value)
        except KafkaError as e:
            raise MsgException(str(e)) from e
        finally:
            # Runs on return as well, so the consumer is never leaked.
            await self.consumer.stop()