blob: bc9147d415111ab3fa3f891dd16c565208d4b06c [file] [log] [blame]
tierno87858ca2018-10-08 16:30:15 +02001# -*- coding: utf-8 -*-
2
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
tierno5c012612018-04-19 16:01:59 +020016import logging
17import asyncio
18import yaml
19from aiokafka import AIOKafkaConsumer
20from aiokafka import AIOKafkaProducer
21from aiokafka.errors import KafkaError
tierno3054f782018-04-25 16:59:53 +020022from osm_common.msgbase import MsgBase, MsgException
tierno5c012612018-04-19 16:01:59 +020023
24__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, " \
25 "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
tierno3054f782018-04-25 16:59:53 +020026
27
tierno5c012612018-04-19 16:01:59 +020028class MsgKafka(MsgBase):
tierno1e9a3292018-11-05 18:18:45 +010029 def __init__(self, logger_name='msg', lock=False):
30 super().__init__(logger_name, lock)
tierno5c012612018-04-19 16:01:59 +020031 self.host = None
32 self.port = None
33 self.consumer = None
34 self.producer = None
35 self.loop = None
36 self.broker = None
tierno73da4fa2018-08-31 13:50:59 +000037 self.group_id = None
tierno5c012612018-04-19 16:01:59 +020038
39 def connect(self, config):
40 try:
41 if "logger_name" in config:
42 self.logger = logging.getLogger(config["logger_name"])
43 self.host = config["host"]
44 self.port = config["port"]
tierno05ede8f2019-01-28 16:20:18 +000045 self.loop = config.get("loop") or asyncio.get_event_loop()
tierno5c012612018-04-19 16:01:59 +020046 self.broker = str(self.host) + ":" + str(self.port)
tierno73da4fa2018-08-31 13:50:59 +000047 self.group_id = config.get("group_id")
tierno5c012612018-04-19 16:01:59 +020048
49 except Exception as e: # TODO refine
50 raise MsgException(str(e))
51
52 def disconnect(self):
53 try:
tiernoebbf3532018-05-03 17:49:37 +020054 pass
55 # self.loop.close()
tierno5c012612018-04-19 16:01:59 +020056 except Exception as e: # TODO refine
57 raise MsgException(str(e))
58
59 def write(self, topic, key, msg):
tierno86577992018-05-10 16:51:17 +020060 """
61 Write a message at kafka bus
62 :param topic: message topic, must be string
63 :param key: message key, must be string
64 :param msg: message content, can be string or dictionary
65 :return: None or raises MsgException on failing
66 """
tierno5c012612018-04-19 16:01:59 +020067 try:
tierno86577992018-05-10 16:51:17 +020068 self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=msg))
tierno5c012612018-04-19 16:01:59 +020069
70 except Exception as e:
71 raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
72
73 def read(self, topic):
74 """
tierno86577992018-05-10 16:51:17 +020075 Read from one or several topics.
tierno5c012612018-04-19 16:01:59 +020076 :param topic: can be str: single topic; or str list: several topics
77 :return: topic, key, message; or None
78 """
79 try:
80 return self.loop.run_until_complete(self.aioread(topic, self.loop))
81 except MsgException:
82 raise
83 except Exception as e:
84 raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
85
86 async def aiowrite(self, topic, key, msg, loop=None):
tierno05ede8f2019-01-28 16:20:18 +000087 """
88 Asyncio write
89 :param topic: str kafka topic
90 :param key: str kafka key
91 :param msg: str or dictionary kafka message
92 :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
93 :return: None
94 """
tierno5c012612018-04-19 16:01:59 +020095
96 if not loop:
97 loop = self.loop
98 try:
99 self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
100 bootstrap_servers=self.broker)
101 await self.producer.start()
tierno3054f782018-04-25 16:59:53 +0200102 await self.producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True))
tierno5c012612018-04-19 16:01:59 +0200103 except Exception as e:
tierno3054f782018-04-25 16:59:53 +0200104 raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e))
tierno5c012612018-04-19 16:01:59 +0200105 finally:
106 await self.producer.stop()
107
tierno10602af2019-02-18 14:53:54 +0000108 async def aioread(self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs):
tierno5c012612018-04-19 16:01:59 +0200109 """
tierno05ede8f2019-01-28 16:20:18 +0000110 Asyncio read from one or several topics.
tierno5c012612018-04-19 16:01:59 +0200111 :param topic: can be str: single topic; or str list: several topics
tierno05ede8f2019-01-28 16:20:18 +0000112 :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
Benjamin Diaz48b78e12018-10-18 17:55:12 -0300113 :param callback: synchronous callback function that will handle the message in kafka bus
114 :param aiocallback: async callback function that will handle the message in kafka bus
tierno10602af2019-02-18 14:53:54 +0000115 :param group_id: kafka group_id to use. Can be False (set group_id to None), None (use general group_id provided
116 at connect inside config), or a group_id string
Benjamin Diaz48b78e12018-10-18 17:55:12 -0300117 :param kwargs: optional keyword arguments for callback function
118 :return: If no callback defined, it returns (topic, key, message)
tierno5c012612018-04-19 16:01:59 +0200119 """
120
121 if not loop:
122 loop = self.loop
tierno10602af2019-02-18 14:53:54 +0000123 if group_id is False:
124 group_id = None
125 elif group_id is None:
126 group_id = self.group_id
tierno5c012612018-04-19 16:01:59 +0200127 try:
128 if isinstance(topic, (list, tuple)):
129 topic_list = topic
130 else:
131 topic_list = (topic,)
132
tierno10602af2019-02-18 14:53:54 +0000133 self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker, group_id=group_id)
tierno5c012612018-04-19 16:01:59 +0200134 await self.consumer.start()
135 self.consumer.subscribe(topic_list)
136
137 async for message in self.consumer:
138 if callback:
Benjamin Diaz48b78e12018-10-18 17:55:12 -0300139 callback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs)
140 elif aiocallback:
141 await aiocallback(message.topic, yaml.load(message.key), yaml.load(message.value), **kwargs)
tierno5c012612018-04-19 16:01:59 +0200142 else:
143 return message.topic, yaml.load(message.key), yaml.load(message.value)
144 except KafkaError as e:
145 raise MsgException(str(e))
146 finally:
147 await self.consumer.stop()