self.files_read = {}
self.files_write = {}
self.buffer = {}
- self.loop = None
def connect(self, config):
try:
self.path += "/"
if not os.path.exists(self.path):
os.mkdir(self.path)
- self.loop = config.get("loop")
except MsgException:
raise
try:
f.close()
self.files_read[topic] = None
- except Exception: # TODO refine
- pass
+ except Exception as read_topic_error:
+ if isinstance(read_topic_error, (IOError, FileNotFoundError)):
+ self.logger.exception(
+ f"{read_topic_error} occured while closing read topic files."
+ )
+ elif isinstance(read_topic_error, KeyError):
+ self.logger.exception(
+ f"{read_topic_error} occured while reading from files_read dictionary."
+ )
+ else:
+ self.logger.exception(
+ f"{read_topic_error} occured while closing read topics."
+ )
+
for topic, f in self.files_write.items():
try:
f.close()
self.files_write[topic] = None
- except Exception: # TODO refine
- pass
+ except Exception as write_topic_error:
+ if isinstance(write_topic_error, (IOError, FileNotFoundError)):
+ self.logger.exception(
+ f"{write_topic_error} occured while closing write topic files."
+ )
+ elif isinstance(write_topic_error, KeyError):
+ self.logger.exception(
+ f"{write_topic_error} occured while reading from files_write dictionary."
+ )
+ else:
+ self.logger.exception(
+ f"{write_topic_error} occured while closing write topics."
+ )
def write(self, topic, key, msg):
"""
continue
msg_dict = yaml.safe_load(self.buffer[single_topic])
self.buffer[single_topic] = ""
- assert len(msg_dict) == 1
+ if len(msg_dict) != 1:
+ raise ValueError(
+ "Length of message dictionary is not equal to 1"
+ )
for k, v in msg_dict.items():
return single_topic, k, v
if not blocks:
raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
async def aioread(
- self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs
+ self, topic, callback=None, aiocallback=None, group_id=None, **kwargs
):
"""
Asyncio read from one or several topics. It blocks
:param topic: can be str: single topic; or str list: several topics
- :param loop: asyncio loop. To be DEPRECATED! in near future!!! loop must be provided inside config at connect
:param callback: synchronous callback function that will handle the message
:param aiocallback: async callback function that will handle the message
:param group_id: group_id to use for load balancing. Can be False (set group_id to None), None (use general
:param kwargs: optional keyword arguments for callback function
:return: If no callback defined, it returns (topic, key, message)
"""
- _loop = loop or self.loop
try:
while True:
msg = self.read(topic, blocks=False)
await aiocallback(*msg, **kwargs)
else:
return msg
- await asyncio.sleep(2, loop=_loop)
+ await asyncio.sleep(2)
except MsgException:
raise
except Exception as e: # TODO refine
raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
- async def aiowrite(self, topic, key, msg, loop=None):
+ async def aiowrite(self, topic, key, msg):
"""
Asyncio write. It blocks
:param topic: str
:param key: str
:param msg: message, can be str or yaml
- :param loop: asyncio loop
:return: nothing if ok or raises an exception
"""
return self.write(topic, key, msg)