1(import (rumble)
2        (thread))
3
4(define-syntax declare
5  (syntax-rules ()
6    [(_ id ...)
7     (begin (define id #f) ...)]))
8
9(define done? #f)
10
11(call-in-main-thread
12 (lambda ()
13   (define-syntax check
14     (syntax-rules ()
15       [(_ a b)
16        (unless (equal? a b)
17          (printf "~s: ~s vs. ~s\n" 'b a b)
18          (error 'check "failed"))]))
19
20   (declare s t0 t1 t2
21            ch ct1 ct2
22            cpt1 cpt2
23            s2
24            pc
25            ok-evt
26            sp
27            nack
28            now1 now2 now3
29            t tinf tdelay
30            tdw dw-s dw-pre? dw-body? dw-post?)
31
32   (define-syntax define
33     (syntax-rules ()
34       [(_ id rhs) (set! id rhs)]))
35
36   (check #t (thread? (current-thread)))
37   (check #t (evt? (current-thread)))
38   (define s (make-semaphore))
39   (define t0 (thread (lambda () (semaphore-wait s) (printf "__\n") (semaphore-post s))))
40   (define t1 (thread (lambda () (semaphore-wait s) (printf "hi\n") (semaphore-post s))))
41   (define t2 (thread (lambda () (printf "HI\n") (semaphore-post s))))
42   (thread-wait t0)
43   (thread-wait t1)
44   (thread-wait t2)
45
46   (define ch (make-channel))
47   (define ct1 (thread (lambda () (printf "1 ~a\n" (channel-get ch)))))
48   (define ct2 (thread (lambda () (printf "2 ~a\n" (channel-get ch)))))
49   (channel-put ch 'a)
50   (channel-put ch 'b)
51
52   (define cpt1 (thread (lambda () (channel-put ch 'c))))
53   (define cpt2 (thread (lambda () (channel-put ch 'd))))
54   (printf "3 ~a\n" (channel-get ch))
55   (printf "4 ~a\n" (channel-get ch))
56
57   (check s (sync/timeout 0 s))
58   (check #f (sync/timeout 0 s))
59
60   (define s2 (make-semaphore 3))
61   (check s2 (sync/timeout 0 s s2))
62   (check s2 (sync/timeout 0 s2 s))
63   (check 'got-s2 (sync s (wrap-evt s2 (lambda (v) (check v s2) 'got-s2))))
64   (check #f (sync/timeout 0 s2 s))
65
66   (void (thread (lambda () (channel-put ch 'c2))))
67   (check 'c2 (sync ch))
68
69   (void (thread (lambda () (check 'c3 (channel-get ch)))))
70   (define pc (channel-put-evt ch 'c3))
71   (check pc (sync pc))
72
73   (define ok-evt (guard-evt
74                   (lambda ()
75                     (define ch (make-channel))
76                     (thread (lambda () (channel-put ch 'ok)))
77                     ch)))
78   (check 'ok (sync ok-evt))
79
80   (semaphore-post s)
81   (define sp (semaphore-peek-evt s))
82   (check sp (sync/timeout 0 sp))
83   (check sp (sync/timeout 0 sp))
84   (check s (sync/timeout 0 s))
85   (check #f (sync/timeout 0 sp))
86
87   (define nack #f)
88   (check #t (semaphore? (sync (nack-guard-evt (lambda (n) (set! nack n) (make-semaphore 1))))))
89   (check #f (sync/timeout 0 nack))
90   (set! nack #f)
91   (let loop ()
92     (check 'ok (sync (nack-guard-evt (lambda (n) (set! nack n) (make-semaphore))) ok-evt))
93     (unless nack (loop)))
94   (check (void) (sync/timeout 0 nack))
95
96   (semaphore-post s)
97   (check #f (sync/timeout 0 ch (channel-put-evt ch 'oops)))
98   (check sp (sync/timeout #f ch (channel-put-evt ch 'oops) sp))
99
100   (define now1 (current-inexact-milliseconds))
101   (sleep 0.1)
102   (check #t (>= (current-inexact-milliseconds) (+ now1 0.1)))
103
104   (define now2 (current-inexact-milliseconds))
105   (define ts (thread (lambda () (sleep 0.1))))
106   (check ts (sync ts))
107   (check #t (>= (current-inexact-milliseconds) (+ now2 0.1)))
108
109   (define v 0)
110   (thread (lambda () (set! v (add1 v))))
111   (sync (system-idle-evt))
112   (check 1 v)
113
114   (define tinf (thread (lambda () (let loop () (loop)))))
115   (break-thread tinf)
116   (check tinf (sync tinf))
117   (printf "[That break was from a thread, and it's expected]\n")
118
119   (define now3 (current-inexact-milliseconds))
120   (define tdelay (with-continuation-mark
121                      break-enabled-key
122                    (make-thread-cell #f #t)
123                    (thread (lambda ()
124                              (sleep 0.1)
125                              (with-continuation-mark
126                                  break-enabled-key
127                                (make-thread-cell #t #t)
128                                (begin
129                                  ;(check-for-break)
130                                  (let loop () (loop))))))))
131   (break-thread tdelay)
132   (check tdelay (sync tdelay))
133   (printf "[That break was from a thread, and it's expected]\n")
134   (check #t (>= (current-inexact-milliseconds) (+ now3 0.1)))
135
136   (define got-here? #f)
137   (define break-self (thread (lambda ()
138                                (unsafe-start-atomic)
139                                (break-thread (current-thread))
140                                (unsafe-end-atomic)
141                                (set! got-here? #t))))
142   (check break-self (sync break-self))
143   (printf "[That break was from a thread, and it's expected]\n")
144   (check #f got-here?)
145
146   (define break-self-immediate (thread (lambda ()
147                                          (dynamic-wind
148                                              void
149                                              (lambda ()
150                                                (unsafe-start-breakable-atomic)
151                                                (break-thread (current-thread))
152                                                (set! got-here? #t))
153                                              (lambda ()
154                                                (unsafe-end-atomic))))))
155   (check break-self-immediate (sync break-self-immediate))
156   (printf "[That break was from a thread, and it's expected]\n")
157   (check #f got-here?)
158
159   ;; Make sure breaks are disabled in a `dynamic-wind` post thunk
160   (define dw-s (make-semaphore))
161   (define dw-pre? #f)
162   (define dw-body? #f)
163   (define dw-post? #f)
164   (define tdw (thread
165                (lambda ()
166                  (dynamic-wind
167                   (lambda () (semaphore-wait dw-s) (set! dw-pre? #t))
168                   (lambda () (set! dw-body? #f))
169                   (lambda () (set! dw-post? #t))))))
170   (sync (system-idle-evt))
171   (check #f dw-pre?)
172   (break-thread tdw)
173   (check #f dw-pre?)
174   (semaphore-post dw-s)
175   (sync tdw)
176   (check #t dw-pre?)
177   (check #f dw-body?)
178   (check #t dw-post?)
179
180   ;; Make sure `equal?`-based hash tables are thread-safe
181   (let* ([ht (make-hash)]
182          [s (make-semaphore)]
183          [compare-ok (semaphore-peek-evt s)]
184          [trying 0]
185          [result #f])
186     (define-values (struct:posn make-posn posn? posn-ref posn-set!)
187       (make-struct-type 'posn #f 2 0 #f (list (cons prop:equal+hash
188                                                     (list
189                                                      (lambda (a b eql?)
190                                                        (set! trying (add1 trying))
191                                                        (sync compare-ok)
192                                                        #t)
193                                                      (lambda (a hc) 0)
194                                                      (lambda (a hc) 0))))))
195     (hash-set! ht (make-posn 1 2) 11)
196     (thread (lambda ()
197               (set! result (hash-ref ht (make-posn 1 2) #f))))
198     (sync (system-idle-evt))
199     (check #f result)
200     (check 1 trying)
201     (thread (lambda ()
202               ;; Should get stuck before calling the `posn` equality function:
203               (set! result (hash-ref ht (make-posn 1 2) #f))))
204     (check #f result)
205     (check 1 trying) ; since the second thread is waiting for the table
206     (semaphore-post s)
207     (sync (system-idle-evt))
208     (check 11 result)
209     (sync (system-idle-evt))
210     (check 2 trying)) ; second thread should have completed
211
212   (let* ([place-symbols (make-hasheq)]
213          [register-place-symbol!
214           (lambda (sym proc)
215             (hash-set! place-symbols sym proc))])
216     (set-start-place!
217      (lambda (pch mod sym in out err cust plumber)
218        (lambda ()
219          ((hash-ref place-symbols sym) pch))))
220
221     (register-place-symbol! 'nothing void)
222     (let-values ([(pl1 in1 out1 err1) (dynamic-place 'dummy 'nothing #f #f #f)])
223       (check #t (place? pl1))
224       (check 0 (place-wait pl1)))
225
226     (register-place-symbol! 'exit1 (lambda (pch) (exit 1)))
227     (let-values ([(pl2 in2 out2 err2) (dynamic-place 'dummy 'exit1 #f #f #f)])
228       (check #t (place? pl2))
229       (check 1 (place-wait pl2)))
230
231     (register-place-symbol! 'loop (lambda (pch) (let loop () (loop))))
232     (let-values ([(pl3 in3 out3 err3) (dynamic-place 'dummy 'loop #f #f #f)])
233       (check #t (place? pl3))
234       (place-break pl3)
235       (check 1 (place-wait pl3))
236       (printf "[That break was from a place, and it's expected]\n"))
237
238     (let-values ([(pl4 in4 out4 err4) (dynamic-place 'dummy 'loop #f #f #f)])
239       (check #f (sync/timeout 0.01 (place-dead-evt pl4)))
240       (place-kill pl4)
241       (check 1 (place-wait pl4))
242       (check #t (evt? (sync (place-dead-evt pl4))))
243       (check #t (evt? (sync/timeout 0.01 (place-dead-evt pl4))))))
244
245   (let ()
246     (check 'ok (touch (future (lambda () 'ok))))
247     (check 'ok (touch (would-be-future (lambda () 'ok))))
248     (check 'ok (touch (would-be-future (lambda () (touch (would-be-future (lambda () 'ok))))))))
249
250   (let ()
251     (define fts (let loop ([i 0])
252                   (if (= i 50)
253                       '()
254                       (cons
255                        (future (lambda ()
256                                  (let loop ([i i])
257                                    (if (zero? i)
258                                        i
259                                        (add1 (loop (sub1 i)))))))
260                        (loop (add1 i))))))
261     (check (let loop ([i 0])
262              (if (= i 50)
263                  '()
264                  (cons i (loop (add1 i)))))
265            (map touch fts)))
266
267   ;; Measure thread quantum:
268   #;
269   (let ([t1 (thread (lambda () (let loop () (loop))))]
270         [t2 (thread (lambda () (let loop ()
271                             (define n (current-inexact-milliseconds))
272                             (sleep)
273                             (fprintf (current-error-port) "~a\n" (- (current-inexact-milliseconds) n))
274                             (loop))))])
275     (sleep 0.5)
276     (break-thread t1)
277     (break-thread t2))
278
279   (time
280    (let ([s1 (make-semaphore)]
281          [s2 (make-semaphore)])
282      (let ([ping
283             (lambda (s1 s2)
284               (let loop ([n 1000000])
285                 (if (zero? n)
286                     'done
287                     (begin
288                       (semaphore-post s1)
289                       (semaphore-wait s2)
290                       (loop (sub1 n))))))])
291        (let ([t1 (thread (lambda () (ping s1 s2)))]
292              [t2 (thread (lambda () (ping s2 s1)))])
293          (thread-wait t1)
294          (thread-wait t2)))))
295
296   (set! done? #t)))
297
298(unless done?
299  (error 'thread-demo "something went wrong; deadlock?"))
300