fix 1131 Allow to get old kafka messages
[osm/common.git] / osm_common / msgkafka.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import logging
17 import asyncio
18 import yaml
19 from aiokafka import AIOKafkaConsumer
20 from aiokafka import AIOKafkaProducer
21 from aiokafka.errors import KafkaError
22 from osm_common.msgbase import MsgBase, MsgException
23
# Module authorship metadata (a single comma-separated string).
__author__ = (
    "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, "
    "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
)
26
27
class MsgKafka(MsgBase):
    """Kafka implementation of the MsgBase message bus, built on aiokafka.

    The synchronous ``write``/``read`` methods drive the asynchronous ``aiowrite``/``aioread``
    on ``self.loop``. Every ``aiowrite``/``aioread`` call creates its own producer/consumer
    and stops it before returning.
    """

    def __init__(self, logger_name='msg', lock=False):
        super().__init__(logger_name, lock)
        self.host = None
        self.port = None
        # Last consumer/producer created by aioread/aiowrite. Kept for backward compatibility;
        # each call works with (and stops) its own local instance.
        self.consumer = None
        self.producer = None
        self.loop = None
        self.broker = None  # "host:port", passed to aiokafka as bootstrap_servers
        self.group_id = None  # default consumer group used by aioread

    def connect(self, config):
        """
        Store the connection parameters. No network connection is opened here.
        :param config: dictionary with mandatory "host" and "port", and optional "logger_name",
            "loop" (asyncio loop; asyncio.get_event_loop() is used if missing) and "group_id"
        :return: None or raises MsgException on failing (e.g. missing "host" or "port")
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.host = config["host"]
            self.port = config["port"]
            self.loop = config.get("loop") or asyncio.get_event_loop()
            self.broker = str(self.host) + ":" + str(self.port)
            self.group_id = config.get("group_id")
        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    def disconnect(self):
        """
        Nothing to release: producers and consumers are stopped at the end of each
        aiowrite/aioread call. The event loop is intentionally not closed here
        (it may be shared, e.g. when supplied through config["loop"]).
        """

    def write(self, topic, key, msg):
        """
        Write a message at kafka bus, trying up to two times.
        :param topic: message topic, must be string
        :param key: message key, must be string
        :param msg: message content, can be string or dictionary
        :return: None or raises MsgException on failing
        """
        attempts = 2
        for attempt in range(1, attempts + 1):
            try:
                self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=msg))
                return
            except Exception as e:
                if attempt == attempts:
                    raise MsgException("Error writing {} topic: {}".format(topic, str(e))) from e

    def read(self, topic):
        """
        Read from one or several topics, blocking until a message is received.
        :param topic: can be str: single topic; or str list: several topics
        :return: topic, key, message; or None
        :raises MsgException: on any error
        """
        try:
            return self.loop.run_until_complete(self.aioread(topic, self.loop))
        except MsgException:
            raise
        except Exception as e:
            raise MsgException("Error reading {} topic: {}".format(topic, str(e))) from e

    async def aiowrite(self, topic, key, msg, loop=None):
        """
        Asyncio write. The message is serialized with yaml.safe_dump before sending.
        :param topic: str kafka topic
        :param key: str kafka key
        :param msg: str or dictionary kafka message
        :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
        :return: None
        :raises MsgException: if the producer cannot be created/started or the send fails
        """
        if not loop:
            loop = self.loop
        # Local reference: self.producer can be reassigned by a concurrent aiowrite while this
        # coroutine is suspended, and nothing must be stopped if construction itself fails.
        producer = None
        try:
            producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                        bootstrap_servers=self.broker)
            self.producer = producer
            await producer.start()
            await producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True))
        except Exception as e:
            raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e)) from e
        finally:
            if producer is not None:
                await producer.stop()

    async def aioread(self, topic, loop=None, callback=None, aiocallback=None, group_id=None, from_beginning=None,
                      **kwargs):
        """
        Asyncio read from one or several topics.
        :param topic: can be str: single topic; or str list: several topics
        :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
        :param callback: synchronous callback function that will handle the message in kafka bus
        :param aiocallback: async callback function that will handle the message in kafka bus
        :param group_id: kafka group_id to use. Can be False (set group_id to None), None (use general group_id provided
                         at connect inside config), or a group_id string
        :param from_beginning: if True, the consumer uses auto_offset_reset="earliest" instead of "latest", so old
            messages are obtained instead of only new ones.
            If group_id is supplied, only the messages not yet processed by other workers of that group are obtained.
            If group_id is None, all messages stored at kafka are obtained.
        :param kwargs: optional keyword arguments for callback function
        :return: If no callback defined, it returns (topic, key, message) of the first message received.
            With a callback, messages keep being dispatched to it and the coroutine does not return on its own.
        :raises MsgException: on KafkaError
        """
        if not loop:
            loop = self.loop
        if group_id is False:
            group_id = None
        elif group_id is None:
            group_id = self.group_id
        topic_list = topic if isinstance(topic, (list, tuple)) else (topic,)
        # Local reference, for the same reasons as in aiowrite.
        consumer = None
        try:
            consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker, group_id=group_id,
                                        auto_offset_reset="earliest" if from_beginning else "latest")
            self.consumer = consumer
            await consumer.start()
            consumer.subscribe(topic_list)

            async for message in consumer:
                if callback:
                    callback(message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value), **kwargs)
                elif aiocallback:
                    await aiocallback(message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value),
                                      **kwargs)
                else:
                    return message.topic, yaml.safe_load(message.key), yaml.safe_load(message.value)
        except KafkaError as e:
            raise MsgException(str(e)) from e
        finally:
            if consumer is not None:
                await consumer.stop()