X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_common%2Fmsgkafka.py;fp=osm_common%2Fmsgkafka.py;h=2d82f972f0b9bea4c635dcbcb5689ec7547d9bb1;hb=8657799f1469d2495c06087e88aca2777d425806;hp=382bdef7525c9e734bf4f433ad898f7e34cad747;hpb=f72494004517729569bd770f045708bc422df23c;p=osm%2Fcommon.git diff --git a/osm_common/msgkafka.py b/osm_common/msgkafka.py index 382bdef..2d82f97 100644 --- a/osm_common/msgkafka.py +++ b/osm_common/msgkafka.py @@ -41,17 +41,22 @@ class MsgKafka(MsgBase): raise MsgException(str(e)) def write(self, topic, key, msg): + """ + Write a message at kafka bus + :param topic: message topic, must be string + :param key: message key, must be string + :param msg: message content, can be string or dictionary + :return: None or raises MsgException on failing + """ try: - self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, - msg=yaml.safe_dump(msg, default_flow_style=True), - loop=self.loop)) + self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=msg)) except Exception as e: raise MsgException("Error writing {} topic: {}".format(topic, str(e))) def read(self, topic): """ - Read from one or several topics. it is non blocking returning None if nothing is available + Read from one or several topics. :param topic: can be str: single topic; or str list: several topics :return: topic, key, message; or None """