1 /* threads.c -*- mode:c; coding:utf-8; -*-
2 *
3 * multi thread extensions.
4 *
5 * Copyright (c) 2010-2015 Takashi Kato <ktakashi@ymail.com>
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 *
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in the
16 * documentation and/or other materials provided with the distribution.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
24 * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 *
30 * $Id: $
31 */
32 #include <math.h>
33 #include <time.h>
34 #ifdef HAVE_SYS_TIME_H
35 # include <sys/time.h>
36 #endif
37
38 #include <sagittarius.h>
39 #define LIBSAGITTARIUS_EXT_BODY
40 #include <sagittarius/extend.h>
41 #include "threads.h"
42
43
Sg_MakeThread(SgProcedure * thunk,SgObject name)44 SgObject Sg_MakeThread(SgProcedure *thunk, SgObject name)
45 {
46 SgVM *current = Sg_VM(), *vm;
47 if (SG_PROCEDURE_REQUIRED(thunk) != 0) {
48 Sg_Error(UC("thunk required, but got %S"), thunk);
49 }
50 vm = Sg_NewThreadVM(current, name);
51 vm->thunk = thunk;
52 return SG_OBJ(vm);
53 }
54
thread_cleanup_inner(SgVM * vm)55 static void thread_cleanup_inner(SgVM *vm)
56 {
57 vm->threadState = SG_VM_TERMINATED;
58 if (vm->canceller) {
59 /* this thread is cancelled */
60 vm->result = Sg_MakeTerminatedThreadException(vm, vm->canceller);
61 vm->threadErrorP = TRUE;
62 }
63 Sg_NotifyAll(&vm->cond);
64 }
65
thread_cleanup(void * data)66 static void thread_cleanup(void *data)
67 {
68 SgVM *vm = SG_VM(data);
69 /* change this VM state to TERMINATED, and signals the change
70 to the waiting threads. */
71 Sg_LockMutex(&vm->vmlock);
72 thread_cleanup_inner(vm);
73 Sg_UnlockMutex(&vm->vmlock);
74 }
75
thread_entry(void * data)76 static void* thread_entry(void *data)
77 {
78 SgVM *vm = SG_VM(data);
79 SgObject *stack;
80 #ifdef HAVE_ALLOCA
81 stack = (SgObject *)alloca(sizeof(SgObject) * SG_VM_STACK_SIZE);
82 #else
83 stack = SG_NEW_ARRAY(SgObject, SG_VM_STACK_SIZE);
84 #endif
85 Sg_SetVMStack(vm, stack, SG_VM_STACK_SIZE);
86 thread_cleanup_push(thread_cleanup, vm);
87 if (Sg_SetCurrentVM(vm)) {
88 SG_UNWIND_PROTECT {
89 vm->result = Sg_Apply(SG_OBJ(vm->thunk), SG_NIL);
90 } SG_WHEN_ERROR {
91 SgObject exc;
92 switch (vm->escapeReason) {
93 case SG_VM_ESCAPE_CONT:
94 vm->result = SG_MAKE_STRING("stale continuation throws");
95 vm->threadErrorP = TRUE;
96 break;
97 default:
98 Sg_Panic("unknown escape");
99 case SG_VM_ESCAPE_ERROR:
100 exc = Sg_MakeUncaughtException(vm, SG_OBJ(vm->escapeData[1]));
101 vm->result = exc;
102 vm->threadErrorP = TRUE;
103 break;
104 }
105 } SG_END_PROTECT;
106 } else {
107 /* I'm not sure if this happen */
108 /* if this happen, we can not use any Sg_Apply related methods such as
109 error creations. so just create string.
110 */
111 vm->result = SG_MAKE_STRING("set-current-vm failed");
112 vm->threadErrorP = TRUE;
113 }
114 thread_cleanup_pop(TRUE);
115 return NULL;
116 }
117
Sg_ThreadStart(SgVM * vm)118 SgObject Sg_ThreadStart(SgVM *vm)
119 {
120 int err_state = FALSE;
121 Sg_LockMutex(&vm->vmlock);
122 if (vm->threadState != SG_VM_NEW) {
123 err_state = TRUE;
124 } else {
125 ASSERT(vm->thunk);
126 vm->threadState = SG_VM_RUNNABLE;
127 if (!Sg_InternalThreadStart(&vm->thread, (SgThreadEntryFunc *)thread_entry,
128 vm)) {
129 vm->threadState = SG_VM_NEW;
130 err_state = TRUE;
131 }
132 }
133 Sg_UnlockMutex(&vm->vmlock);
134 if (err_state)
135 Sg_Error(UC("attempt to start an already-started thread: %S"), vm);
136 return SG_OBJ(vm);
137 }
138
Sg_ThreadJoin(SgVM * vm,SgObject timeout,SgObject timeoutval)139 SgObject Sg_ThreadJoin(SgVM *vm, SgObject timeout, SgObject timeoutval)
140 {
141 struct timespec ts;
142 int intr = FALSE, tout = FALSE, errorP;
143 SgObject result = SG_FALSE;
144
145 struct timespec *pts = Sg_GetTimeSpec(timeout, &ts);
146 SG_INTERNAL_MUTEX_SAFE_LOCK_BEGIN(vm->vmlock);
147 while (vm->threadState != SG_VM_TERMINATED) {
148 if (pts) {
149 int tr = Sg_WaitWithTimeout(&vm->cond, &vm->vmlock, pts);
150 if (tr == SG_INTERNAL_COND_TIMEDOUT) {
151 tout = TRUE;
152 break;
153 } else if (tr == SG_INTERNAL_COND_INTR) {
154 intr = TRUE;
155 break;
156 }
157 } else {
158 Sg_Wait(&vm->cond, &vm->vmlock);
159 }
160 }
161 if (!tout) {
162 result = vm->result;
163 errorP = vm->threadErrorP;
164 if (errorP) {
165 vm->result = SG_UNDEF;
166 vm->threadErrorP = FALSE;
167 }
168 }
169 SG_INTERNAL_MUTEX_SAFE_LOCK_END();
170
171 if (intr) {
172 /* TODO should this be continuable? */
173 SgObject e = Sg_MakeThreadInterruptException(vm);
174 result = Sg_Raise(e, FALSE);
175 }
176 if (tout) {
177 if (SG_UNBOUNDP(timeoutval)) {
178 SgObject e = Sg_MakeJoinTimeoutException(vm);
179 result = Sg_Raise(e, FALSE);
180 } else {
181 result = timeoutval;
182 }
183 } else if (errorP) {
184 /* TODO raise for only non-continuable? */
185 result = Sg_Raise(result, FALSE);
186 }
187 return result;
188 }
189
Sg_ThreadSuspend(SgVM * target,SgObject timeout,SgObject timeoutval)190 SgObject Sg_ThreadSuspend(SgVM *target, SgObject timeout, SgObject timeoutval)
191 {
192 int invalid_state = FALSE;
193 SgVM *taker = NULL;
194 SgVM *vm = Sg_VM();
195 int success = TRUE;
196
197 struct timespec ts, *pts;
198 pts = Sg_GetTimeSpec(timeout, &ts);
199
200 Sg_LockMutex(&target->vmlock);
201 if (target->threadState != SG_VM_RUNNABLE &&
202 target->threadState != SG_VM_STOPPED) {
203 invalid_state = TRUE;
204 } else if (target->inspector != NULL &&
205 target->inspector != vm &&
206 target->inspector->threadState != SG_VM_TERMINATED) {
207 taker = target->inspector;
208 } else {
209 if (target->inspector != vm) {
210 target->inspector = vm;
211 target->stopRequest = SG_VM_REQUEST_SUSPEND;
212 target->attentionRequest = TRUE;
213 }
214 while (target->threadState != SG_VM_STOPPED) {
215 if (pts) {
216 success = Sg_WaitWithTimeout(&target->cond, &target->vmlock, pts);
217 break;
218 } else {
219 success = Sg_Wait(&target->cond, &target->vmlock);
220 }
221 }
222 }
223 Sg_UnlockMutex(&target->vmlock);
224 if (invalid_state) {
225 Sg_Error(UC("cannot stop a thread %S since it is in neither runnable or stopped state"),
226 target);
227 }
228 if (taker != NULL) {
229 Sg_Error(UC("target %S is already under inspection by %S"), target, taker);
230 }
231 if (!success) return timeoutval;
232 return SG_OBJ(target);
233 }
234
Sg_ThreadResume(SgVM * target)235 SgObject Sg_ThreadResume(SgVM *target)
236 {
237 int not_stopped = FALSE;
238 SgVM *stopped_by_other = NULL;
239 Sg_LockMutex(&target->vmlock);
240 if (target->inspector == NULL) {
241 not_stopped = TRUE;
242 } else if (target->inspector != Sg_VM() &&
243 target->inspector->threadState != SG_VM_TERMINATED) {
244 stopped_by_other = target->inspector;
245 } else {
246 target->inspector = NULL;
247 target->threadState = SG_VM_RUNNABLE;
248 target->stopRequest = FALSE;
249 Sg_NotifyAll(&target->cond);
250 }
251 Sg_UnlockMutex(&target->vmlock);
252 if (not_stopped) Sg_Error(UC("target %S is not stopped"), target);
253 if (stopped_by_other) Sg_Error(UC("target %S is stopped by other thread %S"),
254 target, stopped_by_other);
255 return SG_OBJ(target);
256 }
257
Sg_ThreadSleep(SgObject timeout)258 SgObject Sg_ThreadSleep(SgObject timeout)
259 {
260 SgInternalCond dummyc;
261 SgInternalMutex dummym;
262 int intr = FALSE;
263
264 struct timespec ts, *pts;
265 pts = Sg_GetTimeSpec(timeout, &ts);
266
267 if (!pts) {
268 Sg_Error(UC("thread-sleep! can't take #f as timeout value: %S"), timeout);
269 }
270
271 Sg_InitMutex(&dummym, FALSE);
272 Sg_InitCond(&dummyc);
273 Sg_LockMutex(&dummym);
274 /* sleep should sleep second not milli second */
275 intr = Sg_WaitWithTimeout(&dummyc, &dummym, pts);
276 if (intr == SG_INTERNAL_COND_INTR) {
277 /* TODO should this be continuable? */
278 SgObject e = Sg_MakeThreadInterruptException(Sg_VM());
279 Sg_Raise(e, TRUE);
280 }
281 Sg_UnlockMutex(&dummym);
282 Sg_DestroyMutex(&dummym);
283 Sg_DestroyCond(&dummyc);
284 return SG_UNDEF;
285 }
286
wait_for_termination(SgVM * target)287 static int wait_for_termination(SgVM *target)
288 {
289 struct timespec ts;
290 int r;
291 SgObject t = Sg_MakeFlonum(0.001); /* 1ms */
292
293 Sg_GetTimeSpec(t, &ts);
294 do {
295 r = Sg_WaitWithTimeout(&target->cond, &target->vmlock, &ts);
296 } while (r != SG_INTERNAL_COND_TIMEDOUT &&
297 target->threadState != SG_VM_TERMINATED);
298 return r == 0;
299 }
300
Sg_ThreadTerminate(SgVM * target)301 SgObject Sg_ThreadTerminate(SgVM *target)
302 {
303 SgVM *vm = Sg_VM();
304 if (target == vm) {
305 /* self termination */
306 Sg_LockMutex(&target->vmlock);
307 if (target->canceller == NULL) {
308 target->canceller = vm;
309 }
310 Sg_UnlockMutex(&target->vmlock);
311 Sg_ExitThread(&target->thread, NULL);
312 }
313 Sg_LockMutex(&target->vmlock);
314 if (target->threadState == SG_VM_RUNNABLE ||
315 target->threadState == SG_VM_STOPPED) {
316 do {
317 if (target->canceller == NULL) {
318 target->canceller = vm;
319 target->stopRequest = SG_VM_REQUEST_TERMINATE;
320 target->attentionRequest = TRUE;
321
322 if (wait_for_termination(target)) break;
323
324 thread_cleanup_inner(target);
325
326 Sg_TerminateThread(&target->thread);
327 }
328 } while (0);
329 }
330 target->threadState = SG_VM_TERMINATED;
331 Sg_UnlockMutex(&target->vmlock);
332
333 return SG_UNDEF;
334 }
335
Sg_ThreadInterrupt(SgVM * target)336 SgObject Sg_ThreadInterrupt(SgVM *target)
337 {
338 SgVM *vm = Sg_VM();
339 if (target == vm) {
340 Sg_AssertionViolation(SG_INTERN("thread-interrupt!"),
341 SG_MAKE_STRING("attempt to interrupt own"),
342 SG_LIST1(target));
343 }
344 if (target->threadState != SG_VM_RUNNABLE) {
345 Sg_AssertionViolation(SG_INTERN("thread-interrupt!"),
346 SG_MAKE_STRING("thread is not running"),
347 SG_LIST1(target));
348 }
349 return SG_MAKE_BOOL(Sg_InterruptThread(&target->thread));
350 }
351
352 #if !defined HAVE_NANOSLEEP || defined(_WIN32)
nanosleep(const struct timespec * req,struct timespec * rem)353 int nanosleep(const struct timespec *req, struct timespec *rem)
354 {
355 DWORD msecs = 0;
356 time_t sec;
357 unsigned long overflow = 0, c;
358 const DWORD MSEC_OVERFLOW = 4294967; /* 4294967*1000 = 0xfffffed8 */
359
360 /* It's very unlikely that we overflow msecs, but just in case... */
361 if (req->tv_sec > 0 || (req->tv_sec == 0 && req->tv_nsec > 0)) {
362 if (req->tv_sec >= MSEC_OVERFLOW) {
363 overflow = req->tv_sec / MSEC_OVERFLOW;
364 sec = req->tv_sec % MSEC_OVERFLOW;
365 } else {
366 sec = req->tv_sec;
367 }
368 msecs = (sec * 1000 + (req->tv_nsec + 999999)/1000000);
369 }
370 Sleep (msecs);
371 for (c = 0; c < overflow; c++) {
372 Sleep(MSEC_OVERFLOW * 1000);
373 }
374 if (rem) {
375 rem->tv_sec = rem->tv_nsec = 0;
376 }
377 return 0;
378 }
379 #endif
380
Sg_SysNanosleep(double v)381 unsigned long Sg_SysNanosleep(double v)
382 {
383 struct timespec spec, rem;
384 spec.tv_sec = (unsigned long)floor(v / 1.0e9);
385 spec.tv_nsec = (unsigned long)fmod(v, 1.0e9);
386 while (spec.tv_nsec >= 1000000000L) {
387 spec.tv_nsec -= 1000000000L;
388 spec.tv_sec += 1;
389 }
390 rem.tv_sec = 0;
391 rem.tv_nsec = 0;
392 nanosleep(&spec, &rem);
393 if (rem.tv_sec == 0 && rem.tv_nsec == 0) return 0;
394 else {
395 /* returns remaind nanosecond */
396 unsigned long r = 0;
397 while (rem.tv_sec > 0) {
398 r += 1000000000L;
399 rem.tv_sec -= 1000000000L;
400 }
401 r += rem.tv_nsec;
402 return r;
403 }
404 }
405
406 extern void Sg__Init_threads_stub(SgLibrary *lib);
407 SG_CDECL_BEGIN
408 extern void Sg__InitMutex(SgLibrary *lib);
409 SG_CDECL_END
410
Sg_Init_sagittarius__threads()411 SG_EXTENSION_ENTRY void CDECL Sg_Init_sagittarius__threads()
412 {
413 SgLibrary *lib;
414 SG_INIT_EXTENSION(sagittarius__threads);
415 lib = SG_LIBRARY(Sg_FindLibrary(SG_INTERN("(sagittarius threads)"),
416 FALSE));
417 Sg_InitStaticClassWithMeta(SG_CLASS_VM, UC("<thread>"), lib,
418 NULL, SG_FALSE, NULL, 0);
419 Sg__InitMutex(lib);
420 Sg__Init_threads_stub(lib);
421 }
422
423 /*
424 end of file
425 Local Variables:
426 coding: utf-8-unix
427 End:
428 */
429