from __future__ import absolute_import, print_function, unicode_literals

import contextlib

from zope.interface import implementer

from ._interfaces import IJournal


@implementer(IJournal)
class Journal(object):
    """Hold outbound calls back until a checkpoint has been saved.

    Calls registered with ``queue_outbound`` inside a ``process()`` block
    are not run immediately. When the block finishes without error, the
    ``save_checkpoint`` callable is invoked first and the queued calls are
    run afterwards, in the order they were queued.
    """

    def __init__(self, save_checkpoint):
        """Create a journal.

        :param save_checkpoint: zero-argument callable invoked at the end
            of every successful ``process()`` block, before any queued
            outbound call is run.
        """
        self._save_checkpoint = save_checkpoint
        self._outbound_queue = []
        self._processing = False

    def queue_outbound(self, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` to run once the checkpoint is saved.

        Must only be called while a ``process()`` block is active; this is
        enforced with an assertion.
        """
        assert self._processing
        self._outbound_queue.append((fn, args, kwargs))

    @contextlib.contextmanager
    def process(self):
        """Context manager wrapping one unit of inbound processing.

        On normal exit of the ``with`` body the checkpoint is saved and
        then every queued outbound call is run. Blocks must not be nested
        and the queue must be empty on entry (both asserted).

        If the ``with`` body, the checkpoint, or an outbound call raises,
        the exception propagates to the caller. Calls that have not been
        run yet are discarded, and the journal is returned to its idle
        state so a later ``process()`` block can still be entered.
        """
        assert not self._processing
        assert not self._outbound_queue
        self._processing = True
        try:
            yield  # process inbound messages, change state, queue outbound
            self._save_checkpoint()
            # Iterate the live list (not a copy): a call appended by an
            # outbound function during the flush is still visited.
            for (fn, args, kwargs) in self._outbound_queue:
                fn(*args, **kwargs)
        finally:
            # Always reset, even on failure; otherwise the entry
            # assertions above would reject every future process() call.
            self._outbound_queue[:] = []
            self._processing = False


@implementer(IJournal)
class ImmediateJournal(object):
    """Journal variant with no checkpointing: outbound calls run at once."""

    def __init__(self):
        pass

    def queue_outbound(self, fn, *args, **kwargs):
        """Call ``fn(*args, **kwargs)`` immediately instead of queueing it."""
        fn(*args, **kwargs)

    @contextlib.contextmanager
    def process(self):
        """No-op context manager; nothing is saved or flushed on exit."""
        yield