pip3 install --upgrade aiokafka==0.4.*
pip3 install --upgrade pymongo==3.7.*
pip3 install --upgrade pyyaml==3.*
+# pip3 install pycrypto added as dependency: python3-crypto
#Creation of log folder
mkdir -p /var/log/osm
except Exception as e: # TODO refine
raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
- async def aioread(self, topic, loop):
+ async def aioread(self, topic, loop=None, callback=None, aiocallback=None, **kwargs):
"""
Asyncio read from one or several topics. It blocks
:param topic: can be str: single topic; or str list: several topics
while True:
msg = self.read(topic, blocks=False)
if msg:
- return msg
+ if callback:
+ callback(*msg, **kwargs)
+ elif aiocallback:
+ await aiocallback(*msg, **kwargs)
+ else:
+ return msg
await asyncio.sleep(2, loop=loop)
except MsgException:
raise