blob: 516251cfbd9ef4b85db0084168aa4a7a27d79e96 [file] [log] [blame]
tierno87858ca2018-10-08 16:30:15 +02001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
tierno5c012612018-04-19 16:01:59 +020016import asyncio
aticig3dd0db62022-03-04 19:35:45 +030017import logging
18
tierno5c012612018-04-19 16:01:59 +020019from aiokafka import AIOKafkaConsumer
20from aiokafka import AIOKafkaProducer
21from aiokafka.errors import KafkaError
tierno3054f782018-04-25 16:59:53 +020022from osm_common.msgbase import MsgBase, MsgException
aticig3dd0db62022-03-04 19:35:45 +030023import yaml
tierno5c012612018-04-19 16:01:59 +020024
garciadeblas2644b762021-03-24 09:21:01 +010025__author__ = (
26 "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, "
27 "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
28)
tierno3054f782018-04-25 16:59:53 +020029
30
tierno5c012612018-04-19 16:01:59 +020031class MsgKafka(MsgBase):
garciadeblas2644b762021-03-24 09:21:01 +010032 def __init__(self, logger_name="msg", lock=False):
tierno1e9a3292018-11-05 18:18:45 +010033 super().__init__(logger_name, lock)
tierno5c012612018-04-19 16:01:59 +020034 self.host = None
35 self.port = None
36 self.consumer = None
37 self.producer = None
tierno5c012612018-04-19 16:01:59 +020038 self.broker = None
tierno73da4fa2018-08-31 13:50:59 +000039 self.group_id = None
tierno5c012612018-04-19 16:01:59 +020040
41 def connect(self, config):
42 try:
43 if "logger_name" in config:
44 self.logger = logging.getLogger(config["logger_name"])
45 self.host = config["host"]
46 self.port = config["port"]
tierno5c012612018-04-19 16:01:59 +020047 self.broker = str(self.host) + ":" + str(self.port)
tierno73da4fa2018-08-31 13:50:59 +000048 self.group_id = config.get("group_id")
tierno5c012612018-04-19 16:01:59 +020049
50 except Exception as e: # TODO refine
51 raise MsgException(str(e))
52
53 def disconnect(self):
54 try:
tiernoebbf3532018-05-03 17:49:37 +020055 pass
tierno5c012612018-04-19 16:01:59 +020056 except Exception as e: # TODO refine
57 raise MsgException(str(e))
58
59 def write(self, topic, key, msg):
tierno86577992018-05-10 16:51:17 +020060 """
61 Write a message at kafka bus
62 :param topic: message topic, must be string
63 :param key: message key, must be string
64 :param msg: message content, can be string or dictionary
65 :return: None or raises MsgException on failing
66 """
garciadeblas2644b762021-03-24 09:21:01 +010067 retry = 2 # Try two times
delacruzramo562435a2019-12-10 12:06:01 +010068 while retry:
69 try:
Gulsum Aticia06b8542023-05-09 13:42:13 +030070 asyncio.run(self.aiowrite(topic=topic, key=key, msg=msg))
delacruzramo562435a2019-12-10 12:06:01 +010071 break
72 except Exception as e:
73 retry -= 1
74 if retry == 0:
garciadeblas2644b762021-03-24 09:21:01 +010075 raise MsgException(
76 "Error writing {} topic: {}".format(topic, str(e))
77 )
tierno5c012612018-04-19 16:01:59 +020078
79 def read(self, topic):
80 """
tierno86577992018-05-10 16:51:17 +020081 Read from one or several topics.
tierno5c012612018-04-19 16:01:59 +020082 :param topic: can be str: single topic; or str list: several topics
83 :return: topic, key, message; or None
84 """
85 try:
Gulsum Aticia06b8542023-05-09 13:42:13 +030086 return asyncio.run(self.aioread(topic))
tierno5c012612018-04-19 16:01:59 +020087 except MsgException:
88 raise
89 except Exception as e:
90 raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
91
Gulsum Aticia06b8542023-05-09 13:42:13 +030092 async def aiowrite(self, topic, key, msg):
tierno05ede8f2019-01-28 16:20:18 +000093 """
94 Asyncio write
95 :param topic: str kafka topic
96 :param key: str kafka key
97 :param msg: str or dictionary kafka message
tierno05ede8f2019-01-28 16:20:18 +000098 :return: None
99 """
tierno5c012612018-04-19 16:01:59 +0200100 try:
garciadeblas2644b762021-03-24 09:21:01 +0100101 self.producer = AIOKafkaProducer(
garciadeblas2644b762021-03-24 09:21:01 +0100102 key_serializer=str.encode,
103 value_serializer=str.encode,
104 bootstrap_servers=self.broker,
105 )
tierno5c012612018-04-19 16:01:59 +0200106 await self.producer.start()
garciadeblas2644b762021-03-24 09:21:01 +0100107 await self.producer.send(
108 topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True)
109 )
tierno5c012612018-04-19 16:01:59 +0200110 except Exception as e:
garciadeblas2644b762021-03-24 09:21:01 +0100111 raise MsgException(
112 "Error publishing topic '{}', key '{}': {}".format(topic, key, e)
113 )
tierno5c012612018-04-19 16:01:59 +0200114 finally:
115 await self.producer.stop()
116
garciadeblas2644b762021-03-24 09:21:01 +0100117 async def aioread(
118 self,
119 topic,
garciadeblas2644b762021-03-24 09:21:01 +0100120 callback=None,
121 aiocallback=None,
122 group_id=None,
123 from_beginning=None,
garciadeblas7b3ae042025-12-09 19:19:44 +0100124 **kwargs,
garciadeblas2644b762021-03-24 09:21:01 +0100125 ):
tierno5c012612018-04-19 16:01:59 +0200126 """
tierno05ede8f2019-01-28 16:20:18 +0000127 Asyncio read from one or several topics.
tierno5c012612018-04-19 16:01:59 +0200128 :param topic: can be str: single topic; or str list: several topics
Benjamin Diaz48b78e12018-10-18 17:55:12 -0300129 :param callback: synchronous callback function that will handle the message in kafka bus
130 :param aiocallback: async callback function that will handle the message in kafka bus
tierno10602af2019-02-18 14:53:54 +0000131 :param group_id: kafka group_id to use. Can be False (set group_id to None), None (use general group_id provided
132 at connect inside config), or a group_id string
tierno41ca4d02020-07-16 11:22:12 +0000133 :param from_beginning: if True, messages will be obtained from beginning instead of only new ones.
134 If group_id is supplied, only the not processed messages by other worker are obtained.
135 If group_id is None, all messages stored at kafka are obtained.
Benjamin Diaz48b78e12018-10-18 17:55:12 -0300136 :param kwargs: optional keyword arguments for callback function
137 :return: If no callback defined, it returns (topic, key, message)
tierno5c012612018-04-19 16:01:59 +0200138 """
tierno10602af2019-02-18 14:53:54 +0000139 if group_id is False:
140 group_id = None
141 elif group_id is None:
142 group_id = self.group_id
tierno5c012612018-04-19 16:01:59 +0200143 try:
144 if isinstance(topic, (list, tuple)):
145 topic_list = topic
146 else:
147 topic_list = (topic,)
garciadeblas2644b762021-03-24 09:21:01 +0100148 self.consumer = AIOKafkaConsumer(
garciadeblas2644b762021-03-24 09:21:01 +0100149 bootstrap_servers=self.broker,
150 group_id=group_id,
151 auto_offset_reset="earliest" if from_beginning else "latest",
152 )
tierno5c012612018-04-19 16:01:59 +0200153 await self.consumer.start()
154 self.consumer.subscribe(topic_list)
155
156 async for message in self.consumer:
157 if callback:
garciadeblas2644b762021-03-24 09:21:01 +0100158 callback(
159 message.topic,
160 yaml.safe_load(message.key),
161 yaml.safe_load(message.value),
garciadeblas7b3ae042025-12-09 19:19:44 +0100162 **kwargs,
garciadeblas2644b762021-03-24 09:21:01 +0100163 )
Benjamin Diaz48b78e12018-10-18 17:55:12 -0300164 elif aiocallback:
garciadeblas2644b762021-03-24 09:21:01 +0100165 await aiocallback(
166 message.topic,
167 yaml.safe_load(message.key),
168 yaml.safe_load(message.value),
garciadeblas7b3ae042025-12-09 19:19:44 +0100169 **kwargs,
garciadeblas2644b762021-03-24 09:21:01 +0100170 )
tierno5c012612018-04-19 16:01:59 +0200171 else:
garciadeblas2644b762021-03-24 09:21:01 +0100172 return (
173 message.topic,
174 yaml.safe_load(message.key),
175 yaml.safe_load(message.value),
176 )
tierno5c012612018-04-19 16:01:59 +0200177 except KafkaError as e:
178 raise MsgException(str(e))
179 finally:
180 await self.consumer.stop()