# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
import asyncio
-import yaml
+import logging
+
from aiokafka import AIOKafkaConsumer
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
from osm_common.msgbase import MsgBase, MsgException
+import yaml
__author__ = (
"Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, "
self.port = None
self.consumer = None
self.producer = None
- self.loop = None
self.broker = None
self.group_id = None
self.logger = logging.getLogger(config["logger_name"])
self.host = config["host"]
self.port = config["port"]
- self.loop = config.get("loop") or asyncio.get_event_loop()
self.broker = str(self.host) + ":" + str(self.port)
self.group_id = config.get("group_id")
def disconnect(self):
try:
pass
- # self.loop.close()
except Exception as e: # TODO refine
raise MsgException(str(e))
retry = 2 # Try two times
while retry:
try:
- self.loop.run_until_complete(
- self.aiowrite(topic=topic, key=key, msg=msg)
- )
+ asyncio.run(self.aiowrite(topic=topic, key=key, msg=msg))
break
except Exception as e:
retry -= 1
:return: topic, key, message; or None
"""
try:
- return self.loop.run_until_complete(self.aioread(topic, self.loop))
+ return asyncio.run(self.aioread(topic))
except MsgException:
raise
except Exception as e:
raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
- async def aiowrite(self, topic, key, msg, loop=None):
+ async def aiowrite(self, topic, key, msg):
"""
Asyncio write
:param topic: str kafka topic
:param key: str kafka key
:param msg: str or dictionary kafka message
- :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
:return: None
"""
-
- if not loop:
- loop = self.loop
try:
self.producer = AIOKafkaProducer(
- loop=loop,
key_serializer=str.encode,
value_serializer=str.encode,
bootstrap_servers=self.broker,
async def aioread(
self,
topic,
- loop=None,
callback=None,
aiocallback=None,
group_id=None,
"""
Asyncio read from one or several topics.
:param topic: can be str: single topic; or str list: several topics
- :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
:param callback: synchronous callback function that will handle the message in kafka bus
:param aiocallback: async callback function that will handle the message in kafka bus
:param group_id: kafka group_id to use. Can be False (set group_id to None), None (use general group_id provided
:param kwargs: optional keyword arguments for callback function
:return: If no callback defined, it returns (topic, key, message)
"""
-
- if not loop:
- loop = self.loop
if group_id is False:
group_id = None
elif group_id is None:
else:
topic_list = (topic,)
self.consumer = AIOKafkaConsumer(
- loop=loop,
bootstrap_servers=self.broker,
group_id=group_id,
auto_offset_reset="earliest" if from_beginning else "latest",