msgkafka, provide loop on config
[osm/common.git] / osm_common / msgkafka.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import logging
17 import asyncio
18 import yaml
19 from aiokafka import AIOKafkaConsumer
20 from aiokafka import AIOKafkaProducer
21 from aiokafka.errors import KafkaError
22 from osm_common.msgbase import MsgBase, MsgException
23
# Module authorship metadata; adjacent string literals are joined at compile time.
__author__ = (
    "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, "
    "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
)
26
27
class MsgKafka(MsgBase):
    """
    MsgBase implementation that publishes to and consumes from a Kafka bus using aiokafka.

    A new producer/consumer is created, started and stopped on every write/read call; connect() only stores
    the connection parameters. Messages are serialized as YAML (yaml.safe_dump on write, yaml.safe_load on read).
    """

    def __init__(self, logger_name='msg', lock=False):
        """
        :param logger_name: forwarded to MsgBase.__init__
        :param lock: forwarded to MsgBase.__init__
        """
        super().__init__(logger_name, lock)
        self.host = None
        self.port = None
        self.consumer = None  # last consumer created by aioread (kept for backward compatibility)
        self.producer = None  # last producer created by aiowrite (kept for backward compatibility)
        self.loop = None
        self.broker = None  # "host:port" string built at connect
        self.group_id = None

    @staticmethod
    def _decode(raw):
        """
        Deserialize one message key or value received from kafka
        :param raw: YAML-encoded payload, or None when the message carries no key/value
        :return: the decoded python object; None when raw is None
        """
        if raw is None:
            return None
        # safe_load matches the safe_dump used by aiowrite, never builds arbitrary python objects from bus
        # content and, unlike a bare yaml.load(), does not need an explicit Loader on PyYAML >= 6.
        # NOTE(review): payloads produced elsewhere with python-specific YAML tags are rejected by safe_load.
        return yaml.safe_load(raw)

    def connect(self, config):
        """
        Store the connection parameters. No network connection is opened here.
        :param config: dictionary with mandatory keys "host" and "port" and optional keys "logger_name",
            "loop" (asyncio loop; asyncio.get_event_loop() is used when missing) and "group_id"
        :return: None or raises MsgException on failing
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.host = config["host"]
            self.port = config["port"]
            self.loop = config.get("loop") or asyncio.get_event_loop()
            self.broker = str(self.host) + ":" + str(self.port)
            self.group_id = config.get("group_id")

        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    def disconnect(self):
        """
        Intentionally a no-op: producers and consumers are stopped at the end of every aiowrite/aioread call,
        and the loop is left open because it may have been supplied by the caller through connect().
        :return: None
        """
        pass

    def write(self, topic, key, msg):
        """
        Write a message at kafka bus
        :param topic: message topic, must be string
        :param key: message key, must be string
        :param msg: message content, can be string or dictionary
        :return: None or raises MsgException on failing
        """
        try:
            self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=msg))

        except Exception as e:
            raise MsgException("Error writing {} topic: {}".format(topic, str(e))) from e

    def read(self, topic):
        """
        Read from one or several topics.
        :param topic: can be str: single topic; or str list: several topics
        :return: topic, key, message; or None
        """
        try:
            return self.loop.run_until_complete(self.aioread(topic, self.loop))
        except MsgException:
            raise
        except Exception as e:
            raise MsgException("Error reading {} topic: {}".format(topic, str(e))) from e

    async def aiowrite(self, topic, key, msg, loop=None):
        """
        Asyncio write
        :param topic: str kafka topic
        :param key: str kafka key
        :param msg: str or dictionary kafka message
        :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
        :return: None or raises MsgException on failing
        """

        if not loop:
            loop = self.loop
        # Track the producer in a local variable: if the constructor fails there is nothing to stop, and a
        # concurrent aiowrite on the same instance cannot make this call stop somebody else's producer.
        producer = None
        try:
            producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                        bootstrap_servers=self.broker)
            self.producer = producer
            await producer.start()
            await producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True))
        except Exception as e:
            raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e)) from e
        finally:
            if producer is not None:
                await producer.stop()

    async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs):
        """
        Asyncio read from one or several topics.
        :param topic: can be str: single topic; or str list: several topics
        :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
        :param callback: synchronous callback function that will handle the message in kafka bus
        :param aiocallback: async callback function that will handle the message in kafka bus
        :param kwargs: optional keyword arguments for callback function
        :return: If no callback defined, it returns (topic, key, message) for the first message received.
            With a callback it keeps consuming and invoking it for every message.
            Raises MsgException on kafka errors
        """

        if not loop:
            loop = self.loop
        # Same reasoning as in aiowrite: only stop the consumer this very call has created
        consumer = None
        try:
            if isinstance(topic, (list, tuple)):
                topic_list = topic
            else:
                topic_list = (topic,)

            consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker, group_id=self.group_id)
            self.consumer = consumer
            await consumer.start()
            consumer.subscribe(topic_list)

            async for message in consumer:
                key = self._decode(message.key)
                value = self._decode(message.value)
                if callback:
                    callback(message.topic, key, value, **kwargs)
                elif aiocallback:
                    await aiocallback(message.topic, key, value, **kwargs)
                else:
                    return message.topic, key, value
        except KafkaError as e:
            raise MsgException(str(e)) from e
        finally:
            if consumer is not None:
                await consumer.stop()