blob: 5caa5b1becb86a78142a4ac50d53596aa64e630e [file] [log] [blame]
# -*- coding: utf-8 -*-

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

tierno5c012612018-04-19 16:01:59 +020016import logging
17import asyncio
18import yaml
19from aiokafka import AIOKafkaConsumer
20from aiokafka import AIOKafkaProducer
21from aiokafka.errors import KafkaError
tierno3054f782018-04-25 16:59:53 +020022from osm_common.msgbase import MsgBase, MsgException
tierno5c012612018-04-19 16:01:59 +020023
# Module authorship metadata: author names with their contact e-mail addresses.
__author__ = (
    "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, "
    "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
)
tierno3054f782018-04-25 16:59:53 +020028
29
class MsgKafka(MsgBase):
    """
    Message bus driver backed by Kafka (through aiokafka).

    Offers blocking helpers (write, read) that run on the configured asyncio
    loop, and their coroutine counterparts (aiowrite, aioread). Messages are
    serialized to YAML when written and parsed back from YAML when read.
    """

    def __init__(self, logger_name="msg", lock=False):
        """
        Create a non-connected instance; connect() must be called before use
        :param logger_name: name of the logger, handed over to MsgBase
        :param lock: handed over to MsgBase unchanged
        """
        super().__init__(logger_name, lock)
        self.host = None
        self.port = None
        # Last consumer/producer created by aioread/aiowrite. Kept only for backward
        # compatibility; each call owns and stops its own client.
        self.consumer = None
        self.producer = None
        self.loop = None
        self.broker = None
        self.group_id = None

    def connect(self, config):
        """
        Store the connection parameters. No network connection is opened here
        :param config: dictionary with mandatory keys 'host' and 'port', and optional keys 'logger_name', 'loop'
            (asyncio loop, by default the one returned by asyncio.get_event_loop()) and 'group_id'
        :return: None or raises MsgException on failing (e.g. missing mandatory key)
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.host = config["host"]
            self.port = config["port"]
            self.loop = config.get("loop") or asyncio.get_event_loop()
            self.broker = str(self.host) + ":" + str(self.port)
            self.group_id = config.get("group_id")

        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    def disconnect(self):
        """
        Nothing to release: producers and consumers are stopped by the call that created them.
        The asyncio loop is intentionally not closed here.
        :return: None
        """
        # self.loop.close()

    def write(self, topic, key, msg):
        """
        Write a message at kafka bus
        :param topic: message topic, must be string
        :param key: message key, must be string
        :param msg: message content, can be string or dictionary
        :return: None or raises MsgException on failing
        """
        retry = 2  # Try two times
        while retry:
            try:
                self.loop.run_until_complete(
                    self.aiowrite(topic=topic, key=key, msg=msg)
                )
                break
            except Exception as e:
                retry -= 1
                if retry == 0:
                    # Only the error of the last attempt is reported
                    raise MsgException(
                        "Error writing {} topic: {}".format(topic, str(e))
                    ) from e

    def read(self, topic):
        """
        Read from one or several topics.
        :param topic: can be str: single topic; or str list: several topics
        :return: topic, key, message; or None
        """
        try:
            return self.loop.run_until_complete(self.aioread(topic, self.loop))
        except MsgException:
            raise
        except Exception as e:
            raise MsgException(
                "Error reading {} topic: {}".format(topic, str(e))
            ) from e

    @staticmethod
    def _decode(raw):
        """
        Parse a raw kafka key or value as YAML
        :param raw: content as received from kafka, or None when the message carries no key/value
        :return: the parsed content; None if raw is None (yaml.safe_load cannot handle None)
        """
        if raw is None:
            return None
        return yaml.safe_load(raw)

    async def aiowrite(self, topic, key, msg, loop=None):
        """
        Asyncio write
        :param topic: str kafka topic
        :param key: str kafka key
        :param msg: str or dictionary kafka message
        :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
        :return: None or raises MsgException on failing
        """

        if not loop:
            loop = self.loop
        # Local reference: concurrent calls overwrite self.producer, so the cleanup below
        # must act on the producer created by this very call.
        producer = None
        try:
            producer = AIOKafkaProducer(
                loop=loop,
                key_serializer=str.encode,
                value_serializer=str.encode,
                bootstrap_servers=self.broker,
            )
            self.producer = producer
            await producer.start()
            await producer.send(
                topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True)
            )
        except Exception as e:
            raise MsgException(
                "Error publishing topic '{}', key '{}': {}".format(topic, key, e)
            ) from e
        finally:
            # producer is still None when its constructor failed; nothing to stop then
            if producer is not None:
                await producer.stop()

    async def aioread(
        self,
        topic,
        loop=None,
        callback=None,
        aiocallback=None,
        group_id=None,
        from_beginning=None,
        **kwargs
    ):
        """
        Asyncio read from one or several topics.
        :param topic: can be str: single topic; or str list: several topics
        :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
        :param callback: synchronous callback function that will handle the message in kafka bus
        :param aiocallback: async callback function that will handle the message in kafka bus
        :param group_id: kafka group_id to use. Can be False (set group_id to None), None (use general group_id provided
            at connect inside config), or a group_id string
        :param from_beginning: if True, messages will be obtained from beginning instead of only new ones.
            If group_id is supplied, only the not processed messages by other worker are obtained.
            If group_id is None, all messages stored at kafka are obtained.
        :param kwargs: optional keyword arguments for callback function
        :return: If no callback defined, it returns (topic, key, message) of the first message received.
            With a callback it keeps consuming messages. Raises MsgException on kafka errors
        """

        if not loop:
            loop = self.loop
        if group_id is False:
            group_id = None
        elif group_id is None:
            group_id = self.group_id
        # Local reference: concurrent calls overwrite self.consumer, so the cleanup below
        # must act on the consumer created by this very call.
        consumer = None
        try:
            if isinstance(topic, (list, tuple)):
                topic_list = topic
            else:
                topic_list = (topic,)
            consumer = AIOKafkaConsumer(
                loop=loop,
                bootstrap_servers=self.broker,
                group_id=group_id,
                auto_offset_reset="earliest" if from_beginning else "latest",
            )
            self.consumer = consumer
            await consumer.start()
            consumer.subscribe(topic_list)

            async for message in consumer:
                content = (
                    message.topic,
                    self._decode(message.key),
                    self._decode(message.value),
                )
                if callback:
                    callback(*content, **kwargs)
                elif aiocallback:
                    await aiocallback(*content, **kwargs)
                else:
                    return content
        except KafkaError as e:
            raise MsgException(str(e)) from e
        finally:
            # consumer is still None when its constructor failed; nothing to stop then
            if consumer is not None:
                await consumer.stop()
195 await self.consumer.stop()