# See the License for the specific language governing permissions and
# limitations under the License.
+import asyncio
+from http import HTTPStatus
import logging
import os
-import yaml
-import asyncio
-from osm_common.msgbase import MsgBase, MsgException
from time import sleep
-from http import HTTPStatus
-__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+from osm_common.msgbase import MsgBase, MsgException
+import yaml
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
"""
This emulated kafka bus by just using a shared file system. Useful for testing or devops.
One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
try:
f.close()
self.files_read[topic] = None
- except Exception: # TODO refine
- pass
+ except Exception as read_topic_error:
+ if isinstance(read_topic_error, (IOError, FileNotFoundError)):
+ self.logger.exception(
+ f"{read_topic_error} occured while closing read topic files."
+ )
+ elif isinstance(read_topic_error, KeyError):
+ self.logger.exception(
+ f"{read_topic_error} occured while reading from files_read dictionary."
+ )
+ else:
+ self.logger.exception(
+ f"{read_topic_error} occured while closing read topics."
+ )
+
for topic, f in self.files_write.items():
try:
f.close()
self.files_write[topic] = None
- except Exception: # TODO refine
- pass
+ except Exception as write_topic_error:
+ if isinstance(write_topic_error, (IOError, FileNotFoundError)):
+ self.logger.exception(
+ f"{write_topic_error} occured while closing write topic files."
+ )
+ elif isinstance(write_topic_error, KeyError):
+ self.logger.exception(
+ f"{write_topic_error} occured while reading from files_write dictionary."
+ )
+ else:
+ self.logger.exception(
+ f"{write_topic_error} occured while closing write topics."
+ )
def write(self, topic, key, msg):
"""
continue
msg_dict = yaml.safe_load(self.buffer[single_topic])
self.buffer[single_topic] = ""
- assert len(msg_dict) == 1
+ if len(msg_dict) != 1:
+ raise ValueError(
+ "Length of message dictionary is not equal to 1"
+ )
for k, v in msg_dict.items():
return single_topic, k, v
if not blocks: