Enable parallel execution and output of tox env
[osm/common.git] / osm_common / msgkafka.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import logging
17 import asyncio
18 import yaml
19 from aiokafka import AIOKafkaConsumer
20 from aiokafka import AIOKafkaProducer
21 from aiokafka.errors import KafkaError
22 from osm_common.msgbase import MsgBase, MsgException
23
24 __author__ = (
25 "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, "
26 "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
27 )
28
29
30 class MsgKafka(MsgBase):
31 def __init__(self, logger_name="msg", lock=False):
32 super().__init__(logger_name, lock)
33 self.host = None
34 self.port = None
35 self.consumer = None
36 self.producer = None
37 self.loop = None
38 self.broker = None
39 self.group_id = None
40
41 def connect(self, config):
42 try:
43 if "logger_name" in config:
44 self.logger = logging.getLogger(config["logger_name"])
45 self.host = config["host"]
46 self.port = config["port"]
47 self.loop = config.get("loop") or asyncio.get_event_loop()
48 self.broker = str(self.host) + ":" + str(self.port)
49 self.group_id = config.get("group_id")
50
51 except Exception as e: # TODO refine
52 raise MsgException(str(e))
53
54 def disconnect(self):
55 try:
56 pass
57 # self.loop.close()
58 except Exception as e: # TODO refine
59 raise MsgException(str(e))
60
61 def write(self, topic, key, msg):
62 """
63 Write a message at kafka bus
64 :param topic: message topic, must be string
65 :param key: message key, must be string
66 :param msg: message content, can be string or dictionary
67 :return: None or raises MsgException on failing
68 """
69 retry = 2 # Try two times
70 while retry:
71 try:
72 self.loop.run_until_complete(
73 self.aiowrite(topic=topic, key=key, msg=msg)
74 )
75 break
76 except Exception as e:
77 retry -= 1
78 if retry == 0:
79 raise MsgException(
80 "Error writing {} topic: {}".format(topic, str(e))
81 )
82
83 def read(self, topic):
84 """
85 Read from one or several topics.
86 :param topic: can be str: single topic; or str list: several topics
87 :return: topic, key, message; or None
88 """
89 try:
90 return self.loop.run_until_complete(self.aioread(topic, self.loop))
91 except MsgException:
92 raise
93 except Exception as e:
94 raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
95
96 async def aiowrite(self, topic, key, msg, loop=None):
97 """
98 Asyncio write
99 :param topic: str kafka topic
100 :param key: str kafka key
101 :param msg: str or dictionary kafka message
102 :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
103 :return: None
104 """
105
106 if not loop:
107 loop = self.loop
108 try:
109 self.producer = AIOKafkaProducer(
110 loop=loop,
111 key_serializer=str.encode,
112 value_serializer=str.encode,
113 bootstrap_servers=self.broker,
114 )
115 await self.producer.start()
116 await self.producer.send(
117 topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True)
118 )
119 except Exception as e:
120 raise MsgException(
121 "Error publishing topic '{}', key '{}': {}".format(topic, key, e)
122 )
123 finally:
124 await self.producer.stop()
125
126 async def aioread(
127 self,
128 topic,
129 loop=None,
130 callback=None,
131 aiocallback=None,
132 group_id=None,
133 from_beginning=None,
134 **kwargs
135 ):
136 """
137 Asyncio read from one or several topics.
138 :param topic: can be str: single topic; or str list: several topics
139 :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
140 :param callback: synchronous callback function that will handle the message in kafka bus
141 :param aiocallback: async callback function that will handle the message in kafka bus
142 :param group_id: kafka group_id to use. Can be False (set group_id to None), None (use general group_id provided
143 at connect inside config), or a group_id string
144 :param from_beginning: if True, messages will be obtained from beginning instead of only new ones.
145 If group_id is supplied, only the not processed messages by other worker are obtained.
146 If group_id is None, all messages stored at kafka are obtained.
147 :param kwargs: optional keyword arguments for callback function
148 :return: If no callback defined, it returns (topic, key, message)
149 """
150
151 if not loop:
152 loop = self.loop
153 if group_id is False:
154 group_id = None
155 elif group_id is None:
156 group_id = self.group_id
157 try:
158 if isinstance(topic, (list, tuple)):
159 topic_list = topic
160 else:
161 topic_list = (topic,)
162 self.consumer = AIOKafkaConsumer(
163 loop=loop,
164 bootstrap_servers=self.broker,
165 group_id=group_id,
166 auto_offset_reset="earliest" if from_beginning else "latest",
167 )
168 await self.consumer.start()
169 self.consumer.subscribe(topic_list)
170
171 async for message in self.consumer:
172 if callback:
173 callback(
174 message.topic,
175 yaml.safe_load(message.key),
176 yaml.safe_load(message.value),
177 **kwargs
178 )
179 elif aiocallback:
180 await aiocallback(
181 message.topic,
182 yaml.safe_load(message.key),
183 yaml.safe_load(message.value),
184 **kwargs
185 )
186 else:
187 return (
188 message.topic,
189 yaml.safe_load(message.key),
190 yaml.safe_load(message.value),
191 )
192 except KafkaError as e:
193 raise MsgException(str(e))
194 finally:
195 await self.consumer.stop()