1 /* threads.c                                       -*- mode:c; coding:utf-8; -*-
2  *
3  * multi thread extensions.
4  *
5  *   Copyright (c) 2010-2015  Takashi Kato <ktakashi@ymail.com>
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *   1. Redistributions of source code must retain the above copyright
12  *      notice, this list of conditions and the following disclaimer.
13  *
14  *   2. Redistributions in binary form must reproduce the above copyright
15  *      notice, this list of conditions and the following disclaimer in the
16  *      documentation and/or other materials provided with the distribution.
17  *
18  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
24  *   TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25  *   PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26  *   LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27  *   NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28  *   SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  *
30  *  $Id: $
31  */
32 #include <math.h>
33 #include <time.h>
34 #ifdef HAVE_SYS_TIME_H
35 # include <sys/time.h>
36 #endif
37 
38 #include <sagittarius.h>
39 #define LIBSAGITTARIUS_EXT_BODY
40 #include <sagittarius/extend.h>
41 #include "threads.h"
42 
43 
Sg_MakeThread(SgProcedure * thunk,SgObject name)44 SgObject Sg_MakeThread(SgProcedure *thunk, SgObject name)
45 {
46   SgVM *current = Sg_VM(), *vm;
47   if (SG_PROCEDURE_REQUIRED(thunk) != 0) {
48     Sg_Error(UC("thunk required, but got %S"), thunk);
49   }
50   vm = Sg_NewThreadVM(current, name);
51   vm->thunk = thunk;
52   return SG_OBJ(vm);
53 }
54 
thread_cleanup_inner(SgVM * vm)55 static void thread_cleanup_inner(SgVM *vm)
56 {
57   vm->threadState = SG_VM_TERMINATED;
58   if (vm->canceller) {
59     /* this thread is cancelled */
60     vm->result = Sg_MakeTerminatedThreadException(vm, vm->canceller);
61     vm->threadErrorP = TRUE;
62   }
63   Sg_NotifyAll(&vm->cond);
64 }
65 
thread_cleanup(void * data)66 static void thread_cleanup(void *data)
67 {
68   SgVM *vm = SG_VM(data);
69   /* change this VM state to TERMINATED, and signals the change
70      to the waiting threads. */
71   Sg_LockMutex(&vm->vmlock);
72   thread_cleanup_inner(vm);
73   Sg_UnlockMutex(&vm->vmlock);
74 }
75 
thread_entry(void * data)76 static void* thread_entry(void *data)
77 {
78   SgVM *vm = SG_VM(data);
79   SgObject *stack;
80 #ifdef HAVE_ALLOCA
81   stack = (SgObject *)alloca(sizeof(SgObject) * SG_VM_STACK_SIZE);
82 #else
83   stack = SG_NEW_ARRAY(SgObject, SG_VM_STACK_SIZE);
84 #endif
85   Sg_SetVMStack(vm, stack, SG_VM_STACK_SIZE);
86   thread_cleanup_push(thread_cleanup, vm);
87   if (Sg_SetCurrentVM(vm)) {
88     SG_UNWIND_PROTECT {
89       vm->result = Sg_Apply(SG_OBJ(vm->thunk), SG_NIL);
90     } SG_WHEN_ERROR {
91       SgObject exc;
92       switch (vm->escapeReason) {
93       case SG_VM_ESCAPE_CONT:
94 	vm->result = SG_MAKE_STRING("stale continuation throws");
95 	vm->threadErrorP = TRUE;
96 	break;
97       default:
98 	Sg_Panic("unknown escape");
99       case SG_VM_ESCAPE_ERROR:
100 	exc = Sg_MakeUncaughtException(vm, SG_OBJ(vm->escapeData[1]));
101 	vm->result = exc;
102 	vm->threadErrorP = TRUE;
103 	break;
104       }
105     } SG_END_PROTECT;
106   } else {
107     /* I'm not sure if this happen */
108     /* if this happen, we can not use any Sg_Apply related methods such as
109        error creations. so just create string.
110      */
111     vm->result = SG_MAKE_STRING("set-current-vm failed");
112     vm->threadErrorP = TRUE;
113   }
114   thread_cleanup_pop(TRUE);
115   return NULL;
116 }
117 
Sg_ThreadStart(SgVM * vm)118 SgObject Sg_ThreadStart(SgVM *vm)
119 {
120   int err_state = FALSE;
121   Sg_LockMutex(&vm->vmlock);
122   if (vm->threadState != SG_VM_NEW) {
123     err_state = TRUE;
124   } else {
125     ASSERT(vm->thunk);
126     vm->threadState = SG_VM_RUNNABLE;
127     if (!Sg_InternalThreadStart(&vm->thread, (SgThreadEntryFunc *)thread_entry,
128 				vm)) {
129       vm->threadState = SG_VM_NEW;
130       err_state = TRUE;
131     }
132   }
133   Sg_UnlockMutex(&vm->vmlock);
134   if (err_state)
135     Sg_Error(UC("attempt to start an already-started thread: %S"), vm);
136   return SG_OBJ(vm);
137 }
138 
Sg_ThreadJoin(SgVM * vm,SgObject timeout,SgObject timeoutval)139 SgObject Sg_ThreadJoin(SgVM *vm, SgObject timeout, SgObject timeoutval)
140 {
141   struct timespec ts;
142   int intr = FALSE, tout = FALSE, errorP;
143   SgObject result = SG_FALSE;
144 
145   struct timespec *pts = Sg_GetTimeSpec(timeout, &ts);
146   SG_INTERNAL_MUTEX_SAFE_LOCK_BEGIN(vm->vmlock);
147   while (vm->threadState != SG_VM_TERMINATED) {
148     if (pts) {
149       int tr  = Sg_WaitWithTimeout(&vm->cond, &vm->vmlock, pts);
150       if (tr == SG_INTERNAL_COND_TIMEDOUT) {
151 	tout = TRUE;
152 	break;
153       } else if (tr == SG_INTERNAL_COND_INTR) {
154 	intr = TRUE;
155 	break;
156       }
157     } else {
158       Sg_Wait(&vm->cond, &vm->vmlock);
159     }
160   }
161   if (!tout) {
162     result = vm->result;
163     errorP = vm->threadErrorP;
164     if (errorP) {
165       vm->result = SG_UNDEF;
166       vm->threadErrorP = FALSE;
167     }
168   }
169   SG_INTERNAL_MUTEX_SAFE_LOCK_END();
170 
171   if (intr) {
172     /* TODO should this be continuable? */
173     SgObject e = Sg_MakeThreadInterruptException(vm);
174     result = Sg_Raise(e, FALSE);
175   }
176   if (tout) {
177     if (SG_UNBOUNDP(timeoutval)) {
178       SgObject e = Sg_MakeJoinTimeoutException(vm);
179       result = Sg_Raise(e, FALSE);
180     } else {
181       result = timeoutval;
182     }
183   } else if (errorP) {
184     /* TODO raise for only non-continuable? */
185     result = Sg_Raise(result, FALSE);
186   }
187   return result;
188 }
189 
Sg_ThreadSuspend(SgVM * target,SgObject timeout,SgObject timeoutval)190 SgObject Sg_ThreadSuspend(SgVM *target, SgObject timeout, SgObject timeoutval)
191 {
192   int invalid_state = FALSE;
193   SgVM *taker = NULL;
194   SgVM *vm = Sg_VM();
195   int success = TRUE;
196 
197   struct timespec ts, *pts;
198   pts = Sg_GetTimeSpec(timeout, &ts);
199 
200   Sg_LockMutex(&target->vmlock);
201   if (target->threadState != SG_VM_RUNNABLE &&
202       target->threadState != SG_VM_STOPPED) {
203     invalid_state = TRUE;
204   } else if (target->inspector != NULL &&
205 	     target->inspector != vm &&
206 	     target->inspector->threadState != SG_VM_TERMINATED) {
207     taker = target->inspector;
208   } else {
209     if (target->inspector != vm) {
210       target->inspector = vm;
211       target->stopRequest = SG_VM_REQUEST_SUSPEND;
212       target->attentionRequest = TRUE;
213     }
214     while (target->threadState != SG_VM_STOPPED) {
215       if (pts) {
216 	success = Sg_WaitWithTimeout(&target->cond, &target->vmlock, pts);
217 	break;
218       } else {
219 	success = Sg_Wait(&target->cond, &target->vmlock);
220       }
221     }
222   }
223   Sg_UnlockMutex(&target->vmlock);
224   if (invalid_state) {
225     Sg_Error(UC("cannot stop a thread %S since it is in neither runnable or stopped state"),
226 	     target);
227   }
228   if (taker != NULL) {
229     Sg_Error(UC("target %S is already under inspection by %S"), target, taker);
230   }
231   if (!success) return timeoutval;
232   return SG_OBJ(target);
233 }
234 
Sg_ThreadResume(SgVM * target)235 SgObject Sg_ThreadResume(SgVM *target)
236 {
237   int not_stopped = FALSE;
238   SgVM *stopped_by_other = NULL;
239   Sg_LockMutex(&target->vmlock);
240   if (target->inspector == NULL) {
241     not_stopped = TRUE;
242   } else if (target->inspector != Sg_VM() &&
243 	     target->inspector->threadState != SG_VM_TERMINATED) {
244     stopped_by_other = target->inspector;
245   } else {
246     target->inspector = NULL;
247     target->threadState = SG_VM_RUNNABLE;
248     target->stopRequest = FALSE;
249     Sg_NotifyAll(&target->cond);
250   }
251   Sg_UnlockMutex(&target->vmlock);
252   if (not_stopped) Sg_Error(UC("target %S is not stopped"), target);
253   if (stopped_by_other) Sg_Error(UC("target %S is stopped by other thread %S"),
254 				 target, stopped_by_other);
255   return SG_OBJ(target);
256 }
257 
Sg_ThreadSleep(SgObject timeout)258 SgObject Sg_ThreadSleep(SgObject timeout)
259 {
260   SgInternalCond dummyc;
261   SgInternalMutex dummym;
262   int intr = FALSE;
263 
264   struct timespec ts, *pts;
265   pts = Sg_GetTimeSpec(timeout, &ts);
266 
267   if (!pts) {
268     Sg_Error(UC("thread-sleep! can't take #f as timeout value: %S"), timeout);
269   }
270 
271   Sg_InitMutex(&dummym, FALSE);
272   Sg_InitCond(&dummyc);
273   Sg_LockMutex(&dummym);
274   /* sleep should sleep second not milli second */
275   intr = Sg_WaitWithTimeout(&dummyc, &dummym, pts);
276   if (intr == SG_INTERNAL_COND_INTR) {
277     /* TODO should this be continuable? */
278     SgObject e = Sg_MakeThreadInterruptException(Sg_VM());
279     Sg_Raise(e, TRUE);
280   }
281   Sg_UnlockMutex(&dummym);
282   Sg_DestroyMutex(&dummym);
283   Sg_DestroyCond(&dummyc);
284   return SG_UNDEF;
285 }
286 
wait_for_termination(SgVM * target)287 static int wait_for_termination(SgVM *target)
288 {
289   struct timespec ts;
290   int r;
291   SgObject t = Sg_MakeFlonum(0.001); /* 1ms */
292 
293   Sg_GetTimeSpec(t, &ts);
294   do {
295     r = Sg_WaitWithTimeout(&target->cond, &target->vmlock, &ts);
296   } while (r != SG_INTERNAL_COND_TIMEDOUT &&
297 	   target->threadState != SG_VM_TERMINATED);
298   return r == 0;
299 }
300 
Sg_ThreadTerminate(SgVM * target)301 SgObject Sg_ThreadTerminate(SgVM *target)
302 {
303   SgVM *vm = Sg_VM();
304   if (target == vm) {
305     /* self termination */
306     Sg_LockMutex(&target->vmlock);
307     if (target->canceller == NULL) {
308       target->canceller = vm;
309     }
310     Sg_UnlockMutex(&target->vmlock);
311     Sg_ExitThread(&target->thread, NULL);
312   }
313   Sg_LockMutex(&target->vmlock);
314   if (target->threadState == SG_VM_RUNNABLE ||
315       target->threadState == SG_VM_STOPPED) {
316     do {
317       if (target->canceller == NULL) {
318 	target->canceller = vm;
319 	target->stopRequest = SG_VM_REQUEST_TERMINATE;
320 	target->attentionRequest = TRUE;
321 
322 	if (wait_for_termination(target)) break;
323 
324 	thread_cleanup_inner(target);
325 
326 	Sg_TerminateThread(&target->thread);
327       }
328     } while (0);
329   }
330   target->threadState = SG_VM_TERMINATED;
331   Sg_UnlockMutex(&target->vmlock);
332 
333   return SG_UNDEF;
334 }
335 
Sg_ThreadInterrupt(SgVM * target)336 SgObject Sg_ThreadInterrupt(SgVM *target)
337 {
338   SgVM *vm = Sg_VM();
339   if (target == vm) {
340     Sg_AssertionViolation(SG_INTERN("thread-interrupt!"),
341 			  SG_MAKE_STRING("attempt to interrupt own"),
342 			  SG_LIST1(target));
343   }
344   if (target->threadState != SG_VM_RUNNABLE) {
345     Sg_AssertionViolation(SG_INTERN("thread-interrupt!"),
346 			  SG_MAKE_STRING("thread is not running"),
347 			  SG_LIST1(target));
348   }
349   return SG_MAKE_BOOL(Sg_InterruptThread(&target->thread));
350 }
351 
352 #if !defined HAVE_NANOSLEEP || defined(_WIN32)
nanosleep(const struct timespec * req,struct timespec * rem)353 int nanosleep(const struct timespec *req, struct timespec *rem)
354 {
355     DWORD msecs = 0;
356     time_t sec;
357     unsigned long overflow = 0, c;
358     const DWORD MSEC_OVERFLOW = 4294967; /* 4294967*1000 = 0xfffffed8 */
359 
360     /* It's very unlikely that we overflow msecs, but just in case... */
361     if (req->tv_sec > 0 || (req->tv_sec == 0 && req->tv_nsec > 0)) {
362         if (req->tv_sec >= MSEC_OVERFLOW) {
363             overflow = req->tv_sec / MSEC_OVERFLOW;
364             sec = req->tv_sec % MSEC_OVERFLOW;
365         } else {
366             sec = req->tv_sec;
367         }
368         msecs = (sec * 1000 + (req->tv_nsec + 999999)/1000000);
369     }
370     Sleep (msecs);
371     for (c = 0; c < overflow; c++) {
372         Sleep(MSEC_OVERFLOW * 1000);
373     }
374     if (rem) {
375         rem->tv_sec = rem->tv_nsec = 0;
376     }
377     return 0;
378 }
379 #endif
380 
Sg_SysNanosleep(double v)381 unsigned long Sg_SysNanosleep(double v)
382 {
383   struct timespec spec, rem;
384   spec.tv_sec = (unsigned long)floor(v / 1.0e9);
385   spec.tv_nsec = (unsigned long)fmod(v, 1.0e9);
386   while (spec.tv_nsec >= 1000000000L) {
387     spec.tv_nsec -= 1000000000L;
388     spec.tv_sec += 1;
389   }
390   rem.tv_sec = 0;
391   rem.tv_nsec = 0;
392   nanosleep(&spec, &rem);
393   if (rem.tv_sec == 0 && rem.tv_nsec == 0) return 0;
394   else {
395     /* returns remaind nanosecond */
396     unsigned long r = 0;
397     while (rem.tv_sec > 0) {
398       r += 1000000000L;
399       rem.tv_sec -= 1000000000L;
400     }
401     r += rem.tv_nsec;
402     return r;
403   }
404 }
405 
406 extern void Sg__Init_threads_stub(SgLibrary *lib);
407 SG_CDECL_BEGIN
408 extern void Sg__InitMutex(SgLibrary *lib);
409 SG_CDECL_END
410 
Sg_Init_sagittarius__threads()411 SG_EXTENSION_ENTRY void CDECL Sg_Init_sagittarius__threads()
412 {
413   SgLibrary *lib;
414   SG_INIT_EXTENSION(sagittarius__threads);
415   lib = SG_LIBRARY(Sg_FindLibrary(SG_INTERN("(sagittarius threads)"),
416 				  FALSE));
417   Sg_InitStaticClassWithMeta(SG_CLASS_VM, UC("<thread>"), lib,
418 			     NULL, SG_FALSE, NULL, 0);
419   Sg__InitMutex(lib);
420   Sg__Init_threads_stub(lib);
421 }
422 
423 /*
424   end of file
425   Local Variables:
426   coding: utf-8-unix
427   End:
428 */
429