Update from master
[osm/common.git] / osm_common / msglocal.py
index c10ff17..0c3b216 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import asyncio
+from http import HTTPStatus
 import logging
 import os
-import yaml
-import asyncio
-from osm_common.msgbase import MsgBase, MsgException
 from time import sleep
-from http import HTTPStatus
 
-__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+from osm_common.msgbase import MsgBase, MsgException
+import yaml
 
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
 """
 This emulated kafka bus by just using a shared file system. Useful for testing or devops.
 One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
@@ -41,7 +41,6 @@ class MsgLocal(MsgBase):
         self.files_read = {}
         self.files_write = {}
         self.buffer = {}
-        self.loop = None
 
     def connect(self, config):
         try:
@@ -52,7 +51,6 @@ class MsgLocal(MsgBase):
                 self.path += "/"
             if not os.path.exists(self.path):
                 os.mkdir(self.path)
-            self.loop = config.get("loop")
 
         except MsgException:
             raise
@@ -64,14 +62,37 @@ class MsgLocal(MsgBase):
             try:
                 f.close()
                 self.files_read[topic] = None
-            except Exception:  # TODO refine
-                pass
+            except Exception as read_topic_error:
+                if isinstance(read_topic_error, (IOError, FileNotFoundError)):
+                    self.logger.exception(
+                        f"{read_topic_error} occured while closing read topic files."
+                    )
+                elif isinstance(read_topic_error, KeyError):
+                    self.logger.exception(
+                        f"{read_topic_error} occured while reading from files_read dictionary."
+                    )
+                else:
+                    self.logger.exception(
+                        f"{read_topic_error} occured while closing read topics."
+                    )
+
         for topic, f in self.files_write.items():
             try:
                 f.close()
                 self.files_write[topic] = None
-            except Exception:  # TODO refine
-                pass
+            except Exception as write_topic_error:
+                if isinstance(write_topic_error, (IOError, FileNotFoundError)):
+                    self.logger.exception(
+                        f"{write_topic_error} occured while closing write topic files."
+                    )
+                elif isinstance(write_topic_error, KeyError):
+                    self.logger.exception(
+                        f"{write_topic_error} occured while reading from files_write dictionary."
+                    )
+                else:
+                    self.logger.exception(
+                        f"{write_topic_error} occured while closing write topics."
+                    )
 
     def write(self, topic, key, msg):
         """
@@ -122,7 +143,10 @@ class MsgLocal(MsgBase):
                             continue
                         msg_dict = yaml.safe_load(self.buffer[single_topic])
                         self.buffer[single_topic] = ""
-                        assert len(msg_dict) == 1
+                        if len(msg_dict) != 1:
+                            raise ValueError(
+                                "Length of message dictionary is not equal to 1"
+                            )
                         for k, v in msg_dict.items():
                             return single_topic, k, v
                 if not blocks:
@@ -132,12 +156,11 @@ class MsgLocal(MsgBase):
             raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
 
     async def aioread(
-        self, topic, loop=None, callback=None, aiocallback=None, group_id=None, **kwargs
+        self, topic, callback=None, aiocallback=None, group_id=None, **kwargs
     ):
         """
         Asyncio read from one or several topics. It blocks
         :param topic: can be str: single topic; or str list: several topics
-        :param loop: asyncio loop. To be DEPRECATED! in near future!!!  loop must be provided inside config at connect
         :param callback: synchronous callback function that will handle the message
         :param aiocallback: async callback function that will handle the message
         :param group_id: group_id to use for load balancing. Can be False (set group_id to None), None (use general
@@ -145,7 +168,6 @@ class MsgLocal(MsgBase):
         :param kwargs: optional keyword arguments for callback function
         :return: If no callback defined, it returns (topic, key, message)
         """
-        _loop = loop or self.loop
         try:
             while True:
                 msg = self.read(topic, blocks=False)
@@ -156,19 +178,18 @@ class MsgLocal(MsgBase):
                         await aiocallback(*msg, **kwargs)
                     else:
                         return msg
-                await asyncio.sleep(2, loop=_loop)
+                await asyncio.sleep(2)
         except MsgException:
             raise
         except Exception as e:  # TODO refine
             raise MsgException(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
 
-    async def aiowrite(self, topic, key, msg, loop=None):
+    async def aiowrite(self, topic, key, msg):
         """
         Asyncio write. It blocks
         :param topic: str
         :param key: str
         :param msg: message, can be str or yaml
-        :param loop: asyncio loop
         :return: nothing if ok or raises an exception
         """
         return self.write(topic, key, msg)