1;;;; 	Copyright (C) 1996, 1998, 2001, 2002, 2003, 2006, 2010, 2011,
2;;;;      2012, 2018 Free Software Foundation, Inc.
3;;;;
4;;;; This library is free software; you can redistribute it and/or
5;;;; modify it under the terms of the GNU Lesser General Public
6;;;; License as published by the Free Software Foundation; either
7;;;; version 3 of the License, or (at your option) any later version.
8;;;;
9;;;; This library is distributed in the hope that it will be useful,
10;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
11;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12;;;; Lesser General Public License for more details.
13;;;;
14;;;; You should have received a copy of the GNU Lesser General Public
15;;;; License along with this library; if not, write to the Free Software
16;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17;;;;
18;;;; ----------------------------------------------------------------
19;;;; threads.scm -- User-level interface to Guile's thread system
20;;;; 4 March 1996, Anthony Green <green@cygnus.com>
21;;;; Modified 5 October 1996, MDJ <djurfeldt@nada.kth.se>
22;;;; Modified 6 April 2001, ttn
23;;;; ----------------------------------------------------------------
24;;;;
25
26;;; Commentary:
27
28;; This module is documented in the Guile Reference Manual.
29
30;;; Code:
31
32(define-module (ice-9 threads)
33  #:use-module (ice-9 match)
34  #:use-module (ice-9 control)
35  ;; These bindings are marked as #:replace because when deprecated code
36  ;; is enabled, (ice-9 deprecated) also exports these names.
37  ;; (Referencing one of the deprecated names prints a warning directing
38  ;; the user to these bindings.)  Anyway once we can remove the
39  ;; deprecated bindings, we should use #:export instead of #:replace
40  ;; for these.
41  #:replace (call-with-new-thread
42             yield
43             cancel-thread
44             join-thread
45             thread?
46             make-mutex
47             make-recursive-mutex
48             lock-mutex
49             try-mutex
50             unlock-mutex
51             mutex?
52             mutex-owner
53             mutex-level
54             mutex-locked?
55             make-condition-variable
56             wait-condition-variable
57             signal-condition-variable
58             broadcast-condition-variable
59             condition-variable?
60             current-thread
61             all-threads
62             thread-exited?
63             total-processor-count
64             current-processor-count)
65  #:export (begin-thread
66            make-thread
67            with-mutex
68            monitor
69
70            parallel
71            letpar
72            par-map
73            par-for-each
74            n-par-map
75            n-par-for-each
76            n-for-each-par-map
77            %thread-handler))
78
79;; Note that this extension also defines %make-transcoded-port, which is
80;; not exported but is used by (rnrs io ports).
81
82(eval-when (expand eval load)
83  (load-extension (string-append "libguile-" (effective-version))
84                  "scm_init_ice_9_threads"))
85
86
87
88(define-syntax-rule (with-mutex m e0 e1 ...)
89  (let ((x m))
90    (dynamic-wind
91      (lambda () (lock-mutex x))
92      (lambda () (begin e0 e1 ...))
93      (lambda () (unlock-mutex x)))))
94
95(define cancel-tag (make-prompt-tag "cancel"))
96(define (cancel-thread thread . values)
97  "Asynchronously interrupt the target @var{thread} and ask it to
98terminate, returning the given @var{values}.  @code{dynamic-wind} post
99thunks will run, but throw handlers will not.  If @var{thread} has
100already terminated or been signaled to terminate, this function is a
101no-op."
102  (system-async-mark
103   (lambda ()
104     (catch #t
105       (lambda ()
106         (apply abort-to-prompt cancel-tag values))
107       (lambda _
108         (error "thread cancellation failed, throwing error instead???"))))
109   thread))
110
111(define thread-join-data (make-object-property))
112(define %thread-results (make-object-property))
113
114(define* (call-with-new-thread thunk #:optional handler)
115  "Call @code{thunk} in a new thread and with a new dynamic state,
116returning a new thread object representing the thread.  The procedure
117@var{thunk} is called via @code{with-continuation-barrier}.
118
119When @var{handler} is specified, then @var{thunk} is called from within
120a @code{catch} with tag @code{#t} that has @var{handler} as its handler.
121This catch is established inside the continuation barrier.
122
123Once @var{thunk} or @var{handler} returns, the return value is made the
124@emph{exit value} of the thread and the thread is terminated."
125  (let ((cv (make-condition-variable))
126        (mutex (make-mutex))
127        (thunk (if handler
128                   (lambda () (catch #t thunk handler))
129                   thunk))
130        (thread #f))
131    (define (call-with-backtrace thunk)
132      (let ((err (current-error-port)))
133        (catch #t
134          (lambda () (%start-stack 'thread thunk))
135          (lambda _ (values))
136          (lambda (key . args)
137            ;; Narrow by three: the dispatch-exception,
138            ;; this thunk, and make-stack.
139            (let ((stack (make-stack #t 3)))
140              (false-if-exception
141               (begin
142                 (when stack
143                   (display-backtrace stack err))
144                 (let ((frame (and stack (stack-ref stack 0))))
145                   (print-exception err frame key args)))))))))
146    (with-mutex mutex
147      (%call-with-new-thread
148       (lambda ()
149         (call-with-values
150             (lambda ()
151               (call-with-prompt cancel-tag
152                 (lambda ()
153                   (lock-mutex mutex)
154                   (set! thread (current-thread))
155                   (set! (thread-join-data thread) (cons cv mutex))
156                   (signal-condition-variable cv)
157                   (unlock-mutex mutex)
158                   (call-with-unblocked-asyncs
159                    (lambda () (call-with-backtrace thunk))))
160                 (lambda (k . args)
161                   (apply values args))))
162           (lambda vals
163             (lock-mutex mutex)
164             ;; Probably now you're wondering why we are going to use
165             ;; the cond variable as the key into the thread results
166             ;; object property.  It's because there is a possibility
167             ;; that the thread object itself ends up as part of the
168             ;; result, and if that happens we create a cycle whereby
169             ;; the strong reference to a thread in the value of the
170             ;; weak-key hash table used by the object property prevents
171             ;; the thread from ever being collected.  So instead we use
172             ;; the cv as the key.  Weak-key hash tables, amirite?
173             (set! (%thread-results cv) vals)
174             (broadcast-condition-variable cv)
175             (unlock-mutex mutex)
176             (apply values vals)))))
177      (let lp ()
178        (unless thread
179          (wait-condition-variable cv mutex)
180          (lp))))
181    thread))
182
183(define* (join-thread thread #:optional timeout timeoutval)
184  "Suspend execution of the calling thread until the target @var{thread}
185terminates, unless the target @var{thread} has already terminated."
186  (match (thread-join-data thread)
187    (#f (error "foreign thread cannot be joined" thread))
188    ((cv . mutex)
189     (lock-mutex mutex)
190     (let lp ()
191       (cond
192        ((%thread-results cv)
193         => (lambda (results)
194              (unlock-mutex mutex)
195              (apply values results)))
196        ((if timeout
197             (wait-condition-variable cv mutex timeout)
198             (wait-condition-variable cv mutex))
199         (lp))
200        (else timeoutval))))))
201
202(define* (try-mutex mutex)
203  "Try to lock @var{mutex}.  If the mutex is already locked, return
204@code{#f}.  Otherwise lock the mutex and return @code{#t}."
205  (lock-mutex mutex 0))
206
207
208
209;;; Macros first, so that the procedures expand correctly.
210
211(define-syntax-rule (begin-thread e0 e1 ...)
212  (call-with-new-thread
213   (lambda () e0 e1 ...)
214   %thread-handler))
215
216(define-syntax-rule (make-thread proc arg ...)
217  (call-with-new-thread
218   (lambda () (proc arg ...))
219   %thread-handler))
220
221(define monitor-mutex-table (make-hash-table))
222
223(define monitor-mutex-table-mutex (make-mutex))
224
225(define (monitor-mutex-with-id id)
226  (with-mutex monitor-mutex-table-mutex
227    (or (hashq-ref monitor-mutex-table id)
228        (let ((mutex (make-mutex)))
229          (hashq-set! monitor-mutex-table id mutex)
230          mutex))))
231
232(define-syntax monitor
233  (lambda (stx)
234    (syntax-case stx ()
235      ((_ body body* ...)
236       (let ((id (datum->syntax #'body (gensym))))
237         #`(with-mutex (monitor-mutex-with-id '#,id)
238             body body* ...))))))
239
240(define (thread-handler tag . args)
241  (let ((n (length args))
242	(p (current-error-port)))
243    (display "In thread:" p)
244    (newline p)
245    (if (>= n 3)
246        (display-error #f
247                       p
248                       (car args)
249                       (cadr args)
250                       (caddr args)
251                       (if (= n 4)
252                           (cadddr args)
253                           '()))
254        (begin
255          (display "uncaught throw to " p)
256          (display tag p)
257          (display ": " p)
258          (display args p)
259          (newline p)))
260    #f))
261
262;;; Set system thread handler
263(define %thread-handler thread-handler)
264
265(use-modules (ice-9 futures))
266
267(define-syntax parallel
268  (lambda (x)
269    (syntax-case x ()
270      ((_ e0 ...)
271       (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
272         #'(let ((tmp0 (future e0))
273                 ...)
274             (values (touch tmp0) ...)))))))
275
276(define-syntax-rule (letpar ((v e) ...) b0 b1 ...)
277  (call-with-values
278      (lambda () (parallel e ...))
279    (lambda (v ...)
280      b0 b1 ...)))
281
282(define (par-mapper mapper cons)
283  (lambda (proc . lists)
284    (let loop ((lists lists))
285      (match lists
286        (((heads tails ...) ...)
287         (let ((tail (future (loop tails)))
288               (head (apply proc heads)))
289           (cons head (touch tail))))
290        (_
291         '())))))
292
293(define par-map (par-mapper map cons))
294(define par-for-each (par-mapper for-each (const *unspecified*)))
295
296(define (n-par-map n proc . arglists)
297  (let* ((m (make-mutex))
298	 (threads '())
299	 (results (make-list (length (car arglists))))
300	 (result results))
301    (do ((i 0 (+ 1 i)))
302	((= i n)
303	 (for-each join-thread threads)
304	 results)
305      (set! threads
306	    (cons (begin-thread
307		   (let loop ()
308		     (lock-mutex m)
309		     (if (null? result)
310			 (unlock-mutex m)
311			 (let ((args (map car arglists))
312			       (my-result result))
313			   (set! arglists (map cdr arglists))
314			   (set! result (cdr result))
315			   (unlock-mutex m)
316			   (set-car! my-result (apply proc args))
317			   (loop)))))
318		  threads)))))
319
320(define (n-par-for-each n proc . arglists)
321  (let ((m (make-mutex))
322	(threads '()))
323    (do ((i 0 (+ 1 i)))
324	((= i n)
325	 (for-each join-thread threads))
326      (set! threads
327	    (cons (begin-thread
328		   (let loop ()
329		     (lock-mutex m)
330		     (if (null? (car arglists))
331			 (unlock-mutex m)
332			 (let ((args (map car arglists)))
333			   (set! arglists (map cdr arglists))
334			   (unlock-mutex m)
335			   (apply proc args)
336			   (loop)))))
337		  threads)))))
338
339;;; The following procedure is motivated by the common and important
340;;; case where a lot of work should be done, (not too much) in parallel,
341;;; but the results need to be handled serially (for example when
342;;; writing them to a file).
343;;;
344(define (n-for-each-par-map n s-proc p-proc . arglists)
345  "Using N parallel processes, apply S-PROC in serial order on the results
346of applying P-PROC on ARGLISTS."
347  (let* ((m (make-mutex))
348	 (threads '())
349	 (no-result '(no-value))
350	 (results (make-list (length (car arglists)) no-result))
351	 (result results))
352    (do ((i 0 (+ 1 i)))
353	((= i n)
354	 (for-each join-thread threads))
355      (set! threads
356	    (cons (begin-thread
357		   (let loop ()
358		     (lock-mutex m)
359		     (cond ((null? results)
360			    (unlock-mutex m))
361			   ((not (eq? (car results) no-result))
362			    (let ((arg (car results)))
363			      ;; stop others from choosing to process results
364			      (set-car! results no-result)
365			      (unlock-mutex m)
366			      (s-proc arg)
367			      (lock-mutex m)
368			      (set! results (cdr results))
369			      (unlock-mutex m)
370			      (loop)))
371			   ((null? result)
372			    (unlock-mutex m))
373			   (else
374			    (let ((args (map car arglists))
375				  (my-result result))
376			      (set! arglists (map cdr arglists))
377			      (set! result (cdr result))
378			      (unlock-mutex m)
379			      (set-car! my-result (apply p-proc args))
380			      (loop))))))
381		  threads)))))
382
383
384;; Now that thread support is loaded, make module autoloading
385;; thread-safe.
386(set! (@ (guile) call-with-module-autoload-lock)
387  (let ((mutex (make-mutex 'recursive)))
388    (lambda (thunk)
389      (with-mutex mutex
390        (thunk)))))
391
392;;; threads.scm ends here
393