projects
/
osm
/
common.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Adding tests for base classes
[osm/common.git]
/
osm_common
/
msgkafka.py
diff --git
a/osm_common/msgkafka.py
b/osm_common/msgkafka.py
index
c819c81
..
2d82f97
100644
(file)
--- a/
osm_common/msgkafka.py
+++ b/
osm_common/msgkafka.py
@@
-4,11
+4,13
@@
import yaml
from aiokafka import AIOKafkaConsumer
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
from aiokafka import AIOKafkaConsumer
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
-from msgbase import MsgBase, MsgException
-#import json
+from
osm_common.
msgbase import MsgBase, MsgException
+#
import json
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, " \
"Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>, " \
"Guillermo Calvino <guillermo.calvinosanchez@altran.com>"
+
+
class MsgKafka(MsgBase):
def __init__(self, logger_name='msg'):
self.logger = logging.getLogger(logger_name)
class MsgKafka(MsgBase):
def __init__(self, logger_name='msg'):
self.logger = logging.getLogger(logger_name)
@@
-33,22
+35,28
@@
class MsgKafka(MsgBase):
def disconnect(self):
try:
def disconnect(self):
try:
- self.loop.close()
+ pass
+ # self.loop.close()
except Exception as e: # TODO refine
raise MsgException(str(e))
def write(self, topic, key, msg):
except Exception as e: # TODO refine
raise MsgException(str(e))
def write(self, topic, key, msg):
+ """
+ Write a message at kafka bus
+ :param topic: message topic, must be string
+ :param key: message key, must be string
+ :param msg: message content, can be string or dictionary
+ :return: None or raises MsgException on failing
+ """
try:
try:
- self.loop.run_until_complete(self.aiowrite(topic=topic, key=key,
- msg=yaml.safe_dump(msg, default_flow_style=True),
- loop=self.loop))
+ self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=msg))
except Exception as e:
raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
def read(self, topic):
"""
except Exception as e:
raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
def read(self, topic):
"""
- Read from one or several topics.
it is non blocking returning None if nothing is available
+ Read from one or several topics.
:param topic: can be str: single topic; or str list: several topics
:return: topic, key, message; or None
"""
:param topic: can be str: single topic; or str list: several topics
:return: topic, key, message; or None
"""
@@
-67,9
+75,9
@@
class MsgKafka(MsgBase):
self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
bootstrap_servers=self.broker)
await self.producer.start()
self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
bootstrap_servers=self.broker)
await self.producer.start()
- await self.producer.send(topic=topic, key=key, value=
msg
)
+ await self.producer.send(topic=topic, key=key, value=
yaml.safe_dump(msg, default_flow_style=True)
)
except Exception as e:
except Exception as e:
- raise MsgException("Error publishing to
{} topic: {}".format(topic, str(e)
))
+ raise MsgException("Error publishing to
pic '{}', key '{}': {}".format(topic, key, e
))
finally:
await self.producer.stop()
finally:
await self.producer.stop()
@@
-104,4
+112,3
@@
class MsgKafka(MsgBase):
raise MsgException(str(e))
finally:
await self.consumer.stop()
raise MsgException(str(e))
finally:
await self.consumer.stop()
-