4 from aiokafka
import AIOKafkaConsumer
5 from aiokafka
import AIOKafkaProducer
6 from aiokafka
.errors
import KafkaError
7 from osm_common
.msgbase
import MsgBase
, MsgException
# Module maintainers (single string, comma separated).
__author__ = (
    "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, "
    "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
)
14 class MsgKafka(MsgBase
):
def __init__(self, logger_name='msg'):
    """
    Create a client that is not yet configured; connect() fills in the broker settings.
    :param logger_name: name passed to logging.getLogger() for this instance's logger
    """
    self.logger = logging.getLogger(logger_name)
    # Declare every attribute the other methods read or assign, so the
    # instance state is well defined even before connect() has been called.
    self.host = None
    self.port = None
    self.consumer = None
    self.producer = None
    self.loop = None
    self.broker = None
    self.group_id = None
def connect(self, config):
    """
    Store the broker settings taken from a configuration dictionary.
    Only instance attributes are set here; no producer or consumer is created.
    :param config: dictionary with mandatory keys "host" and "port" and optional
        keys "logger_name" and "group_id"
    :return: None or raises MsgException on failing
    """
    try:
        if "logger_name" in config:
            self.logger = logging.getLogger(config["logger_name"])
        self.host = config["host"]
        self.port = config["port"]
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            # get_event_loop() raises when the calling thread has no current
            # loop. write()/read() need one for run_until_complete(), so
            # create it and register it for this thread.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
        self.broker = str(self.host) + ":" + str(self.port)
        self.group_id = config.get("group_id")

    except Exception as e:  # TODO refine
        # Chain the cause so the original traceback is not lost.
        raise MsgException(str(e)) from e
42 except Exception as e
: # TODO refine
43 raise MsgException(str(e
))
def write(self, topic, key, msg):
    """
    Write a message at kafka bus
    Blocking wrapper: runs aiowrite() to completion on self.loop.
    :param topic: message topic, must be string
    :param key: message key, must be string
    :param msg: message content, can be string or dictionary
    :return: None or raises MsgException on failing
    """
    try:
        self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=msg))

    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise MsgException("Error writing {} topic: {}".format(topic, str(e))) from e
def read(self, topic):
    """
    Read from one or several topics.
    Blocking wrapper: runs aioread() to completion on self.loop.
    :param topic: can be str: single topic; or str list: several topics
    :return: topic, key, message; or None
    """
    try:
        return self.loop.run_until_complete(self.aioread(topic, self.loop))
    except MsgException:
        # Already the exception type callers expect; do not wrap it twice.
        raise
    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise MsgException("Error reading {} topic: {}".format(topic, str(e))) from e
async def aiowrite(self, topic, key, msg, loop=None):
    """
    Asyncio write: start a producer, send one message and stop the producer.
    :param topic: message topic, must be string
    :param key: message key, must be string (it is encoded with str.encode)
    :param msg: message content; it is serialised with yaml.safe_dump before sending
    :param loop: asyncio loop; self.loop is used when it is not provided
    :return: None or raises MsgException on failing
    """
    if not loop:
        # write() drives this coroutine on self.loop without passing a loop.
        loop = self.loop
    producer = None
    try:
        producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                    bootstrap_servers=self.broker)
        self.producer = producer
        await producer.start()
        await producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True))
    except Exception as e:
        raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e)) from e
    finally:
        # Only stop a producer that was actually created; otherwise a
        # constructor failure would be masked by an error raised here.
        if producer is not None:
            await producer.stop()
async def aioread(self, topic, loop=None, callback=None, *args):
    """
    Asyncio read from one or several topics. It blocks
    :param topic: can be str: single topic; or str list: several topics
    :param loop: asyncio loop; self.loop is used when it is not provided
    :param callback: callback function that will handle the message in kafka bus. When given,
        it is called as callback(topic, key, message, *args) for every message received and
        reading continues; when omitted, the first message received is returned
    :param args: optional arguments for callback function
    :return: topic, key, message
    """
    if not loop:
        loop = self.loop
    if isinstance(topic, (list, tuple)):
        topic_list = topic
    else:
        topic_list = (topic,)

    consumer = None
    try:
        consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker, group_id=self.group_id)
        self.consumer = consumer
        await consumer.start()
        consumer.subscribe(topic_list)

        async for message in consumer:
            # NOTE: yaml.safe_load replaces yaml.load here. The payload comes
            # from the bus (external data); yaml.load without an explicit
            # Loader is unsafe on old PyYAML and a TypeError on PyYAML >= 6.
            # aiowrite() serialises with yaml.safe_dump, which safe_load reads back.
            key = yaml.safe_load(message.key)
            content = yaml.safe_load(message.value)
            if callback:
                callback(message.topic, key, content, *args)
            else:
                return message.topic, key, content
    except KafkaError as e:
        raise MsgException(str(e)) from e
    finally:
        # Only stop a consumer that was actually created; otherwise a
        # constructor failure would be masked by an error raised here.
        if consumer is not None:
            await consumer.stop()