1class Transaction(object):
2    """Filesystem transaction write context
3
4    Gathers files for deferred commit or discard, so that several write
5    operations can be finalized semi-atomically. This works by having this
6    instance as the ``.transaction`` attribute of the given filesystem
7    """
8
9    def __init__(self, fs):
10        """
11        Parameters
12        ----------
13        fs: FileSystem instance
14        """
15        self.fs = fs
16        self.files = []
17
18    def __enter__(self):
19        self.start()
20
21    def __exit__(self, exc_type, exc_val, exc_tb):
22        """End transaction and commit, if exit is not due to exception"""
23        # only commit if there was no exception
24        self.complete(commit=exc_type is None)
25        self.fs._intrans = False
26        self.fs._transaction = None
27
28    def start(self):
29        """Start a transaction on this FileSystem"""
30        self.files = []  # clean up after previous failed completions
31        self.fs._intrans = True
32
33    def complete(self, commit=True):
34        """Finish transaction: commit or discard all deferred files"""
35        for f in self.files:
36            if commit:
37                f.commit()
38            else:
39                f.discard()
40        self.files = []
41        self.fs._intrans = False
42
43
44class FileActor(object):
45    def __init__(self):
46        self.files = []
47
48    def commit(self):
49        for f in self.files:
50            f.commit()
51        self.files.clear()
52
53    def discard(self):
54        for f in self.files:
55            f.discard()
56        self.files.clear()
57
58    def append(self, f):
59        self.files.append(f)
60
61
62class DaskTransaction(Transaction):
63    def __init__(self, fs):
64        """
65        Parameters
66        ----------
67        fs: FileSystem instance
68        """
69        import distributed
70
71        super().__init__(fs)
72        client = distributed.default_client()
73        self.files = client.submit(FileActor, actor=True).result()
74
75    def complete(self, commit=True):
76        """Finish transaction: commit or discard all deferred files"""
77        if commit:
78            self.files.commit().result()
79        else:
80            self.files.discard().result()
81        self.fs._intrans = False
82