1 # -*- coding: utf-8 -*-
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
import asyncio
import logging

import yaml
from aiokafka import AIOKafkaConsumer
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
from osm_common.msgbase import MsgBase, MsgException
# Module authorship metadata; adjacent string literals are joined at compile time.
__author__ = (
    "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, "
    "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
)
class MsgKafka(MsgBase):
    """
    Kafka message bus client built on aiokafka.

    write() and read() are blocking wrappers that run the aiowrite() and aioread() coroutines on the
    event loop stored by connect(). Every aiowrite()/aioread() call creates its own producer/consumer
    and stops it before returning.
    """

    def __init__(self, logger_name='msg', lock=False):
        """
        Create a client that is not yet bound to any broker; connect() must be called before use.
        :param logger_name: forwarded unchanged to MsgBase
        :param lock: forwarded unchanged to MsgBase
        """
        super().__init__(logger_name, lock)
        # Defined up front so the attributes exist (as None) even before connect() is called.
        self.host = None
        self.port = None
        self.loop = None
        self.broker = None
        self.group_id = None
        # Most recently created producer/consumer, kept for introspection by callers.
        self.producer = None
        self.consumer = None

    def connect(self, config):
        """
        Store the connection parameters. No network connection is opened here.
        :param config: dictionary with mandatory keys "host" and "port", and optional keys "logger_name",
            "loop" (asyncio loop; defaults to asyncio.get_event_loop()) and "group_id"
        :return: None or raises MsgException on failing (e.g. a mandatory key is missing)
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.host = config["host"]
            self.port = config["port"]
            self.loop = config.get("loop") or asyncio.get_event_loop()
            self.broker = str(self.host) + ":" + str(self.port)
            self.group_id = config.get("group_id")
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    def disconnect(self):
        """
        Release the bus connection. Producers and consumers are created and stopped inside
        aiowrite()/aioread(), so nothing is held open between calls and there is nothing to release.
        :return: None or raises MsgException on failing
        """
        # NOTE(review): the method name and the empty body are assumed; only the error-wrapping clause
        # was visible when this block was reviewed - confirm against the upstream file.
        try:
            pass
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    def write(self, topic, key, msg):
        """
        Write a message at kafka bus
        :param topic: message topic, must be string
        :param key: message key, must be string
        :param msg: message content, can be string or dictionary
        :return: None or raises MsgException on failing
        """
        retry = 2  # Try two times
        while retry:
            try:
                self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=msg))
                return
            except Exception as e:
                retry -= 1
                # Only the failure of the last attempt is reported to the caller.
                if retry == 0:
                    raise MsgException("Error writing {} topic: {}".format(topic, str(e))) from e

    def read(self, topic):
        """
        Read from one or several topics.
        :param topic: can be str: single topic; or str list: several topics
        :return: topic, key, message; or None
        """
        try:
            return self.loop.run_until_complete(self.aioread(topic, self.loop))
        except MsgException:
            # Already in the form callers expect; wrapping it again would only garble the message.
            raise
        except Exception as e:
            raise MsgException("Error reading {} topic: {}".format(topic, str(e))) from e

    async def aiowrite(self, topic, key, msg, loop=None):
        """
        Asyncio write: publish one message, serialized as flow-style YAML.
        :param topic: str kafka topic
        :param key: str kafka key
        :param msg: str or dictionary kafka message
        :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
        :return: None or raises MsgException on failing
        """
        loop = loop or self.loop
        # Track the producer in a local: if its construction fails there is nothing to stop, and a
        # concurrent call replacing self.producer cannot make this call stop the wrong producer.
        producer = None
        try:
            producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                        bootstrap_servers=self.broker)
            self.producer = producer
            await producer.start()
            await producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True))
        except Exception as e:
            raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e)) from e
        finally:
            if producer is not None:
                await producer.stop()

    async def aioread(self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs):
        """
        Asyncio read from one or several topics.
        :param topic: can be str: single topic; or str list: several topics
        :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
        :param callback: synchronous callback function that will handle the message in kafka bus
        :param aiocallback: async callback function that will handle the message in kafka bus
        :param group_id: kafka group_id to use. Can be False (set group_id to None), None (use general group_id provided
            at connect inside config), or a group_id string
        :param kwargs: optional keyword arguments for callback function
        :return: If no callback defined, it returns (topic, key, message)
        """
        loop = loop or self.loop
        if group_id is False:
            group_id = None
        elif group_id is None:
            group_id = self.group_id

        topic_list = topic if isinstance(topic, (list, tuple)) else (topic,)

        # Local handle for the same reasons as in aiowrite(): safe cleanup when construction fails and
        # no interference between concurrent readers sharing this instance.
        consumer = None
        try:
            consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker, group_id=group_id)
            self.consumer = consumer
            await consumer.start()
            consumer.subscribe(topic_list)

            async for message in consumer:
                # Decode once per message; every branch below needs both values.
                key = yaml.safe_load(message.key)
                value = yaml.safe_load(message.value)
                if callback:
                    callback(message.topic, key, value, **kwargs)
                elif aiocallback:
                    await aiocallback(message.topic, key, value, **kwargs)
                else:
                    # Without a callback only the first message is consumed and handed back.
                    return message.topic, key, value
        except KafkaError as e:
            raise MsgException(str(e)) from e
        finally:
            if consumer is not None:
                await consumer.stop()