Added set_list at database.
[osm/common.git] / osm_common / msgkafka.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import logging
17 import asyncio
18 import yaml
19 from aiokafka import AIOKafkaConsumer
20 from aiokafka import AIOKafkaProducer
21 from aiokafka.errors import KafkaError
22 from osm_common.msgbase import MsgBase, MsgException
23 # import json
24
# Module authorship metadata (adjacent string literals are concatenated into a single string).
__author__ = (
    "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, "
    "Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
)
27
28
class MsgKafka(MsgBase):
    """
    Kafka implementation of the message bus, built on top of aiokafka.

    A fresh producer/consumer is created, started and stopped on every write/read call; "connect" only
    stores the connection parameters.
    """

    def __init__(self, logger_name='msg'):
        """
        :param logger_name: name of the logger used by this instance
        """
        self.logger = logging.getLogger(logger_name)
        self.host = None
        self.port = None
        self.consumer = None  # last AIOKafkaConsumer created by aioread
        self.producer = None  # last AIOKafkaProducer created by aiowrite
        self.loop = None
        self.broker = None  # "host:port" string passed to aiokafka as bootstrap_servers
        self.group_id = None

    def connect(self, config):
        """
        Store the connection parameters and get the asyncio event loop. No connection to the broker is opened here.
        :param config: dictionary with mandatory keys "host" and "port"; and optional keys "logger_name", "group_id"
        :return: None or raises MsgException on failing (e.g. a mandatory key is missing)
        """
        try:
            if "logger_name" in config:
                self.logger = logging.getLogger(config["logger_name"])
            self.host = config["host"]
            self.port = config["port"]
            self.loop = asyncio.get_event_loop()
            self.broker = str(self.host) + ":" + str(self.port)
            self.group_id = config.get("group_id")

        except Exception as e:  # TODO refine
            raise MsgException(str(e)) from e

    def disconnect(self):
        """
        Nothing to release: producers and consumers are stopped at the end of every write/read call.
        The event loop is deliberately left open, because it is not created by this class
        (it is obtained with asyncio.get_event_loop() at connect).
        :return: None
        """
        # self.loop.close()

    def write(self, topic, key, msg):
        """
        Write a message at kafka bus
        :param topic: message topic, must be string
        :param key: message key, must be string
        :param msg: message content, can be string or dictionary
        :return: None or raises MsgException on failing
        """
        try:
            self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=msg))

        except Exception as e:
            raise MsgException("Error writing {} topic: {}".format(topic, str(e))) from e

    def read(self, topic):
        """
        Read from one or several topics. It blocks until a message is received
        :param topic: can be str: single topic; or str list: several topics
        :return: topic, key, message; or None
        """
        try:
            return self.loop.run_until_complete(self.aioread(topic, self.loop))
        except MsgException:
            raise
        except Exception as e:
            raise MsgException("Error reading {} topic: {}".format(topic, str(e))) from e

    async def aiowrite(self, topic, key, msg, loop=None):
        """
        Asyncio write of a message at kafka bus
        :param topic: message topic, must be string
        :param key: message key, must be string
        :param msg: message content, can be string or dictionary. It is serialized with yaml.safe_dump
        :param loop: asyncio loop. If not provided, the one obtained at connect is used
        :return: None or raises MsgException on failing
        """
        if not loop:
            loop = self.loop
        # Track the producer in a local variable: only the client created by THIS call must be stopped, and
        # nothing must be stopped if the constructor itself fails (otherwise the error at 'finally' would
        # replace the real one)
        producer = None
        try:
            producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
                                        bootstrap_servers=self.broker)
            self.producer = producer
            await producer.start()
            await producer.send(topic=topic, key=key, value=yaml.safe_dump(msg, default_flow_style=True))
        except Exception as e:
            raise MsgException("Error publishing topic '{}', key '{}': {}".format(topic, key, e)) from e
        finally:
            if producer is not None:
                await producer.stop()

    async def aioread(self, topic, loop=None, callback=None, *args):
        """
        Asyncio read from one or several topics. It blocks
        :param topic: can be str: single topic; or str list: several topics
        :param loop: asyncio loop. If not provided, the one obtained at connect is used
        :param callback: callback function that will handle the message in kafka bus. It is called as
            callback(topic, key, message, *args) for every received message, and reading continues afterwards
        :param args: optional arguments for callback function
        :return: topic, key, message of the first received message when no callback is provided
        """
        if not loop:
            loop = self.loop
        # Local reference for the same reason as in aiowrite: stop only what this call created
        consumer = None
        try:
            if isinstance(topic, (list, tuple)):
                topic_list = topic
            else:
                topic_list = (topic,)

            consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker, group_id=self.group_id)
            self.consumer = consumer
            await consumer.start()
            consumer.subscribe(topic_list)

            async for message in consumer:
                key = self._load_yaml(message.key)
                value = self._load_yaml(message.value)
                if callback:
                    callback(message.topic, key, value, *args)
                else:
                    return message.topic, key, value
        except KafkaError as e:
            raise MsgException(str(e)) from e
        finally:
            if consumer is not None:
                await consumer.stop()

    @staticmethod
    def _load_yaml(raw):
        """
        Decode the raw key or value of a kafka message
        :param raw: bytes/str with yaml content; or None (e.g. a message published without key)
        :return: the decoded python object; None if raw is None
        """
        if raw is None:
            return None
        # NOTE(review): safe_load is the counterpart of the yaml.safe_dump used at aiowrite. A bare yaml.load
        # (no Loader) can build arbitrary python objects from bus content and is rejected by recent PyYAML.
        # Messages written by other producers using python-specific yaml tags will now fail to decode - confirm
        # that no such producer exists
        return yaml.safe_load(raw)