blob: aa756c49589a42090f375121c936470bb998e88b [file] [log] [blame]
# -*- coding: utf-8 -*-

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

tierno5c012612018-04-19 16:01:59 +020016import logging
17import asyncio
18import yaml
19from aiokafka import AIOKafkaConsumer
20from aiokafka import AIOKafkaProducer
21from aiokafka.errors import KafkaError
tierno3054f782018-04-25 16:59:53 +020022from osm_common.msgbase import MsgBase, MsgException
23# import json
tierno5c012612018-04-19 16:01:59 +020024
25__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, " \
26 "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
tierno3054f782018-04-25 16:59:53 +020027
28
class MsgKafka(MsgBase):
    """
    Message bus implementation on top of Kafka, using aiokafka.

    Messages are serialized to YAML on write and parsed back from YAML on read.
    A new producer/consumer is created for every write/read call and stopped when
    that call finishes.
    """

    def __init__(self, logger_name='msg'):
        """
        Create a non-connected instance; call connect() before reading or writing.
        :param logger_name: name of the logger used by this instance
        """
        self.logger = logging.getLogger(logger_name)
        self.host = None
        self.port = None
        # Last consumer/producer created by aioread/aiowrite. Kept as attributes for
        # backward compatibility; the coroutines work on their own local reference.
        self.consumer = None
        self.producer = None
        self.loop = None
        self.broker = None
        self.group_id = None

    def connect(self, config):
        """
        Store the connection parameters. No network connection is opened here.
        :param config: dictionary with mandatory keys "host" and "port", and optional
            keys "logger_name" and "group_id"
        :return: None or raises MsgException on failing (e.g. a mandatory key is missing)
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.host = config["host"]
            self.port = config["port"]
            self.loop = asyncio.get_event_loop()
            self.broker = str(self.host) + ":" + str(self.port)
            self.group_id = config.get("group_id")

        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    def disconnect(self):
        """
        Nothing to release: producers and consumers are stopped by each read/write call.
        The event loop is intentionally left open (closing it was already disabled).
        :return: None
        """
        # self.loop.close()

    def write(self, topic, key, msg):
        """
        Write a message at kafka bus
        :param topic: message topic, must be string
        :param key: message key, must be string
        :param msg: message content, can be string or dictionary
        :return: None or raises MsgException on failing
        """
        try:
            self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=msg))

        except Exception as e:
            raise MsgException("Error writing {} topic: {}".format(topic, str(e))) from e

    def read(self, topic):
        """
        Read from one or several topics.
        :param topic: can be str: single topic; or str list: several topics
        :return: topic, key, message; or None
        """
        try:
            return self.loop.run_until_complete(self.aioread(topic, self.loop))
        except MsgException:
            raise
        except Exception as e:
            raise MsgException("Error reading {} topic: {}".format(topic, str(e))) from e

    async def aiowrite(self, topic, key, msg, loop=None):
        """
        Asyncio write of a message at kafka bus
        :param topic: message topic, must be string
        :param key: message key, must be string
        :param msg: message content, can be string or dictionary; it is dumped as YAML
        :param loop: asyncio loop; the one obtained at connect() is used if not provided
        :return: None or raises MsgException on failing
        """
        if not loop:
            loop = self.loop
        # Local reference: the 'finally' clause must stop only the producer created by this
        # call, and must not fail when the constructor itself raised.
        producer = None
        try:
            producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                        bootstrap_servers=self.broker)
            self.producer = producer
            await producer.start()
            await producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True))
        except Exception as e:
            raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e)) from e
        finally:
            if producer is not None:
                await producer.stop()

    async def aioread(self, topic, loop=None, callback=None, *args):
        """
        Asyncio read from one or several topics. It blocks
        :param topic: can be str: single topic; or str list: several topics
        :param loop: asyncio loop; the one obtained at connect() is used if not provided
        :param callback: callback function that will handle the message in kafka bus. When provided,
            it is called as callback(topic, key, message, *args) for every message received and this
            coroutine keeps reading instead of returning at the first message
        :param args: optional arguments for callback function
        :return: topic, key, message (of the first message received, when no callback is provided)
        """
        if not loop:
            loop = self.loop
        # Local reference for the same reason as in aiowrite
        consumer = None
        try:
            if isinstance(topic, (list, tuple)):
                topic_list = topic
            else:
                topic_list = (topic,)

            consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker, group_id=self.group_id)
            self.consumer = consumer
            await consumer.start()
            consumer.subscribe(topic_list)

            async for message in consumer:
                content = (message.topic, self._load_yaml(message.key), self._load_yaml(message.value))
                if callback:
                    callback(*content, *args)
                else:
                    return content
        except KafkaError as e:
            raise MsgException(str(e)) from e
        finally:
            if consumer is not None:
                await consumer.stop()

    @staticmethod
    def _load_yaml(raw):
        """
        Parse the raw key or value of a kafka record.
        :param raw: content as received from the bus, or None when the record has no key/value
        :return: the parsed content; None if raw is None
        """
        if raw is None:
            return None
        # safe_load: the bus content is external data, and it is produced with yaml.safe_dump;
        # a plain yaml.load without Loader is unsafe and not accepted by recent PyYAML versions
        return yaml.safe_load(raw)