MAX_BODY_SIZE = 1 * MB # Max. size loaded into memory!
PORT = 4568
- def __init__(self, store, cleanup_interval=60):
+ def __init__(self, store, loop, cleanup_interval=60):
self.store = store
+ self.loop = loop
- self.cleaner = CleanupThread(self.store, cleanup_interval=cleanup_interval)
+ assert self.loop is not None
+
+ self.cleaner = CleanupThread(self.store, loop=self.loop, cleanup_interval=cleanup_interval)
self.cleaner.start()
super(StagingApplication, self).__init__([
class CleanupThread(threading.Thread):
"""Daemon thread that clean up the staging area
"""
- def __init__(self, store, log=None, cleanup_interval=60):
+ def __init__(self, store, loop, log=None, cleanup_interval=60):
"""
Args:
- store : A compatible store object
+ store: A compatible store object
log (None, optional): Log handle
cleanup_interval (int, optional): Cleanup interval in secs
+ loop: Tasklet main loop
"""
super().__init__()
- self.log = log or logging.getLogger()
- self.store = store
+ self.log = log or logging.getLogger()
+ self.store = store
self._cleaner = CleanUpStaging(store, log)
self.cleanup_interval = cleanup_interval
- self.daemon = True
+ self.daemon = True
+ self.loop = loop
+
+ assert self.loop is not None
def run(self):
try:
while True:
- self._cleaner.cleanup()
+ self.loop.call_soon_threadsafe(self._cleaner.cleanup, )
time.sleep(self.cleanup_interval)
except Exception as e: