class Transaction(object):
    """Filesystem transaction write context

    Gathers files for deferred commit or discard, so that several write
    operations can be finalized semi-atomically. This works by having this
    instance as the ``.transaction`` attribute of the given filesystem
    """

    def __init__(self, fs):
        """
        Parameters
        ----------
        fs: FileSystem instance
        """
        self.fs = fs
        self.files = []

    def __enter__(self):
        """Start the transaction and return it, so ``with ... as t`` works"""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """End transaction and commit, if exit is not due to exception"""
        try:
            # only commit if there was no exception
            self.complete(commit=exc_type is None)
        finally:
            # Reset the filesystem state even when a commit/discard raised,
            # otherwise the filesystem would stay flagged as in-transaction.
            self.fs._intrans = False
            self.fs._transaction = None

    def start(self):
        """Start a transaction on this FileSystem"""
        self.files = []  # clean up after previous failed completions
        self.fs._intrans = True

    def complete(self, commit=True):
        """Finish transaction: commit or discard all deferred files

        Parameters
        ----------
        commit: bool
            If True, call ``commit()`` on every deferred file; otherwise
            call ``discard()`` on each of them.
        """
        for f in self.files:
            if commit:
                f.commit()
            else:
                f.discard()
        self.files = []
        self.fs._intrans = False


class FileActor(object):
    """Container of deferred files, finalized together

    ``DaskTransaction`` instantiates this class as a dask actor and uses it
    in place of the plain list of files held by ``Transaction``.
    """

    def __init__(self):
        self.files = []

    def commit(self):
        """Commit every held file, then forget them"""
        for f in self.files:
            f.commit()
        self.files.clear()

    def discard(self):
        """Discard every held file, then forget them"""
        for f in self.files:
            f.discard()
        self.files.clear()

    def append(self, f):
        """Register a file for deferred commit or discard"""
        self.files.append(f)


class DaskTransaction(Transaction):
    """Transaction whose deferred files are held by a ``FileActor`` actor"""

    def __init__(self, fs):
        """
        Parameters
        ----------
        fs: FileSystem instance
        """
        # Imported here rather than at module level, so this module does not
        # need ``distributed`` unless a DaskTransaction is actually created.
        import distributed

        super().__init__(fs)
        client = distributed.default_client()
        self.files = client.submit(FileActor, actor=True).result()

    def start(self):
        """Start a transaction on this FileSystem

        Unlike ``Transaction.start``, ``self.files`` is not rebound to a new
        list: that would replace the actor created in ``__init__`` and make
        ``complete`` fail when it calls ``commit``/``discard`` on it.
        """
        self.fs._intrans = True

    def complete(self, commit=True):
        """Finish transaction: commit or discard all deferred files"""
        if commit:
            self.files.commit().result()
        else:
            self.files.discard().result()
        self.fs._intrans = False