# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf8 -*-
#
# Copyright 2002 Ben Escoto <ben@emerose.org>
# Copyright 2007 Kenneth Loafman <kenneth@loafman.com>
# Copyright 2008 Peter Schuller <peter.schuller@infidyne.com>
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

u"""
Asynchronous job scheduler, for concurrent execution with minimalistic
dependency guarantees.
"""

from future import standard_library
standard_library.install_aliases()
from builtins import object
import duplicity
from duplicity import log
from duplicity.dup_threading import require_threading
from duplicity.dup_threading import interruptably_wait
from duplicity.dup_threading import async_split
from duplicity.dup_threading import with_lock

thread = duplicity.dup_threading.thread_module()
threading = duplicity.dup_threading.threading_module()


class AsyncScheduler(object):
    u"""
    Easy-to-use scheduler of function calls to be executed
    concurrently. A very simple dependency mechanism exists in the
    form of barriers (see insert_barrier()).

    Each instance has a concurrency level associated with it. A
    concurrency of 0 implies that all tasks will be executed
    synchronously when scheduled. A concurrency of 1 indicates that a
    task will be executed asynchronously, but never concurrently with
    other tasks. Both 0 and 1 guarantee strict ordering among all
    tasks (i.e., they will be executed in the order scheduled).

    At concurrency levels above 1, the tasks will end up being
    executed in an order undetermined except insofar as is enforced by
    calls to insert_barrier().

    An AsyncScheduler should be created for any independent process;
    the scheduler will assume that if any background job fails (raises
    an exception), it makes further work moot.
    """

    def __init__(self, concurrency):
        u"""
        Create an asynchronous scheduler that executes jobs with the
        given level of concurrency.
        """
        log.Info(u"%s: %s" % (self.__class__.__name__,
                              _(u"instantiating at concurrency %d") %
                              (concurrency)))
        assert concurrency >= 0, u"%s concurrency level must be >= 0" % (self.__class__.__name__,)

        self.__failed = False  # has at least one task failed so far?
        self.__failed_waiter = None  # when __failed, the waiter of the first task that failed
        self.__concurrency = concurrency
        self.__worker_count = 0  # number of active workers
        self.__waiter_count = 0  # number of threads waiting to submit work
        self.__barrier = False  # barrier currently in effect?
        # For simplicity, we use a single condition variable (with its
        # lock) to guard all of the above state, even if the resulting
        # notify_all():s are not technically efficient.
        self.__cv = threading.Condition()

        if concurrency > 0:
            require_threading(u"concurrency > 0 (%d)" % (concurrency,))

    def insert_barrier(self):
        u"""
        Proclaim that any tasks scheduled prior to the call to this
        method MUST be executed prior to any tasks scheduled after the
        call to this method.

        The intended use case is that if task B depends on A, a
        barrier must be inserted in between to guarantee that A
        happens before B.
        """
        log.Debug(u"%s: %s" % (self.__class__.__name__, _(u"inserting barrier")))
        # With concurrency 0 it's a NOOP, and due to the special case in
        # task scheduling we do not want to append to the queue (will never
        # be popped).
        if self.__concurrency > 0:
            def _insert_barrier():
                self.__barrier = True

            with_lock(self.__cv, _insert_barrier)

    def schedule_task(self, fn, params):
        u"""
        Schedule the given task (callable, typically function) for
        execution. Pass the given parameters to the function when
        calling it. Returns a callable which can optionally be used
        to wait for the task to complete, either by returning its
        return value or by propagating any exception raised by said
        task.

        This method may block or return immediately, depending on the
        configuration and state of the scheduler.

        This method may also raise an exception in order to trigger
        failures early, if the task (if run synchronously) or a previous
        task has already failed.

        NOTE: Pay particular attention to the scope in which this is
        called. In particular, since it will execute concurrently in
        the background, assuming fn is a closure, any variables used
        must be properly bound in the closure. This is the reason for
        the convenience feature of being able to give parameters to
        the call, to avoid having to wrap the call itself in a
        function in order to "fixate" variables in, for example, an
        enclosing loop.
        """
        assert fn is not None

        # Note: It is on purpose that we keep track of concurrency in
        # the front end and launch threads for each task, rather than
        # keep a pool of workers. The overhead is not relevant in the
        # situation this will be used, and it removes complexity in
        # terms of ensuring the scheduler is garbage collected/shut
        # down properly when no longer referenced/needed by calling
        # code.

        if self.__concurrency == 0:
            # special case this to not require any platform support for
            # threading at all
            log.Info(u"%s: %s" % (self.__class__.__name__,
                                  _(u"running task synchronously (asynchronicity disabled)")),
                     log.InfoCode.synchronous_upload_begin)

            return self.__run_synchronously(fn, params)
        else:
            log.Info(u"%s: %s" % (self.__class__.__name__,
                                  _(u"scheduling task for asynchronous execution")),
                     log.InfoCode.asynchronous_upload_begin)

            return self.__run_asynchronously(fn, params)

    def wait(self):
        u"""
        Wait for the scheduler to become entirely empty (i.e., all
        tasks having run to completion).

        IMPORTANT: This is only useful with a single caller scheduling
        tasks, such that no call to schedule_task() is currently in
        progress or may happen subsequently to the call to wait().
        """
        def _wait():
            interruptably_wait(self.__cv, lambda: self.__worker_count == 0 and self.__waiter_count == 0)

        with_lock(self.__cv, _wait)

    def __run_synchronously(self, fn, params):

        # When running synchronously, we immediately leak any exception raised
        # for immediate failure reporting to calling code.
        ret = fn(*params)

        def _waiter():
            return ret

        log.Info(u"%s: %s" % (self.__class__.__name__,
                              _(u"task completed successfully")),
                 log.InfoCode.synchronous_upload_done)

        return _waiter

    def __run_asynchronously(self, fn, params):
        (waiter, caller) = async_split(lambda: fn(*params))

        def check_pending_failure():
            if self.__failed:
                log.Info(u"%s: %s" % (self.__class__.__name__,
                                      _(u"a previously scheduled task has failed; "
                                        u"propagating the result immediately")),
                         log.InfoCode.asynchronous_upload_done)
                self.__failed_waiter()
                raise AssertionError(u"%s: waiter should have raised an exception; "
                                     u"this is a bug" % (self.__class__.__name__,))

        def wait_for_and_register_launch():
            check_pending_failure()  # raise on fail
            while self.__worker_count >= self.__concurrency or self.__barrier:
                if self.__worker_count == 0:
                    assert self.__barrier, u"barrier should be in effect"
                    self.__barrier = False
                    self.__cv.notify_all()
                else:
                    self.__waiter_count += 1
                    self.__cv.wait()
                    self.__waiter_count -= 1

                    check_pending_failure()  # raise on fail

            self.__worker_count += 1
            log.Debug(u"%s: %s" % (self.__class__.__name__,
                                   _(u"active workers = %d") % (self.__worker_count,)))

        # simply wait for an OK condition to start, then launch our worker. the worker
        # never waits on us, we just wait for them.
        with_lock(self.__cv, wait_for_and_register_launch)

        self.__start_worker(caller)

        return waiter

    def __start_worker(self, caller):
        u"""
        Start a new worker.
        """
        def trampoline():
            try:
                self.__execute_caller(caller)
            finally:
                def complete_worker():
                    self.__worker_count -= 1
                    log.Debug(u"%s: %s" % (self.__class__.__name__,
                                           _(u"active workers = %d") % (self.__worker_count,)))
                    self.__cv.notify_all()
                with_lock(self.__cv, complete_worker)

        thread.start_new_thread(trampoline, ())

    def __execute_caller(self, caller):
        # The caller half that we get here will not propagate
        # errors back to us, but rather propagate it back to the
        # "other half" of the async split.
        succeeded, waiter = caller()
        if not succeeded:
            def _signal_failed():
                if not self.__failed:
                    self.__failed = True
                    self.__failed_waiter = waiter
                    self.__cv.notify_all()
            with_lock(self.__cv, _signal_failed)

        log.Info(u"%s: %s" % (self.__class__.__name__,
                              _(u"task execution done (success: %s)") % succeeded),
                 log.InfoCode.asynchronous_upload_done)