blob: 5487093b3478c6afc4507de6060436715a68aadc [file] [log] [blame]
# -*- coding: utf-8 -*-

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15
import asyncio
import logging

from aiokafka import AIOKafkaConsumer
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
from osm_common.msgbase import MsgBase, MsgException
import yaml
tierno5c012612018-04-19 16:01:59 +020024
# Module authorship metadata: a single comma-separated string of contacts.
__author__ = (
    "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, "
    "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
)
tierno3054f782018-04-25 16:59:53 +020029
30
class MsgKafka(MsgBase):
    """
    MsgBase subclass that publishes and consumes YAML-encoded messages on a
    Kafka broker through aiokafka.

    A new producer/consumer is created for every write/read operation and is
    stopped when that operation ends. The most recently created ones are kept
    in ``self.producer`` / ``self.consumer``.
    """

    def __init__(self, logger_name="msg", lock=False):
        """
        :param logger_name: logger name, forwarded to MsgBase
        :param lock: forwarded to MsgBase
        """
        super().__init__(logger_name, lock)
        self.host = None
        self.port = None
        self.consumer = None
        self.producer = None
        self.loop = None
        self.broker = None
        self.group_id = None

    @staticmethod
    def _load_yaml(raw):
        """
        Decode a raw kafka key or value with yaml.
        :param raw: content as delivered by the consumer, or None
        :return: decoded content; None when raw is None (record without key/value), which yaml cannot parse
        """
        if raw is None:
            return None
        return yaml.safe_load(raw)

    def connect(self, config):
        """
        Store the connection parameters. No connection is opened here; producers and consumers are created on
        each write/read.
        :param config: dictionary with mandatory 'host' and 'port'; and optional 'logger_name', 'loop' (asyncio
            loop, asyncio.get_event_loop() is used if missing or empty) and 'group_id' (default kafka group_id)
        :return: None or raises MsgException on failing (e.g. a mandatory key is missing)
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.host = config["host"]
            self.port = config["port"]
            self.loop = config.get("loop") or asyncio.get_event_loop()
            self.broker = str(self.host) + ":" + str(self.port)
            self.group_id = config.get("group_id")

        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    def disconnect(self):
        """
        Nothing to release: producers and consumers are stopped at the end of each operation, and the loop is
        deliberately not closed here.
        :return: None
        """
        # self.loop.close()

    def write(self, topic, key, msg):
        """
        Write a message at kafka bus
        :param topic: message topic, must be string
        :param key: message key, must be string
        :param msg: message content, can be string or dictionary
        :return: None or raises MsgException on failing
        """
        retry = 2  # Try two times
        while retry:
            try:
                self.loop.run_until_complete(
                    self.aiowrite(topic=topic, key=key, msg=msg)
                )
                break
            except Exception as e:
                retry -= 1
                # only the failure of the last attempt is reported
                if retry == 0:
                    raise MsgException(
                        "Error writing {} topic: {}".format(topic, str(e))
                    ) from e

    def read(self, topic):
        """
        Read from one or several topics.
        :param topic: can be str: single topic; or str list: several topics
        :return: topic, key, message; or None
        """
        try:
            return self.loop.run_until_complete(self.aioread(topic, self.loop))
        except MsgException:
            raise
        except Exception as e:
            raise MsgException(
                "Error reading {} topic: {}".format(topic, str(e))
            ) from e

    async def aiowrite(self, topic, key, msg, loop=None):
        """
        Asyncio write
        :param topic: str kafka topic
        :param key: str kafka key
        :param msg: str or dictionary kafka message
        :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
        :return: None or raises MsgException on failing to create/start the producer or to send
        """

        if not loop:
            loop = self.loop
        # Local reference: concurrent calls on this object must not send through, or stop, each other's producer
        producer = None
        try:
            producer = AIOKafkaProducer(
                loop=loop,
                key_serializer=str.encode,
                value_serializer=str.encode,
                bootstrap_servers=self.broker,
            )
            self.producer = producer
            await producer.start()
            await producer.send(
                topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True)
            )
        except Exception as e:
            raise MsgException(
                "Error publishing topic '{}', key '{}': {}".format(topic, key, e)
            ) from e
        finally:
            # producer is None when its constructor failed; there is nothing to stop in that case
            if producer is not None:
                await producer.stop()

    async def aioread(
        self,
        topic,
        loop=None,
        callback=None,
        aiocallback=None,
        group_id=None,
        from_beginning=None,
        **kwargs
    ):
        """
        Asyncio read from one or several topics.
        :param topic: can be str: single topic; or str list: several topics
        :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
        :param callback: synchronous callback function that will handle the message in kafka bus
        :param aiocallback: async callback function that will handle the message in kafka bus. Ignored if
            callback is provided
        :param group_id: kafka group_id to use. Can be False (set group_id to None), None (use general group_id provided
            at connect inside config), or a group_id string
        :param from_beginning: if True, messages will be obtained from beginning instead of only new ones.
            If group_id is supplied, only the not processed messages by other worker are obtained.
            If group_id is None, all messages stored at kafka are obtained.
        :param kwargs: optional keyword arguments for callback function
        :return: If no callback defined, it returns (topic, key, message) of the first message received.
            With a callback it keeps consuming, calling it with (topic, key, message, **kwargs) per message.
            Raises MsgException on kafka errors
        """

        if not loop:
            loop = self.loop
        if group_id is False:
            group_id = None
        elif group_id is None:
            group_id = self.group_id
        # Local reference: concurrent calls on this object must not iterate, or stop, each other's consumer
        consumer = None
        try:
            if isinstance(topic, (list, tuple)):
                topic_list = topic
            else:
                topic_list = (topic,)
            consumer = AIOKafkaConsumer(
                loop=loop,
                bootstrap_servers=self.broker,
                group_id=group_id,
                auto_offset_reset="earliest" if from_beginning else "latest",
            )
            self.consumer = consumer
            await consumer.start()
            consumer.subscribe(topic_list)

            async for message in consumer:
                msg_key = self._load_yaml(message.key)
                msg_value = self._load_yaml(message.value)
                if callback:
                    callback(message.topic, msg_key, msg_value, **kwargs)
                elif aiocallback:
                    await aiocallback(message.topic, msg_key, msg_value, **kwargs)
                else:
                    return message.topic, msg_key, msg_value
        except KafkaError as e:
            raise MsgException(str(e)) from e
        finally:
            # consumer is None when its constructor failed; there is nothing to stop in that case
            if consumer is not None:
                await consumer.stop()