blob: 4d020248c4f5cb9db5ae08db46a08ca5f0b4088b [file] [log] [blame]
# -*- coding: utf-8 -*-

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15
tierno5c012612018-04-19 16:01:59 +020016import logging
17import asyncio
18import yaml
19from aiokafka import AIOKafkaConsumer
20from aiokafka import AIOKafkaProducer
21from aiokafka.errors import KafkaError
tierno3054f782018-04-25 16:59:53 +020022from osm_common.msgbase import MsgBase, MsgException
tierno5c012612018-04-19 16:01:59 +020023
# Authors of this module, as a single comma-separated string.
__author__ = (
    "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, "
    "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
)
tierno3054f782018-04-25 16:59:53 +020026
27
class MsgKafka(MsgBase):
    """
    Message bus implementation on top of kafka, using aiokafka producers and consumers.

    A new producer/consumer is created (and stopped) on every write/read call. The last one created is
    kept at 'self.producer' / 'self.consumer' for reference only; cleanup never relies on these attributes.
    """

    def __init__(self, logger_name='msg', lock=False):
        """
        :param logger_name: name of the logger, forwarded to MsgBase
        :param lock: forwarded to MsgBase
        """
        super().__init__(logger_name, lock)
        self.host = None
        self.port = None
        self.consumer = None
        self.producer = None
        self.loop = None
        self.broker = None
        self.group_id = None

    def connect(self, config):
        """
        Store the connection parameters. No connection to kafka is opened here.
        :param config: dictionary with mandatory keys 'host' and 'port'; and optional keys 'logger_name',
            'loop' (asyncio loop; asyncio.get_event_loop() is used when missing or empty) and 'group_id'
        :return: None or raises MsgException on failing (e.g. a mandatory key is missing)
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.host = config["host"]
            self.port = config["port"]
            self.loop = config.get("loop") or asyncio.get_event_loop()
            self.broker = str(self.host) + ":" + str(self.port)
            self.group_id = config.get("group_id")

        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    def disconnect(self):
        """
        Nothing to release: producers and consumers are stopped at the end of every write/read call.
        The asyncio loop is intentionally not closed here, as it may have been supplied by the caller at connect.
        :return: None
        """
        return None

    def write(self, topic, key, msg):
        """
        Write a message at kafka bus
        :param topic: message topic, must be string
        :param key: message key, must be string
        :param msg: message content, can be string or dictionary
        :return: None or raises MsgException on failing
        """
        retry = 2  # Try two times
        while retry:
            try:
                self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=msg))
                break
            except Exception as e:
                retry -= 1
                if retry == 0:
                    raise MsgException("Error writing {} topic: {}".format(topic, str(e))) from e

    def read(self, topic):
        """
        Read from one or several topics.
        :param topic: can be str: single topic; or str list: several topics
        :return: topic, key, message; or None
        """
        try:
            return self.loop.run_until_complete(self.aioread(topic, self.loop))
        except MsgException:
            raise
        except Exception as e:
            raise MsgException("Error reading {} topic: {}".format(topic, str(e))) from e

    async def aiowrite(self, topic, key, msg, loop=None):
        """
        Asyncio write
        :param topic: str kafka topic
        :param key: str kafka key
        :param msg: str or dictionary kafka message. It is serialized with yaml.safe_dump
        :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
        :return: None or raises MsgException on failing
        """

        if not loop:
            loop = self.loop
        # Keep the producer in a local variable: 'self.producer' may be None (constructor failed) or may be
        # overwritten by a concurrent call, so it cannot be trusted at the 'finally' clause
        producer = None
        try:
            producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                        bootstrap_servers=self.broker)
            self.producer = producer
            await producer.start()
            await producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True))
        except Exception as e:
            raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e)) from e
        finally:
            # Only stop what was actually created, so that the original error is not masked
            if producer is not None:
                await producer.stop()

    async def aioread(self, topic, loop=None, callback=None, aiocallback=None, group_id=None, from_beginning=None,
                      **kwargs):
        """
        Asyncio read from one or several topics.
        :param topic: can be str: single topic; or str list: several topics
        :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
        :param callback: synchronous callback function that will handle the message in kafka bus
        :param aiocallback: async callback function that will handle the message in kafka bus.
            Ignored if 'callback' is also provided
        :param group_id: kafka group_id to use. Can be False (set group_id to None), None (use general group_id provided
            at connect inside config), or a group_id string
        :param from_beginning: if True, messages will be obtained from beginning instead of only new ones.
            If group_id is supplied, only the not processed messages by other worker are obtained.
            If group_id is None, all messages stored at kafka are obtained.
        :param kwargs: optional keyword arguments for callback function
        :return: If no callback defined, it returns (topic, key, message) of the first message received.
            With a callback, every received message is passed to it and the method keeps consuming.
            Raises MsgException on kafka errors
        """

        if not loop:
            loop = self.loop
        if group_id is False:
            group_id = None
        elif group_id is None:
            group_id = self.group_id
        # Keep the consumer in a local variable: 'self.consumer' may be None (constructor failed) or may be
        # overwritten by a concurrent call, so it cannot be trusted at the 'finally' clause
        consumer = None
        try:
            if isinstance(topic, (list, tuple)):
                topic_list = topic
            else:
                topic_list = (topic,)
            consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker, group_id=group_id,
                                        auto_offset_reset="earliest" if from_beginning else "latest")
            self.consumer = consumer
            await consumer.start()
            consumer.subscribe(topic_list)

            async for message in consumer:
                if callback:
                    callback(message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value), **kwargs)
                elif aiocallback:
                    await aiocallback(message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value),
                                      **kwargs)
                else:
                    return message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value)
        except KafkaError as e:
            raise MsgException(str(e)) from e
        finally:
            # Only stop what was actually created, so that the original error is not masked
            if consumer is not None:
                await consumer.stop()