1# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf8 -*-
2#
3# Copyright 2002 Ben Escoto <ben@emerose.org>
4# Copyright 2007 Kenneth Loafman <kenneth@loafman.com>
5# Copyright 2008 Peter Schuller <peter.schuller@infidyne.com>
6#
7# This file is part of duplicity.
8#
9# Duplicity is free software; you can redistribute it and/or modify it
10# under the terms of the GNU General Public License as published by the
11# Free Software Foundation; either version 2 of the License, or (at your
12# option) any later version.
13#
14# Duplicity is distributed in the hope that it will be useful, but
15# WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17# General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with duplicity; if not, write to the Free Software Foundation,
21# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22
23u"""
24Asynchronous job scheduler, for concurrent execution with minimalistic
25dependency guarantees.
26"""
27
28from future import standard_library
29standard_library.install_aliases()
30from builtins import object
31import duplicity
32from duplicity import log
33from duplicity.dup_threading import require_threading
34from duplicity.dup_threading import interruptably_wait
35from duplicity.dup_threading import async_split
36from duplicity.dup_threading import with_lock
37
38thread = duplicity.dup_threading.thread_module()
39threading = duplicity.dup_threading.threading_module()
40
41
class AsyncScheduler(object):
    u"""
    Easy-to-use scheduler of function calls to be executed
    concurrently. A very simple dependency mechanism exists in the
    form of barriers (see insert_barrier()).

    Each instance has a concurrency level associated with it. A
    concurrency of 0 implies that all tasks will be executed
    synchronously when scheduled. A concurrency of 1 indicates that a
    task will be executed asynchronously, but never concurrently with
    other tasks. Both 0 and 1 guarantee strict ordering among all
    tasks (i.e., they will be executed in the order scheduled).

    At concurrency levels above 1, the tasks will end up being
    executed in an order undetermined except insofar as is enforced by
    calls to insert_barrier().

    An AsyncScheduler should be created for any independent process;
    the scheduler will assume that if any background job fails (raises
    an exception), it makes further work moot.
    """

    def __init__(self, concurrency):
        u"""
        Create an asynchronous scheduler that executes jobs with the
        given level of concurrency.

        @param concurrency: maximum number of tasks allowed to run at
            the same time. 0 means every task is run synchronously in
            the caller's thread (no threading support required at all).
        """
        log.Info(u"%s: %s" % (self.__class__.__name__,
                              _(u"instantiating at concurrency %d") %
                              (concurrency)))
        assert concurrency >= 0, u"%s concurrency level must be >= 0" % (self.__class__.__name__,)

        # All of the following state is guarded by self.__cv's lock.
        self.__failed = False  # has at least one task failed so far?
        self.__failed_waiter = None  # when __failed, the waiter of the first task that failed
        self.__concurrency = concurrency
        self.__worker_count = 0  # number of active workers
        self.__waiter_count = 0  # number of threads waiting to submit work
        self.__barrier = False  # barrier currently in effect?
        self.__cv = threading.Condition()
        # For simplicity, we use a single cv with its lock for
        # everything, even if the resulting notifyAll():s are not
        # technically efficient.
        # NOTE(review): notifyAll() is the legacy camelCase alias of
        # notify_all(); kept as-is since this goes through the
        # dup_threading wrapper module — confirm before modernizing.

        if concurrency > 0:
            # Fail fast if the platform cannot provide real threads.
            require_threading(u"concurrency > 0 (%d)" % (concurrency,))

    def insert_barrier(self):
        u"""
        Proclaim that any tasks scheduled prior to the call to this
        method MUST be executed prior to any tasks scheduled after the
        call to this method.

        The intended use case is that if task B depends on A, a
        barrier must be inserted in between to guarantee that A
        happens before B.
        """
        log.Debug(u"%s: %s" % (self.__class__.__name__, _(u"inserting barrier")))
        # With concurrency 0 it's a NOOP, and due to the special case in
        # task scheduling we do not want to append to the queue (will never
        # be popped).
        if self.__concurrency > 0:
            def _insert_barrier():
                # The barrier flag is cleared by the next scheduling
                # thread once all in-flight workers have drained (see
                # wait_for_and_register_launch()).
                self.__barrier = True

            with_lock(self.__cv, _insert_barrier)

    def schedule_task(self, fn, params):
        u"""
        Schedule the given task (callable, typically function) for
        execution. Pass the given parameters to the function when
        calling it. Returns a callable which can optionally be used
        to wait for the task to complete, either by returning its
        return value or by propagating any exception raised by said
        task.

        This method may block or return immediately, depending on the
        configuration and state of the scheduler.

        This method may also raise an exception in order to trigger
        failures early, if the task (if run synchronously) or a previous
        task has already failed.

        NOTE: Pay particular attention to the scope in which this is
        called. In particular, since it will execute concurrently in
        the background, assuming fn is a closure, any variables used
        must be properly bound in the closure. This is the reason for
        the convenience feature of being able to give parameters to
        the call, to avoid having to wrap the call itself in a
        function in order to "fixate" variables in, for example, an
        enclosing loop.

        @param fn: the callable to execute; must not be None.
        @param params: sequence of positional arguments passed to fn.
        @return: a zero-argument "waiter" callable yielding fn's
            return value or re-raising its exception.
        """
        assert fn is not None

        # Note: It is on purpose that we keep track of concurrency in
        # the front end and launch threads for each task, rather than
        # keep a pool of workers. The overhead is not relevant in the
        # situation this will be used, and it removes complexity in
        # terms of ensuring the scheduler is garbage collected/shut
        # down properly when no longer referenced/needed by calling
        # code.

        if self.__concurrency == 0:
            # special case this to not require any platform support for
            # threading at all
            log.Info(u"%s: %s" % (self.__class__.__name__,
                     _(u"running task synchronously (asynchronicity disabled)")),
                     log.InfoCode.synchronous_upload_begin)

            return self.__run_synchronously(fn, params)
        else:
            log.Info(u"%s: %s" % (self.__class__.__name__,
                     _(u"scheduling task for asynchronous execution")),
                     log.InfoCode.asynchronous_upload_begin)

            return self.__run_asynchronously(fn, params)

    def wait(self):
        u"""
        Wait for the scheduler to become entirely empty (i.e., all
        tasks having run to completion).

        IMPORTANT: This is only useful with a single caller scheduling
        tasks, such that no call to schedule_task() is currently in
        progress or may happen subsequently to the call to wait().
        """
        def _wait():
            # Idle means: no worker threads running AND no scheduling
            # threads blocked waiting to launch one.
            interruptably_wait(self.__cv, lambda: self.__worker_count == 0 and self.__waiter_count == 0)

        with_lock(self.__cv, _wait)

    def __run_synchronously(self, fn, params):
        # Execute fn in the calling thread and return a waiter that
        # simply replays its (already computed) return value.

        # When running synchronously, we immediately leak any exception raised
        # for immediate failure reporting to calling code.
        ret = fn(*params)

        def _waiter():
            return ret

        log.Info(u"%s: %s" % (self.__class__.__name__,
                 _(u"task completed successfully")),
                 log.InfoCode.synchronous_upload_done)

        return _waiter

    def __run_asynchronously(self, fn, params):
        # Split the call into a waiter half (returned to the caller)
        # and a caller half (executed by the background worker).
        (waiter, caller) = async_split(lambda: fn(*params))

        def check_pending_failure():
            # Must be called with self.__cv's lock held. If an earlier
            # task failed, invoke its waiter so the stored exception
            # propagates to the scheduling thread right here.
            if self.__failed:
                log.Info(u"%s: %s" % (self.__class__.__name__,
                         _(u"a previously scheduled task has failed; "
                           u"propagating the result immediately")),
                         log.InfoCode.asynchronous_upload_done)
                self.__failed_waiter()
                # The failed waiter is expected to raise; reaching this
                # point means the failure bookkeeping is inconsistent.
                raise AssertionError(u"%s: waiter should have raised an exception; "
                                     u"this is a bug" % (self.__class__.__name__,))

        def wait_for_and_register_launch():
            # Must be called with self.__cv's lock held. Blocks until a
            # worker slot is free and no barrier is pending, then
            # registers this task as an active worker.
            check_pending_failure()  # raise on fail
            while self.__worker_count >= self.__concurrency or self.__barrier:
                if self.__worker_count == 0:
                    # Only a barrier can keep us looping with zero
                    # workers; all pre-barrier tasks have drained, so
                    # lift it and wake any other waiting schedulers.
                    assert self.__barrier, u"barrier should be in effect"
                    self.__barrier = False
                    self.__cv.notifyAll()
                else:
                    # Track that a scheduler is parked here so wait()
                    # does not report the scheduler as idle.
                    self.__waiter_count += 1
                    self.__cv.wait()
                    self.__waiter_count -= 1

                check_pending_failure()  # raise on fail

            self.__worker_count += 1
            log.Debug(u"%s: %s" % (self.__class__.__name__,
                                   _(u"active workers = %d") % (self.__worker_count,)))

        # simply wait for an OK condition to start, then launch our worker. the worker
        # never waits on us, we just wait for them.
        with_lock(self.__cv, wait_for_and_register_launch)

        self.__start_worker(caller)

        return waiter

    def __start_worker(self, caller):
        u"""
        Start a new worker.
        """
        def trampoline():
            # Runs in the new thread; guarantees the worker count is
            # decremented and waiters are woken even if the task blows up.
            try:
                self.__execute_caller(caller)
            finally:
                def complete_worker():
                    self.__worker_count -= 1
                    log.Debug(u"%s: %s" % (self.__class__.__name__,
                                           _(u"active workers = %d") % (self.__worker_count,)))
                    self.__cv.notifyAll()
                with_lock(self.__cv, complete_worker)

        thread.start_new_thread(trampoline, ())

    def __execute_caller(self, caller):
        # Run the caller half of the async split in this worker thread.
        # The caller half that we get here will not propagate
        # errors back to us, but rather propagate it back to the
        # "other half" of the async split.
        succeeded, waiter = caller()
        if not succeeded:
            def _signal_failed():
                # Record only the FIRST failure; its waiter is replayed
                # by check_pending_failure() to poison later schedules.
                if not self.__failed:
                    self.__failed = True
                    self.__failed_waiter = waiter
                    self.__cv.notifyAll()
            with_lock(self.__cv, _signal_failed)

        log.Info(u"%s: %s" % (self.__class__.__name__,
                 _(u"task execution done (success: %s)") % succeeded),
                 log.InfoCode.asynchronous_upload_done)