5 from msgbase
import MsgBase
, MsgException
8 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
11 This emulates a Kafka bus by just using a shared file system. Useful for testing or devops.
12 One file is used per topic. Only one producer and one consumer are allowed per topic. Both consumer and producer
13 access the same file, e.g. the same volume if running with docker.
14 One text line per message is used, in YAML format.
17 class MsgLocal(MsgBase
):
19 def __init__(self
, logger_name
='msg'):
20 self
.logger
= logging
.getLogger(logger_name
)
22 # create a different file for each topic
26 def connect(self
, config
):
28 if "logger_name" in config
:
29 self
.logger
= logging
.getLogger(config
["logger_name"])
30 self
.path
= config
["path"]
31 if not self
.path
.endswith("/"):
33 if not os
.path
.exists(self
.path
):
37 except Exception as e
: # TODO refine
38 raise MsgException(str(e
))
41 for f
in self
.files
.values():
44 except Exception as e
: # TODO refine
47 def write(self
, topic
, key
, msg
):
49 Insert a message into topic
51 :param key: key text to be inserted
52 :param msg: value object to be inserted, can be str, object ...
53 :return: None, or raises an exception
56 if topic
not in self
.files
:
57 self
.files
[topic
] = open(self
.path
+ topic
, "a+")
58 yaml
.safe_dump({key
: msg
}, self
.files
[topic
], default_flow_style
=True, width
=20000)
59 self
.files
[topic
].flush()
60 except Exception as e
: # TODO refine
61 raise MsgException(str(e
))
63 def read(self
, topic
, blocks
=True):
65 Read from one or several topics. If blocks is False it is non-blocking and returns None when nothing is available
66 :param topic: can be str: single topic; or str list: several topics
67 :param blocks: if True, wait and block until a message is present; if False, return None when no message is available
68 :return: topic, key, message; or None if blocks is False and no message is available
71 if isinstance(topic
, (list, tuple)):
74 topic_list
= (topic
, )
76 for single_topic
in topic_list
:
77 if single_topic
not in self
.files
:
78 self
.files
[single_topic
] = open(self
.path
+ single_topic
, "a+")
79 self
.buffer[single_topic
] = ""
80 self
.buffer[single_topic
] += self
.files
[single_topic
].readline()
81 if not self
.buffer[single_topic
].endswith("\n"):
83 msg_dict
= yaml
.load(self
.buffer[single_topic
])
84 self
.buffer[single_topic
] = ""
85 assert len(msg_dict
) == 1
86 for k
, v
in msg_dict
.items():
87 return single_topic
, k
, v
91 except Exception as e
: # TODO refine
92 raise MsgException(str(e
))
94 async def aioread(self
, topic
, loop
):
96 Asyncio read from one or several topics. It blocks
97 :param topic: can be str: single topic; or str list: several topics
98 :param loop: asyncio loop
99 :return: topic, key, message
103 msg
= self
.read(topic
, blocks
=False)
106 await asyncio
.sleep(2, loop
=loop
)
109 except Exception as e
: # TODO refine
110 raise MsgException(str(e
))