1;;;; Copyright (C) 1996, 1998, 2001, 2002, 2003, 2006, 2010, 2011, 2;;;; 2012, 2018 Free Software Foundation, Inc. 3;;;; 4;;;; This library is free software; you can redistribute it and/or 5;;;; modify it under the terms of the GNU Lesser General Public 6;;;; License as published by the Free Software Foundation; either 7;;;; version 3 of the License, or (at your option) any later version. 8;;;; 9;;;; This library is distributed in the hope that it will be useful, 10;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of 11;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12;;;; Lesser General Public License for more details. 13;;;; 14;;;; You should have received a copy of the GNU Lesser General Public 15;;;; License along with this library; if not, write to the Free Software 16;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 17;;;; 18;;;; ---------------------------------------------------------------- 19;;;; threads.scm -- User-level interface to Guile's thread system 20;;;; 4 March 1996, Anthony Green <green@cygnus.com> 21;;;; Modified 5 October 1996, MDJ <djurfeldt@nada.kth.se> 22;;;; Modified 6 April 2001, ttn 23;;;; ---------------------------------------------------------------- 24;;;; 25 26;;; Commentary: 27 28;; This module is documented in the Guile Reference Manual. 29 30;;; Code: 31 32(define-module (ice-9 threads) 33 #:use-module (ice-9 match) 34 #:use-module (ice-9 control) 35 ;; These bindings are marked as #:replace because when deprecated code 36 ;; is enabled, (ice-9 deprecated) also exports these names. 37 ;; (Referencing one of the deprecated names prints a warning directing 38 ;; the user to these bindings.) Anyway once we can remove the 39 ;; deprecated bindings, we should use #:export instead of #:replace 40 ;; for these. 41 #:replace (call-with-new-thread 42 yield 43 cancel-thread 44 join-thread 45 thread? 46 make-mutex 47 make-recursive-mutex 48 lock-mutex 49 try-mutex 50 unlock-mutex 51 mutex? 52 mutex-owner 53 mutex-level 54 mutex-locked? 55 make-condition-variable 56 wait-condition-variable 57 signal-condition-variable 58 broadcast-condition-variable 59 condition-variable? 60 current-thread 61 all-threads 62 thread-exited? 63 total-processor-count 64 current-processor-count) 65 #:export (begin-thread 66 make-thread 67 with-mutex 68 monitor 69 70 parallel 71 letpar 72 par-map 73 par-for-each 74 n-par-map 75 n-par-for-each 76 n-for-each-par-map 77 %thread-handler)) 78 79;; Note that this extension also defines %make-transcoded-port, which is 80;; not exported but is used by (rnrs io ports). 81 82(eval-when (expand eval load) 83 (load-extension (string-append "libguile-" (effective-version)) 84 "scm_init_ice_9_threads")) 85 86 87 88(define-syntax-rule (with-mutex m e0 e1 ...) 89 (let ((x m)) 90 (dynamic-wind 91 (lambda () (lock-mutex x)) 92 (lambda () (begin e0 e1 ...)) 93 (lambda () (unlock-mutex x))))) 94 95(define cancel-tag (make-prompt-tag "cancel")) 96(define (cancel-thread thread . values) 97 "Asynchronously interrupt the target @var{thread} and ask it to 98terminate, returning the given @var{values}. @code{dynamic-wind} post 99thunks will run, but throw handlers will not. If @var{thread} has 100already terminated or been signaled to terminate, this function is a 101no-op." 102 (system-async-mark 103 (lambda () 104 (catch #t 105 (lambda () 106 (apply abort-to-prompt cancel-tag values)) 107 (lambda _ 108 (error "thread cancellation failed, throwing error instead???")))) 109 thread)) 110 111(define thread-join-data (make-object-property)) 112(define %thread-results (make-object-property)) 113 114(define* (call-with-new-thread thunk #:optional handler) 115 "Call @code{thunk} in a new thread and with a new dynamic state, 116returning a new thread object representing the thread. The procedure 117@var{thunk} is called via @code{with-continuation-barrier}. 118 119When @var{handler} is specified, then @var{thunk} is called from within 120a @code{catch} with tag @code{#t} that has @var{handler} as its handler. 121This catch is established inside the continuation barrier. 122 123Once @var{thunk} or @var{handler} returns, the return value is made the 124@emph{exit value} of the thread and the thread is terminated." 125 (let ((cv (make-condition-variable)) 126 (mutex (make-mutex)) 127 (thunk (if handler 128 (lambda () (catch #t thunk handler)) 129 thunk)) 130 (thread #f)) 131 (define (call-with-backtrace thunk) 132 (let ((err (current-error-port))) 133 (catch #t 134 (lambda () (%start-stack 'thread thunk)) 135 (lambda _ (values)) 136 (lambda (key . args) 137 ;; Narrow by three: the dispatch-exception, 138 ;; this thunk, and make-stack. 139 (let ((stack (make-stack #t 3))) 140 (false-if-exception 141 (begin 142 (when stack 143 (display-backtrace stack err)) 144 (let ((frame (and stack (stack-ref stack 0)))) 145 (print-exception err frame key args))))))))) 146 (with-mutex mutex 147 (%call-with-new-thread 148 (lambda () 149 (call-with-values 150 (lambda () 151 (call-with-prompt cancel-tag 152 (lambda () 153 (lock-mutex mutex) 154 (set! thread (current-thread)) 155 (set! (thread-join-data thread) (cons cv mutex)) 156 (signal-condition-variable cv) 157 (unlock-mutex mutex) 158 (call-with-unblocked-asyncs 159 (lambda () (call-with-backtrace thunk)))) 160 (lambda (k . args) 161 (apply values args)))) 162 (lambda vals 163 (lock-mutex mutex) 164 ;; Probably now you're wondering why we are going to use 165 ;; the cond variable as the key into the thread results 166 ;; object property. It's because there is a possibility 167 ;; that the thread object itself ends up as part of the 168 ;; result, and if that happens we create a cycle whereby 169 ;; the strong reference to a thread in the value of the 170 ;; weak-key hash table used by the object property prevents 171 ;; the thread from ever being collected. So instead we use 172 ;; the cv as the key. Weak-key hash tables, amirite? 173 (set! (%thread-results cv) vals) 174 (broadcast-condition-variable cv) 175 (unlock-mutex mutex) 176 (apply values vals))))) 177 (let lp () 178 (unless thread 179 (wait-condition-variable cv mutex) 180 (lp)))) 181 thread)) 182 183(define* (join-thread thread #:optional timeout timeoutval) 184 "Suspend execution of the calling thread until the target @var{thread} 185terminates, unless the target @var{thread} has already terminated." 186 (match (thread-join-data thread) 187 (#f (error "foreign thread cannot be joined" thread)) 188 ((cv . mutex) 189 (lock-mutex mutex) 190 (let lp () 191 (cond 192 ((%thread-results cv) 193 => (lambda (results) 194 (unlock-mutex mutex) 195 (apply values results))) 196 ((if timeout 197 (wait-condition-variable cv mutex timeout) 198 (wait-condition-variable cv mutex)) 199 (lp)) 200 (else timeoutval)))))) 201 202(define* (try-mutex mutex) 203 "Try to lock @var{mutex}. If the mutex is already locked, return 204@code{#f}. Otherwise lock the mutex and return @code{#t}." 205 (lock-mutex mutex 0)) 206 207 208 209;;; Macros first, so that the procedures expand correctly. 210 211(define-syntax-rule (begin-thread e0 e1 ...) 212 (call-with-new-thread 213 (lambda () e0 e1 ...) 214 %thread-handler)) 215 216(define-syntax-rule (make-thread proc arg ...) 217 (call-with-new-thread 218 (lambda () (proc arg ...)) 219 %thread-handler)) 220 221(define monitor-mutex-table (make-hash-table)) 222 223(define monitor-mutex-table-mutex (make-mutex)) 224 225(define (monitor-mutex-with-id id) 226 (with-mutex monitor-mutex-table-mutex 227 (or (hashq-ref monitor-mutex-table id) 228 (let ((mutex (make-mutex))) 229 (hashq-set! monitor-mutex-table id mutex) 230 mutex)))) 231 232(define-syntax monitor 233 (lambda (stx) 234 (syntax-case stx () 235 ((_ body body* ...) 236 (let ((id (datum->syntax #'body (gensym)))) 237 #`(with-mutex (monitor-mutex-with-id '#,id) 238 body body* ...)))))) 239 240(define (thread-handler tag . args) 241 (let ((n (length args)) 242 (p (current-error-port))) 243 (display "In thread:" p) 244 (newline p) 245 (if (>= n 3) 246 (display-error #f 247 p 248 (car args) 249 (cadr args) 250 (caddr args) 251 (if (= n 4) 252 (cadddr args) 253 '())) 254 (begin 255 (display "uncaught throw to " p) 256 (display tag p) 257 (display ": " p) 258 (display args p) 259 (newline p))) 260 #f)) 261 262;;; Set system thread handler 263(define %thread-handler thread-handler) 264 265(use-modules (ice-9 futures)) 266 267(define-syntax parallel 268 (lambda (x) 269 (syntax-case x () 270 ((_ e0 ...) 271 (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...))))) 272 #'(let ((tmp0 (future e0)) 273 ...) 274 (values (touch tmp0) ...))))))) 275 276(define-syntax-rule (letpar ((v e) ...) b0 b1 ...) 277 (call-with-values 278 (lambda () (parallel e ...)) 279 (lambda (v ...) 280 b0 b1 ...))) 281 282(define (par-mapper mapper cons) 283 (lambda (proc . lists) 284 (let loop ((lists lists)) 285 (match lists 286 (((heads tails ...) ...) 287 (let ((tail (future (loop tails))) 288 (head (apply proc heads))) 289 (cons head (touch tail)))) 290 (_ 291 '()))))) 292 293(define par-map (par-mapper map cons)) 294(define par-for-each (par-mapper for-each (const *unspecified*))) 295 296(define (n-par-map n proc . arglists) 297 (let* ((m (make-mutex)) 298 (threads '()) 299 (results (make-list (length (car arglists)))) 300 (result results)) 301 (do ((i 0 (+ 1 i))) 302 ((= i n) 303 (for-each join-thread threads) 304 results) 305 (set! threads 306 (cons (begin-thread 307 (let loop () 308 (lock-mutex m) 309 (if (null? result) 310 (unlock-mutex m) 311 (let ((args (map car arglists)) 312 (my-result result)) 313 (set! arglists (map cdr arglists)) 314 (set! result (cdr result)) 315 (unlock-mutex m) 316 (set-car! my-result (apply proc args)) 317 (loop))))) 318 threads))))) 319 320(define (n-par-for-each n proc . arglists) 321 (let ((m (make-mutex)) 322 (threads '())) 323 (do ((i 0 (+ 1 i))) 324 ((= i n) 325 (for-each join-thread threads)) 326 (set! threads 327 (cons (begin-thread 328 (let loop () 329 (lock-mutex m) 330 (if (null? (car arglists)) 331 (unlock-mutex m) 332 (let ((args (map car arglists))) 333 (set! arglists (map cdr arglists)) 334 (unlock-mutex m) 335 (apply proc args) 336 (loop))))) 337 threads))))) 338 339;;; The following procedure is motivated by the common and important 340;;; case where a lot of work should be done, (not too much) in parallel, 341;;; but the results need to be handled serially (for example when 342;;; writing them to a file). 343;;; 344(define (n-for-each-par-map n s-proc p-proc . arglists) 345 "Using N parallel processes, apply S-PROC in serial order on the results 346of applying P-PROC on ARGLISTS." 347 (let* ((m (make-mutex)) 348 (threads '()) 349 (no-result '(no-value)) 350 (results (make-list (length (car arglists)) no-result)) 351 (result results)) 352 (do ((i 0 (+ 1 i))) 353 ((= i n) 354 (for-each join-thread threads)) 355 (set! threads 356 (cons (begin-thread 357 (let loop () 358 (lock-mutex m) 359 (cond ((null? results) 360 (unlock-mutex m)) 361 ((not (eq? (car results) no-result)) 362 (let ((arg (car results))) 363 ;; stop others from choosing to process results 364 (set-car! results no-result) 365 (unlock-mutex m) 366 (s-proc arg) 367 (lock-mutex m) 368 (set! results (cdr results)) 369 (unlock-mutex m) 370 (loop))) 371 ((null? result) 372 (unlock-mutex m)) 373 (else 374 (let ((args (map car arglists)) 375 (my-result result)) 376 (set! arglists (map cdr arglists)) 377 (set! result (cdr result)) 378 (unlock-mutex m) 379 (set-car! my-result (apply p-proc args)) 380 (loop)))))) 381 threads))))) 382 383 384;; Now that thread support is loaded, make module autoloading 385;; thread-safe. 386(set! (@ (guile) call-with-module-autoload-lock) 387 (let ((mutex (make-mutex 'recursive))) 388 (lambda (thunk) 389 (with-mutex mutex 390 (thunk))))) 391 392;;; threads.scm ends here 393