1 /* Part of SWI-Prolog
2
3 Author: Jan Wielemaker
4 E-mail: J.Wielemaker@vu.nl
5 WWW: http://www.swi-prolog.org
6 Copyright (c) 1999-2020, University of Amsterdam,
7 VU University Amsterdam
8 CWI, Amsterdam
9 All rights reserved.
10
11 Redistribution and use in source and binary forms, with or without
12 modification, are permitted provided that the following conditions
13 are met:
14
15 1. Redistributions of source code must retain the above copyright
16 notice, this list of conditions and the following disclaimer.
17
18 2. Redistributions in binary form must reproduce the above copyright
19 notice, this list of conditions and the following disclaimer in
20 the documentation and/or other materials provided with the
21 distribution.
22
23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
28 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
29 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
30 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
31 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
32 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
33 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34 POSSIBILITY OF SUCH DAMAGE.
35 */
36
37 /* #define O_DEBUG 1 */
38
39 #define _GNU_SOURCE 1 /* get recursive mutex stuff to */
40 /* compile clean with glibc. Can */
41 /* this do any harm? */
42 #ifdef __WINDOWS__
43 #include <winsock2.h>
44 #include <windows.h>
45 #include <errno.h> /* must be before pl-incl.h */
46 #endif
47
48 #if __MINGW32__
49 #define __try
50 #define __except(_) if (0)
51 #define __finally
52 #endif
53
54 #include "pl-incl.h"
55 #include "pl-tabling.h"
56 #include "os/pl-cstack.h"
57 #include "pl-prof.h"
58 #include "pl-event.h"
59 #include <stdio.h>
60 #include <math.h>
61
62 #if __WINDOWS__ /* this is a stub. Should be detected */
63 #undef HAVE_PTHREAD_SETNAME_NP /* in configure.ac */
64 #endif
65
66 #ifdef O_PLMT
67
68 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
69 MULTITHREADING SUPPORT
70
71 APPROACH
72 ========
73
74 * Prolog threads are C-threads
75 Prolog multi-threading is based upon a C thread library. At first,
76 we will concentrate on the Posix pthread standard, due to its wide
77 availability especially on Unix systems.
78
79 This approach has some clear advantages: clean mixing of thread-based
80 foreign code and portability.
81
82 * Data
83 All global data that cannot be removed is split into three large
84 structures:
85
86 + PL_code_data
87 This structure contains `code' data: data that is set up at
88 initialisation time and never changed afterwards.
89 PL_initialise() initialises this and no further precautions
90 are needed.
91
92 + PL_global_data
93 This structure contains all global data required for the
94 Prolog `heap'. This data is shared between threads and
95 access should be properly synchronised.
96
97 + PL_local_data
98 This structure contains the thread-local data. If a new
99 Prolog engine is initialised in a thread, a new copy of this
100 structure is allocated and initialised.
101
102 For compatibility reasons, we cannot pass this pointer around
103 as an argument between all functions in the system. We will
104 locate it through the thread-id using a function. Any function
105 requiring frequent access can fetch this pointer once at
106 start-up. Cooperating functions can pass this pointer.
107 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
108
109 #include <errno.h>
110 #if defined(__linux__)
111 #include <syscall.h>
112 #ifdef HAVE_GETTID_MACRO
113 _syscall0(pid_t,gettid)
114 #endif
115 #endif
116
117 #ifdef HAVE_SYS_RESOURCE_H
118 #include <sys/resource.h>
119 #endif
120
121 #ifdef HAVE_SYS_SYSCALL_H
122 #include <sys/syscall.h>
123 #endif
124
125 #ifdef HAVE_SYS_CPUSET_H
126 #include <sys/param.h> /* pulls sys/cdefs.h and sys/types.h for sys/cpuset.h */
127 #include <sys/cpuset.h> /* CPU_ZERO(), CPU_SET, cpuset_t */
128 #endif
129 #ifdef HAVE_PTHREAD_NP_H
130 #include <pthread_np.h> /* pthread_*_np */
131 #endif
132 #ifdef HAVE_CPUSET_T
133 typedef cpuset_t cpu_set_t;
134 #endif
135
136 #ifdef HAVE_SEMA_INIT /* Solaris */
137 #include <synch.h>
138
139 typedef sema_t sem_t;
140 #define sem_trywait(s) sema_trywait(s)
141 #define sem_destroy(s) sema_destroy(s)
142 #define sem_post(s) sema_post(s)
143 #define sem_init(s, type, cnt) sema_init(s, cnt, type, NULL)
144
145 #else /*HAVE_SEMA_INIT*/
146 #include <semaphore.h>
147
148 #ifndef USYNC_THREAD
149 #define USYNC_THREAD 0
150 #endif
151
152 #endif /*HAVE_SEMA_INIT*/
153
154 #ifdef USE_SEM_OPEN /* see below */
155 static sem_t *sem_canceled_ptr;
156 #else
157 static sem_t sem_canceled; /* used on halt */
158 #define sem_canceled_ptr (&sem_canceled)
159 #endif
160
161 #ifndef __WINDOWS__
162 #include <signal.h>
163
164 #ifdef USE_SEM_OPEN
165
166 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
167 Apple Darwin (6.6) only contains the sem_init() function as a stub. It
168 only provides named semaphores through sem_open(). These defines and
169 my_sem_open() try to hide the details of this as much as possible from
170 the rest of the code. Note that we unlink the semaphore right after
171 creating it, using the common Unix trick to keep access to it as long as
172 we do not close it. We assume the OS will close the semaphore as the
173 application terminates. All this is highly undesirable, but it will do
174 for now. The USE_SEM_OPEN define is set by configure based on the
175 substring "darwin" in the architecture identifier.
176 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
177
178 #define sem_init(ptr, flags, val) my_sem_open(&ptr, val)
179 #define sem_destroy(ptr) ((void)0)
180
181 static int
my_sem_open(sem_t ** ptr,unsigned int val)182 my_sem_open(sem_t **ptr, unsigned int val)
183 { if ( !*ptr )
184 { sem_t *sem = sem_open("pl", O_CREAT|O_EXCL, 0600, val);
185
186 DEBUG(MSG_THREAD, Sdprintf("sem = %p\n", sem));
187
188 if ( sem == NULL )
189 { perror("sem_open");
190 exit(1);
191 }
192
193 *ptr = sem;
194
195 sem_unlink("pl");
196 }
197
198 return 0;
199 }
200
201 #endif /*USE_SEM_OPEN*/
202
203 #ifndef SA_RESTART
204 #define SA_RESTART 0
205 #endif
206 #endif /*!__WINDOWS__*/
207
208 #ifdef __WINDOWS__
209 /* Deal with different versions of the windows thread library
210 */
211
212 static HANDLE
get_windows_thread(PL_thread_info_t * info)213 get_windows_thread(PL_thread_info_t *info)
214 {
215 #ifdef HAVE_PTHREAD_GETW32THREADHANDLE_NP
216 HANDLE wt = NULL;
217 __try
218 { wt = pthread_getw32threadhandle_np(info->tid);
219 } __except(EXCEPTION_EXECUTE_HANDLER)
220 return wt;
221 #else
222 return OpenThread(THREAD_ALL_ACCESS, FALSE, info->w32id);
223 #endif
224 }
225
226 static void
close_windows_thread(HANDLE wt)227 close_windows_thread(HANDLE wt)
228 {
229 #ifndef HAVE_PTHREAD_GETW32THREADHANDLE_NP
230 CloseHandle(wt);
231 #endif
232 }
233 #endif /*__WINDOWS__*/
234
235
236 /*******************************
237 * GLOBAL DATA *
238 *******************************/
239
240 static Table threadTable; /* name --> reference symbol */
241 static int threads_ready = FALSE; /* Prolog threads available */
242 static Table queueTable; /* name --> queue */
243 static simpleMutex queueTable_mutex; /* GC synchronization */
244 static int will_exec; /* process will exec soon */
245
246 #ifdef HAVE___THREAD
247 __thread PL_local_data_t *GLOBAL_LD;
248 #else
249 TLD_KEY PL_ldata; /* key for thread PL_local_data */
250 #endif
251
252 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
The global mutexes. Most are used within a module and their name is
254 simply the module-name. The idea is that a module holds a coherent bit
255 of data that needs a mutex for all operations.
256
257 Some remarks:
258
259 L_MISC
260 General-purpose mutex. Should only be used for simple, very
261 local tasks and may not be used to lock anything significant.
262
263 __WINDOWS__
264 We use native windows CRITICAL_SECTIONS for mutexes here to
265 get the best performance, notably on single-processor hardware.
266 This is selected in pl-mutex.h based on the macro
267 USE_CRITICAL_SECTIONS
268
269 Unfortunately critical sections have no static initialiser,
270 so we need something called before anything else happens. This
271 can only be DllMain().
272 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
273
274 #ifdef USE_CRITICAL_SECTIONS
275 #undef PTHREAD_MUTEX_INITIALIZER
276 #define PTHREAD_MUTEX_INITIALIZER {0}
277 #endif
278
279 #define COUNT_MUTEX_INITIALIZER(name) \
280 { PTHREAD_MUTEX_INITIALIZER, \
281 name, \
282 0L \
283 }
284
285 /* NOTE: These must be kept in sequence such that they align with
286 the #defines for L_* in pl-thread.h
287 */
288
/* Table of the statically allocated global mutexes.  Each entry is
   initialised with its name and a zero count through
   COUNT_MUTEX_INITIALIZER().  link_mutexes() chains the entries into
   the list reachable from GD->thread.mutexes.  The numbered comments
   mark the array index of every tenth entry.  The last two entries
   exist on Windows only.
*/

counting_mutex _PL_mutexes[] =
{ COUNT_MUTEX_INITIALIZER("L_MISC"),            /* 0 */
  COUNT_MUTEX_INITIALIZER("L_ALLOC"),
  COUNT_MUTEX_INITIALIZER("L_REHASH_ATOMS"),
  COUNT_MUTEX_INITIALIZER("L_FLAG"),
  COUNT_MUTEX_INITIALIZER("L_FUNCTOR"),
  COUNT_MUTEX_INITIALIZER("L_RECORD"),
  COUNT_MUTEX_INITIALIZER("L_THREAD"),
  COUNT_MUTEX_INITIALIZER("L_MUTEX"),
  COUNT_MUTEX_INITIALIZER("L_PREDICATE"),
  COUNT_MUTEX_INITIALIZER("L_MODULE"),
  COUNT_MUTEX_INITIALIZER("L_SRCFILE"),         /* 10 */
  COUNT_MUTEX_INITIALIZER("L_TABLE"),
  COUNT_MUTEX_INITIALIZER("L_BREAK"),
  COUNT_MUTEX_INITIALIZER("L_FILE"),
  COUNT_MUTEX_INITIALIZER("L_SEETELL"),
  COUNT_MUTEX_INITIALIZER("L_PLFLAG"),
  COUNT_MUTEX_INITIALIZER("L_OP"),
  COUNT_MUTEX_INITIALIZER("L_INIT"),
  COUNT_MUTEX_INITIALIZER("L_TERM"),
  COUNT_MUTEX_INITIALIZER("L_FOREIGN"),
  COUNT_MUTEX_INITIALIZER("L_OS"),              /* 20 */
  COUNT_MUTEX_INITIALIZER("L_LOCALE"),
  COUNT_MUTEX_INITIALIZER("L_SORTR"),
  COUNT_MUTEX_INITIALIZER("L_UMUTEX"),
  COUNT_MUTEX_INITIALIZER("L_INIT_ATOMS"),
  COUNT_MUTEX_INITIALIZER("L_CGCGEN"),
  COUNT_MUTEX_INITIALIZER("L_EVHOOK"),
  COUNT_MUTEX_INITIALIZER("L_OSDIR"),
  COUNT_MUTEX_INITIALIZER("L_ALERT")
#ifdef __WINDOWS__
, COUNT_MUTEX_INITIALIZER("L_DDE")
, COUNT_MUTEX_INITIALIZER("L_CSTACK")
#endif
};
324
325
326 static void
link_mutexes(void)327 link_mutexes(void)
328 { counting_mutex *m;
329 int n = sizeof(_PL_mutexes)/sizeof(*m);
330 int i;
331
332 GD->thread.mutexes = _PL_mutexes;
333 for(i=0, m=_PL_mutexes; i<n-1; i++, m++)
334 m->next = m+1;
335 }
336
337
338 #ifdef USE_CRITICAL_SECTIONS
339
340 static void
W32initMutexes(void)341 W32initMutexes(void)
342 { static int done = FALSE;
343 counting_mutex *m;
344 int n = sizeof(_PL_mutexes)/sizeof(*m);
345 int i;
346
347 if ( done )
348 return;
349 done = TRUE;
350
351 for(i=0, m=_PL_mutexes; i<n; i++, m++)
352 simpleMutexInit(&m->mutex);
353 }
354
355 static void
deleteMutexes()356 deleteMutexes()
357 { counting_mutex *m;
358 int n = sizeof(_PL_mutexes)/sizeof(*m);
359 int i;
360
361 for(i=0, m=_PL_mutexes; i<n; i++, m++)
362 simpleMutexDelete(&m->mutex);
363 }
364
365
366 #ifdef O_SHARED_KERNEL
367
368 BOOL WINAPI
DllMain(HINSTANCE hinstDll,DWORD fdwReason,LPVOID lpvReserved)369 DllMain(HINSTANCE hinstDll, DWORD fdwReason, LPVOID lpvReserved)
370 { BOOL result = TRUE;
371
372 switch(fdwReason)
373 { case DLL_PROCESS_ATTACH:
374 GD->thread.instance = hinstDll;
375 W32initMutexes();
376 TLD_alloc(&PL_ldata);
377 break;
378 case DLL_PROCESS_DETACH:
379 deleteMutexes();
380 break;
381 case DLL_THREAD_ATTACH:
382 case DLL_THREAD_DETACH:
383 break;
384 }
385
386 return result;
387 }
388
389 #endif /*O_SHARED_KERNEL*/
390
391 #endif /*USE_CRITICAL_SECTIONS*/
392
393 static
394 PRED_IMPL("mutex_statistics", 0, mutex_statistics, 0)
395 { PRED_LD
396 counting_mutex *cm;
397 IOSTREAM *s = Scurout;
398
399 #ifdef O_CONTENTION_STATISTICS
400 Sfprintf(s, "Name locked collisions\n"
401 "----------------------------------------------------------------------------\n");
402 #else
403 Sfprintf(s, "Name locked\n"
404 "-----------------------------------------\n");
405 #endif
406 PL_LOCK(L_MUTEX);
407 for(cm = GD->thread.mutexes; cm; cm = cm->next)
408 { int lc;
409
410 if ( cm->count == 0 )
411 continue;
412
413 Sfprintf(s, "%-56Us %8d", cm->name, cm->count); /* %Us: UTF-8 string */
414 #ifdef O_CONTENTION_STATISTICS
415 Sfprintf(s, " %8d", cm->collisions);
416 #endif
417 lc = (cm == &_PL_mutexes[L_MUTEX] ? 1 : 0);
418
419 if ( cm->lock_count > lc )
420 Sfprintf(s, " LOCKS: %d\n", cm->lock_count - lc);
421 else
422 Sfprintf(s, "\n");
423 }
424 PL_UNLOCK(L_MUTEX);
425
426 succeed;
427 }
428
429
430 #ifdef PTW32_STATIC_LIB
431 static void
win_thread_initialize(void)432 win_thread_initialize(void)
433 { static int done = FALSE;
434
435 if ( done )
436 return;
437 done = TRUE;
438 ptw32_processInitialize();
439 }
440 #endif
441
442 /*******************************
443 * LOCAL PROTOTYPES *
444 *******************************/
445
446 static PL_thread_info_t *alloc_thread(void);
447 static void destroy_message_queue(message_queue *queue);
448 static void destroy_thread_message_queue(message_queue *queue);
449 static void init_message_queue(message_queue *queue, size_t max_size);
450 static size_t sizeof_message_queue(message_queue *queue);
451 static size_t sizeof_local_definitions(PL_local_data_t *ld);
452 static void freeThreadSignals(PL_local_data_t *ld);
453 static thread_handle *create_thread_handle(PL_thread_info_t *info);
454 static void free_thread_info(PL_thread_info_t *info);
455 static void set_system_thread_id(PL_thread_info_t *info);
456 static thread_handle *symbol_thread_handle(atom_t a);
457 static void destroy_interactor(thread_handle *th, int gc);
458 static PL_engine_t PL_current_engine(void);
459 static void detach_engine(PL_engine_t e);
460
461 static int unify_queue(term_t t, message_queue *q);
462 static int get_message_queue_unlocked__LD(term_t t, message_queue **queue ARG_LD);
463 static int get_message_queue__LD(term_t t, message_queue **queue ARG_LD);
464 static void release_message_queue(message_queue *queue);
465 static void initMessageQueues(void);
466 static int get_thread(term_t t, PL_thread_info_t **info, int warn);
467 static int is_alive(int status);
468 static void init_predicate_references(PL_local_data_t *ld);
469 static void free_predicate_references(PL_local_data_t *ld);
470 static int ldata_in_use(PL_local_data_t *ld);
471 #ifdef O_C_BACKTRACE
472 static void print_trace(int depth);
473 #else
474 #define print_trace(depth) (void)0
475 #endif
476
477 static void timespec_diff(struct timespec *diff,
478 const struct timespec *a, const struct timespec *b);
479 static int timespec_sign(const struct timespec *t);
480
481 /*******************************
482 * LOCAL DATA *
483 *******************************/
484
485 #undef LD
486 #define LD LOCAL_LD
487
488
489 /*******************************
490 * ERRORS *
491 *******************************/
492
/* Translate the error code `e` returned by a thread-library call into
   a printable message, using strerror().
*/

static char *
ThError(int e)
{ char *message = strerror(e);

  return message;
}
497
498
499 /*******************************
500 * RUNTIME ENABLE/DISABLE *
501 *******************************/
502
503 int
enableThreads(int enable)504 enableThreads(int enable)
505 { if ( enable )
506 { GD->thread.enabled = TRUE; /* print system message? */
507 } else
508 { PL_LOCK(L_THREAD);
509 if ( GD->statistics.threads_created -
510 GD->statistics.threads_finished == 1 ) /* I am alone :-( */
511 { GD->thread.enabled = FALSE;
512 } else
513 { GET_LD
514 term_t key = PL_new_term_ref();
515
516 PL_put_atom(key, ATOM_threads);
517
518 PL_UNLOCK(L_THREAD);
519 return PL_error(NULL, 0, "Active threads",
520 ERR_PERMISSION,
521 ATOM_modify, ATOM_flag, key);
522 }
523 PL_UNLOCK(L_THREAD);
524 }
525
526 succeed;
527 }
528
529
530 /*******************************
531 * THREAD ALLOCATION *
532 *******************************/
533
534 static int
initialise_thread(PL_thread_info_t * info)535 initialise_thread(PL_thread_info_t *info)
536 { assert(info->thread_data);
537
538 TLD_set_LD(info->thread_data);
539
540 if ( !info->stack_limit ) info->stack_limit = GD->options.stackLimit;
541 if ( !info->table_space ) info->table_space = GD->options.tableSpace;
542
543 if ( !initPrologStacks(info->stack_limit) )
544 { info->status = PL_THREAD_NOMEM;
545 TLD_set_LD(NULL);
546 return FALSE;
547 }
548
549 initPrologLocalData(info->thread_data);
550 info->thread_data->magic = LD_MAGIC;
551
552 return TRUE;
553 }
554
555
556 static void
free_local_data(PL_local_data_t * ld)557 free_local_data(PL_local_data_t *ld)
558 { simpleMutexDelete(&ld->thread.scan_lock);
559 freeHeap(ld, sizeof(*ld));
560 }
561
562 static PL_local_data_t *ld_free_list = NULL;
563
564 static void
clean_ld_free_list(void)565 clean_ld_free_list(void)
566 { if ( ld_free_list )
567 { PL_local_data_t *ld, **prev = &ld_free_list;
568 for(ld = ld_free_list; ld; ld=*prev)
569 { if ( !ldata_in_use(ld) )
570 { *prev = ld->next_free;
571 free_local_data(ld);
572 } else
573 { prev = &ld->next_free;
574 }
575 }
576 }
577 }
578
579 static void
maybe_free_local_data(PL_local_data_t * ld)580 maybe_free_local_data(PL_local_data_t *ld)
581 { if ( !ldata_in_use(ld) )
582 { free_local_data(ld);
583 } else
584 { PL_LOCK(L_THREAD);
585 clean_ld_free_list();
586 ld->next_free = ld_free_list;
587 ld_free_list = ld;
588 PL_UNLOCK(L_THREAD);
589 }
590 }
591
592
593 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
594 free_prolog_thread()
595 Called from a cleanup-handler to release all resources associated
596 with a thread.
597 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
598
/* Release all resources held by the engine `ld`.

   ld:         local data of the thread/engine to be destroyed.
   after_fork: TRUE when called in the child after fork() (see
               reinit_threads_after_fork()).  In that case no locks
               are taken, no exit hooks run, the statistics are left
               alone and the thread is treated as detached.

   During the cleanup `ld` is installed as the current LD; the
   previous LD is restored at the end.  Nothing is done if the thread
   system is not (or no longer) ready.
*/

static void
freePrologThread(PL_local_data_t *ld, int after_fork)
{ PL_thread_info_t *info;
  int acknowledge;
  double time;
  PL_local_data_t *old_ld;

  { GET_LD
    old_ld = LD;                        /* restored at the end */
  }

  if ( !threads_ready )
    return;                             /* Post-mortem */

  TLD_set_LD(ld);
  { GET_LD

    info = ld->thread.info;
    DEBUG(MSG_THREAD, Sdprintf("Freeing prolog thread %d (status = %d)\n",
                               info->pl_tid, info->status));

    if ( !after_fork )
    { int rc1, rc2;

      PL_LOCK(L_THREAD);
      if ( info->status == PL_THREAD_RUNNING )
        info->status = PL_THREAD_EXITED;        /* foreign pthread_exit() */
                                        /* process-exit request: the */
                                        /* requester is told below */
      acknowledge = (ld->exit_requested == EXIT_REQ_PROCESS);
      PL_UNLOCK(L_THREAD);

      if ( ld->stacks.argument.base )           /* are stacks initialized? */
      { ld->critical++;                         /* startCritical */
        info->in_exit_hooks = TRUE;
                                        /* the "this thread" hook only */
                                        /* runs if we were already the */
                                        /* current engine on entry */
        if ( LD == ld )
          rc1 = callEventHook(PLEV_THIS_THREAD_EXIT);
        else
          rc1 = TRUE;
        rc2 = callEventHook(PLEV_THREAD_EXIT, info);
                                        /* report and drop an exception */
                                        /* left by a failed hook */
        if ( (!rc1 || !rc2) && exception_term )
        { Sdprintf("Event hook \"thread_finished\" left an exception\n");
          PL_write_term(Serror, exception_term, 1200,
                        PL_WRT_QUOTED|PL_WRT_NEWLINE);
          PL_clear_exception();
        }
        info->in_exit_hooks = FALSE;
        ld->critical--;                         /* endCritical */
      }
    } else
    { acknowledge = FALSE;
      info->detached = TRUE;            /* cleanup */
    }

#ifdef O_PROFILE
    if ( ld->profile.active )
      activateProfiler(FALSE, ld);
#endif

    destroy_event_list(&ld->event.hook.onthreadexit);
    cleanupLocalDefinitions(ld);

    DEBUG(MSG_THREAD, Sdprintf("Destroying data\n"));
    ld->magic = 0;                      /* no longer a valid engine */
    if ( ld->stacks.global.base )       /* otherwise not initialised */
    { simpleMutexLock(&ld->thread.scan_lock);
      freeStacks(ld);
      simpleMutexUnlock(&ld->thread.scan_lock);
    }
    freePrologLocalData(ld);

    /*PL_unregister_atom(ld->prompt.current);*/

    freeThreadSignals(ld);
                                        /* engines contribute no CPU time */
    time = info->is_engine ? 0.0 : ThreadCPUTime(ld, CPU_USER);

                                        /* unless after fork, L_THREAD is */
                                        /* held from here until the */
                                        /* matching unlock below */
    if ( !after_fork )
    { PL_LOCK(L_THREAD);
      GD->statistics.threads_finished++;
      assert(GD->statistics.threads_created - GD->statistics.threads_finished >= 1);
      GD->statistics.thread_cputime += time;
    }
    destroy_thread_message_queue(&ld->thread.messages);
    free_predicate_references(ld);
    if ( ld->btrace_store )
    { btrace_destroy(ld->btrace_store);
      ld->btrace_store = NULL;
    }
#ifdef O_LOCALE
    if ( ld->locale.current )
      releaseLocale(ld->locale.current);
#endif
    info->thread_data = NULL;           /* avoid a loop */
    info->has_tid = FALSE;              /* needed? */
    if ( !after_fork )
      PL_UNLOCK(L_THREAD);

                                        /* otherwise the info is kept, */
                                        /* with thread_data == NULL */
    if ( info->detached || acknowledge )
      free_thread_info(info);

    ld->thread.info = NULL;             /* help force a crash if ld used */
    maybe_free_local_data(ld);

    if ( acknowledge )                  /* == canceled */
    { DEBUG(MSG_CLEANUP_THREAD,
            Sdprintf("Acknowledge dead of %d\n", info->pl_tid));
      pthread_detach(pthread_self());
                                        /* exitPrologThreads() polls this */
                                        /* semaphore */
      sem_post(sem_canceled_ptr);
    }
  }

  TLD_set_LD(old_ld);
}
710
711
712 static void
free_prolog_thread(void * data)713 free_prolog_thread(void *data)
714 { PL_local_data_t *ld = data;
715
716 freePrologThread(ld, FALSE);
717 }
718
719
720 #ifdef O_QUEUE_STATS
721 static void msg_statistics(void);
722 #endif
723
724 #ifdef O_ATFORK /* Not yet default */
725
726 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
727 reinit_threads_after_fork() is called after a fork in the child. Its
728 task is to restore the consistency of the thread information. It
729 performs the following tasks:
730
731 * Reinitialize all mutexes we know of
732 Just calling simpleMutexInit() seems a bit dubious, but as we have
733 no clue about their locking status, what are the alternatives?
734 * Make the thread the main (=1) thread if this is not already the
735 case. This is needed if another thread has initiated the fork.
736 * Reclaim stacks and other resources associated with other threads
737 that existed before the fork.
738
739 There are several issues with the current implementation:
740
741 * If fork/1 is called while the calling thread holds a mutex (as in
742 with_mutex/2), the consistency of this mutex will be lost.
743 * I doubt we have all mutexes. Certainly not of the packages.
744 We should check at least I/O, user-level mutexes and other mutexes
745 that are created locally.
746 * If another thread than the main thread forks, we still need to
747 properly reclaim the main-thread resources.
748 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
749
750 static void
reinit_threads_after_fork(void)751 reinit_threads_after_fork(void)
752 { GET_LD
753 counting_mutex *m;
754 PL_thread_info_t *info;
755 int i;
756
757 if ( will_exec ||
758 (GD->statistics.threads_created - GD->statistics.threads_finished) == 1)
759 return; /* no point */
760
761 info = LD->thread.info;
762
763 for(m = GD->thread.mutexes; m; m = m->next)
764 { simpleMutexInit(&m->mutex); /* Dubious */
765 m->count = 0;
766 m->lock_count = 0;
767 #ifdef O_CONTENTION_STATISTICS
768 m->collisions = 0;
769 #endif
770 }
771
772 if ( info->pl_tid != 1 )
773 { DEBUG(MSG_THREAD, Sdprintf("Forked thread %d\n", info->pl_tid));
774 *GD->thread.threads[1] = *info;
775 info->status = PL_THREAD_UNUSED;
776 info = GD->thread.threads[1];
777 LD->thread.info = info;
778 info->pl_tid = 1;
779 }
780 set_system_thread_id(info);
781
782 for(i=2; i<=GD->thread.highest_id; i++)
783 { if ( (info=GD->thread.threads[i]) )
784 { if ( info->status != PL_THREAD_UNUSED )
785 { freePrologThread(info->thread_data, TRUE);
786 info->status = PL_THREAD_UNUSED;
787 }
788 }
789 }
790 GD->thread.highest_id = 1;
791
792 GD->statistics.thread_cputime = 0.0;
793 GD->statistics.threads_created = 1;
794 GD->statistics.threads_finished = 0;
795
796 assert(PL_thread_self() == 1);
797 }
798
799 #else
800
801 #undef pthread_atfork /* may be a macro */
802 #define pthread_atfork(prep, parent, child) (void)0
803
804 #endif /*O_ATFORK*/
805
806 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
807 PL_cleanup_fork() must be called between fork() and exec() to remove
808 traces of Prolog that are not supposed to leak into the new process.
809 Note that we must be careful here. Notably, the code cannot lock or
810 unlock any mutex as the behaviour of mutexes is undefined over fork().
811
812 Earlier versions used the file-table to close file descriptors that are
813 in use by Prolog. This can't work as the table is guarded by a mutex.
814 Now we use the FD_CLOEXEC flag in Snew();
815 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
816
817 void
PL_cleanup_fork(void)818 PL_cleanup_fork(void)
819 { will_exec = TRUE;
820 stopItimer();
821 }
822
823
/* Initialise the thread system and register the calling thread as
   Prolog thread 1, using the static PL_local_data as its local data.
   A repeated call while threads_ready is set returns after taking and
   releasing L_THREAD, without changing the thread administration.
*/

void
initPrologThreads(void)
{ PL_thread_info_t *info;
  static int init_ldata_key = FALSE;    /* TLD key allocated only once */

  initAlloc();

#if defined(USE_CRITICAL_SECTIONS) && !defined(O_SHARED_KERNEL)
  W32initMutexes();                     /* see also DllMain() */
#endif

#ifdef PTW32_STATIC_LIB
  win_thread_initialize();
#endif

  PL_LOCK(L_THREAD);
  if ( threads_ready )                  /* already initialised */
  { PL_UNLOCK(L_THREAD);
    return;
  }

#ifdef O_QUEUE_STATS
  atexit(msg_statistics);
#endif

  if ( !init_ldata_key )
  { init_ldata_key = TRUE;
                                        /* in the shared kernel with */
                                        /* critical sections, DllMain() */
                                        /* allocates the key */
#if !(defined(USE_CRITICAL_SECTIONS) && defined(O_SHARED_KERNEL))
#ifndef HAVE___THREAD
    TLD_alloc(&PL_ldata);               /* see also alloc_thread() */
#endif
#endif
  }
  TLD_set_LD(&PL_local_data);

  PL_local_data.magic = LD_MAGIC;
  { GD->thread.thread_max = 4;          /* see resizeThreadMax() */
    GD->thread.highest_allocated = 1;
    GD->thread.threads = allocHeapOrHalt(GD->thread.thread_max *
                                         sizeof(*GD->thread.threads));
    memset(GD->thread.threads, 0,
           GD->thread.thread_max * sizeof(*GD->thread.threads));
                                        /* slot 1 describes this thread */
    info = GD->thread.threads[1] = allocHeapOrHalt(sizeof(*info));
    memset(info, 0, sizeof(*info));
    info->pl_tid = 1;
    info->debug = TRUE;
    GD->thread.highest_id = 1;
    info->thread_data = &PL_local_data;
    info->status = PL_THREAD_RUNNING;
    PL_local_data.thread.info = info;
    PL_local_data.thread.magic = PL_THREAD_MAGIC;
    set_system_thread_id(info);
    init_message_queue(&PL_local_data.thread.messages, 0);
    init_predicate_references(&PL_local_data);

    GD->statistics.thread_cputime = 0.0;
    GD->statistics.threads_created = 1;
    pthread_mutex_init(&GD->thread.index.mutex, NULL);
    pthread_cond_init(&GD->thread.index.cond, NULL);
    initMutexes();
    link_mutexes();
    threads_ready = TRUE;
  }

                                        /* expands to a no-op unless */
                                        /* O_ATFORK is defined */
  pthread_atfork(NULL, NULL, reinit_threads_after_fork);
  initMessageQueues();

  PL_UNLOCK(L_THREAD);
}
893
894
895 void
cleanupThreads(void)896 cleanupThreads(void)
897 { int i;
898 /*TLD_free(PL_ldata);*/ /* this causes crashes */
899
900 if ( queueTable )
901 { destroyHTable(queueTable); /* removes shared queues */
902 queueTable = NULL;
903 simpleMutexDelete(&queueTable_mutex);
904 }
905 if ( GD->thread.mutexTable )
906 { destroyHTable(GD->thread.mutexTable);
907 GD->thread.mutexTable = NULL;
908 }
909 if ( threadTable )
910 { destroyHTable(threadTable);
911 threadTable = NULL;
912 }
913 for(i=1; i<GD->thread.thread_max; i++)
914 { PL_thread_info_t *info = GD->thread.threads[i];
915
916 if ( info )
917 freeHeap(info, sizeof(*info));
918 }
919 freeHeap(GD->thread.threads,
920 GD->thread.thread_max * sizeof(*GD->thread.threads));
921 GD->thread.threads = NULL;
922 threads_ready = FALSE;
923 }
924
925
926 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
927 A first step towards clean destruction of the system. Ideally, we would
928 like the following to happen:
929
930 * Close-down all threads except for the main one
931 + Have all thread exit hooks called
932 * Run the at_halt/1 hooks in the main thread
933 * Exit from the main thread.
934
935 There are a lot of problems however.
936
937 * Somehow Halt() should always be called from the main thread
938 to have the process working properly.
939 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
940
/* Try to terminate all Prolog threads except the calling one.

   Threads that already finished are joined.  Running threads that are
   not engines are asked to exit, either through their cancel hook or
   by raising SIG_PLABORT.  We then poll the sem_canceled semaphore,
   which freePrologThread() posts for a thread that was asked to exit,
   for a bounded number of rounds.

   Returns TRUE if all canceled threads acknowledged and FALSE if some
   did not; in the latter case a message listing the threads that are
   still running is printed.  In both cases threads_ready is cleared.
*/

int
exitPrologThreads(void)
{ int rc;
  int i;
  int me = PL_thread_self();
  int canceled = 0;                     /* # acknowledgements awaited */

  DEBUG(MSG_THREAD, Sdprintf("exitPrologThreads(): me = %d\n", me));

  sem_init(sem_canceled_ptr, USYNC_THREAD, 0);

  for(i=1; i<= GD->thread.highest_id; i++)
  { PL_thread_info_t *info = GD->thread.threads[i];

    if ( info && info->thread_data && i != me )
    { switch(info->status)
      { case PL_THREAD_FAILED:          /* finished: just join */
        case PL_THREAD_EXITED:
        case PL_THREAD_EXCEPTION:
        { void *r;
          int rc;                       /* shadows the outer rc */

          if ( (rc=pthread_join(info->tid, &r)) )
            Sdprintf("Failed to join thread %d: %s\n", i, ThError(rc));

          break;
        }
        case PL_THREAD_RUNNING:
        { if ( !info->is_engine )
          { info->thread_data->exit_requested = EXIT_REQ_PROCESS;

            if ( info->cancel )         /* thread has a cancel hook */
            { switch( (*info->cancel)(i) )
              { case PL_THREAD_CANCEL_FAILED:
                  break;                /* fall back to the signal */
                case PL_THREAD_CANCEL_MUST_JOIN:
                  canceled++;
                  /*FALLTHROUGH*/
                case PL_THREAD_CANCEL_JOINED:
                  continue;             /* next thread; no signal */
              }
            }

            if ( PL_thread_raise(i, SIG_PLABORT) )
              canceled++;
          }

          break;
        }
        default:
          break;
      }
    }
  }

  if ( canceled > 0 )
  { int maxwait = 10;

    DEBUG(MSG_CLEANUP_THREAD, Sdprintf("Waiting for %d threads ", canceled));
                                        /* at most 10 rounds, pausing */
                                        /* between them (presumably 0.1 */
                                        /* sec each -- TODO confirm) */
    for(maxwait = 10; maxwait > 0 && canceled > 0; maxwait--)
    { while ( sem_trywait(sem_canceled_ptr) == 0 )
      { DEBUG(MSG_CLEANUP_THREAD, Sdprintf("."));
        canceled--;
      }
      if ( canceled > 0 )
      { DEBUG(MSG_CLEANUP_THREAD, Sdprintf("W"));
        Pause(0.1);
      }
    }
    DEBUG(MSG_CLEANUP_THREAD, Sdprintf("\nLeft: %d threads\n", canceled));
  }

  if ( canceled )                       /* some did not acknowledge */
  { GET_LD
    fid_t fid;

    if ( (fid = PL_open_foreign_frame()) )
    { term_t head    = PL_new_term_ref();
      term_t running = PL_new_term_ref();
      term_t tail    = PL_copy_term_ref(running);

      rc = TRUE;
                                        /* collect the ids of threads */
                                        /* that are still running */
      for(i = 1; i <= GD->thread.highest_id; i++)
      { PL_thread_info_t *info = GD->thread.threads[i];

        if ( info && info->thread_data && i != me )
        { if ( info->status == PL_THREAD_RUNNING )
          { if ( !PL_unify_list(tail, head, tail) ||
                 !unify_thread_id(head, info) )
            { rc = FALSE;
              break;
            }
          }
        }
      }

      if ( rc )
      { rc = ( PL_unify_nil(tail) &&
               printMessage(ATOM_informational,
                            PL_FUNCTOR_CHARS, "threads_not_died", 1,
                              PL_TERM, running)
             );
      }
    } else
    { rc = FALSE;
    }

    if ( !rc )                          /* could not print the list */
      Sdprintf("%d threads wouldn't die\n", canceled);
    rc = FALSE;
  } else
  { DEBUG(MSG_THREAD, Sdprintf("done\n"));
#ifndef WIN64                   /* FIXME: Hangs if nothing is printed */
    sem_destroy(sem_canceled_ptr);
#endif
    rc = TRUE;
  }

  threads_ready = FALSE;
  return rc;
}
1062
1063
1064 /*******************************
1065 * ALIAS NAME *
1066 *******************************/
1067
1068 int
aliasThread(int tid,atom_t type,atom_t name)1069 aliasThread(int tid, atom_t type, atom_t name)
1070 { GET_LD
1071 PL_thread_info_t *info;
1072 thread_handle *th;
1073 int rc = TRUE;
1074
1075 PL_LOCK(L_THREAD);
1076 if ( !threadTable )
1077 threadTable = newHTable(16);
1078
1079 if ( (threadTable && lookupHTable(threadTable, (void *)name)) ||
1080 (queueTable && lookupHTable(queueTable, (void *)name)) )
1081 { term_t obj = PL_new_term_ref();
1082
1083 PL_UNLOCK(L_THREAD);
1084 PL_put_atom(obj, name);
1085 return PL_error(NULL, 0, "Alias name already taken",
1086 ERR_PERMISSION, ATOM_create, type, obj);
1087 }
1088
1089 info = GD->thread.threads[tid];
1090 if ( (th = create_thread_handle(info)) )
1091 { th->alias = name;
1092 PL_register_atom(name);
1093 PL_register_atom(info->symbol);
1094 addNewHTable(threadTable, (void *)name, (void *)info->symbol);
1095 } else
1096 { rc = PL_no_memory();
1097 }
1098 PL_UNLOCK(L_THREAD);
1099
1100 return rc;
1101 }
1102
1103
1104 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
1105 PL_get_thread_alias() gets the alias name of a thread. It is intended
1106 for crash-analysis. Normal applications should use PL_unify_thread_id().
1107
1108 The implementation is wrong for two reasons. It should register the atom
1109 and this implementation should lock L_THREAD because otherwise the atom
1110 may be gone even before it is locked. However, locking is not unlikely
1111 to deadlock during crash analysis ...
1112 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
1113
1114 int
PL_get_thread_alias(int tid,atom_t * alias)1115 PL_get_thread_alias(int tid, atom_t *alias)
1116 { PL_thread_info_t *info;
1117 thread_handle *th;
1118
1119 if ( tid == 0 )
1120 tid = PL_thread_self();
1121 if ( tid < 1 || tid > GD->thread.highest_id )
1122 return FALSE;
1123
1124 info = GD->thread.threads[tid];
1125 if ( info->symbol &&
1126 (th=symbol_thread_handle(info->symbol)) &&
1127 th->alias )
1128 { *alias = th->alias;
1129
1130 return TRUE;
1131 }
1132
1133 return FALSE;
1134 }
1135
1136
1137 /*******************************
1138 * GC ENGINES *
1139 *******************************/
1140
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
gc_thread() is (indirectly) called from atom-GC if the engine is not yet
fully reclaimed. There are two situations:

  - The engine is still running.  Detach it, such that it will be
    reclaimed silently when done.
  - The engine has completed.  Join it.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
1149
/* Head of a singly linked list (through ->next_free) of thread handles
   whose blob was released.  gc_thread() pushes onto it and
   thread_gc_loop() pops from it, both using compare-and-swap. */
static thread_handle *gced_threads = NULL;
/* Set to TRUE by start_thread_gc_thread() before it creates the
   collector thread; reset to FALSE when thread_gc_loop() finishes or
   when creating the thread failed. */
static int thread_gc_running = FALSE;
1152
1153 static void
discard_thread(thread_handle * h)1154 discard_thread(thread_handle *h)
1155 { int alive = FALSE;
1156 PL_thread_info_t *info;
1157
1158 if ( true(h, TH_IS_INTERACTOR) )
1159 { destroy_interactor(h, TRUE); /* cannot be running */
1160 return;
1161 }
1162
1163 PL_LOCK(L_THREAD);
1164 if ( (info=h->info) &&
1165 (alive=is_alive(info->status)) &&
1166 !info->detached )
1167 { if ( info->has_tid )
1168 { if ( pthread_detach(info->tid) == 0 )
1169 info->detached = TRUE;
1170 }
1171 }
1172 PL_UNLOCK(L_THREAD);
1173
1174 if ( !alive )
1175 { double delay = 0.0001;
1176
1177 if ( !info->detached )
1178 { void *r;
1179
1180 while( pthread_join(info->tid, &r) == EINTR )
1181 ;
1182 }
1183
1184 while ( info->thread_data )
1185 { Pause(delay);
1186 if ( delay < 0.01 )
1187 delay *= 2;
1188 }
1189 free_thread_info(info);
1190 }
1191 }
1192
1193
1194 static void *
thread_gc_loop(void * closure)1195 thread_gc_loop(void *closure)
1196 {
1197 #ifdef HAVE_SIGPROCMASK
1198 sigset_t set;
1199 allSignalMask(&set);
1200 pthread_sigmask(SIG_BLOCK, &set, NULL);
1201 #endif
1202
1203 for(;;)
1204 { thread_handle *h;
1205
1206 do
1207 { h = gced_threads;
1208 } while ( h && !COMPARE_AND_SWAP_PTR(&gced_threads, h, h->next_free) );
1209
1210 if ( h )
1211 { if ( GD->cleaning == CLN_NORMAL )
1212 discard_thread(h);
1213 PL_free(h);
1214 } else
1215 { break;
1216 }
1217 }
1218
1219 thread_gc_running = FALSE;
1220 return NULL;
1221 }
1222
1223
1224 static void
start_thread_gc_thread(void)1225 start_thread_gc_thread(void)
1226 { if ( !thread_gc_running )
1227 { pthread_attr_t attr;
1228 int rc;
1229 pthread_t thr;
1230
1231 pthread_attr_init(&attr);
1232 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
1233 thread_gc_running = TRUE;
1234 rc = pthread_create(&thr, &attr, thread_gc_loop, NULL);
1235 pthread_attr_destroy(&attr);
1236 if ( rc != 0 )
1237 { Sdprintf("Failed to start thread gc thread\n");
1238 thread_gc_running = FALSE;
1239 }
1240 }
1241 }
1242
1243
1244 static void
gc_thread(thread_handle * ref)1245 gc_thread(thread_handle *ref)
1246 { thread_handle *h;
1247
1248 do
1249 { h = gced_threads;
1250 ref->next_free = h;
1251 } while( !COMPARE_AND_SWAP_PTR(&gced_threads, h, ref) );
1252
1253 start_thread_gc_thread();
1254 }
1255
1256
1257 /*******************************
1258 * THREAD SYMBOL *
1259 *******************************/
1260
1261 static int
write_thread_handle(IOSTREAM * s,atom_t eref,int flags)1262 write_thread_handle(IOSTREAM *s, atom_t eref, int flags)
1263 { thread_handle **refp = PL_blob_data(eref, NULL, NULL);
1264 thread_handle *ref = *refp;
1265 (void)flags;
1266
1267 Sfprintf(s, "<%s>(%d,%p)",
1268 true(ref, TH_IS_INTERACTOR) ? "engine" : "thread",
1269 ref->engine_id, ref);
1270 return TRUE;
1271 }
1272
1273
1274 static int
release_thread_handle(atom_t aref)1275 release_thread_handle(atom_t aref)
1276 { thread_handle **refp = PL_blob_data(aref, NULL, NULL);
1277 thread_handle *ref = *refp;
1278 PL_thread_info_t *info;
1279
1280 if ( (info=ref->info) )
1281 { /* assert(info->detached == FALSE || info->is_engine); TBD: Sort out */
1282 info->symbol = 0; /* TBD: Dubious */
1283 gc_thread(ref);
1284 } else
1285 PL_free(ref);
1286
1287 return TRUE;
1288 }
1289
1290
1291 static int
save_thread(atom_t aref,IOSTREAM * fd)1292 save_thread(atom_t aref, IOSTREAM *fd)
1293 { thread_handle **refp = PL_blob_data(aref, NULL, NULL);
1294 thread_handle *ref = *refp;
1295 (void)fd;
1296
1297 return PL_warning("Cannot save reference to <%s>(%d,%p)",
1298 true(ref, TH_IS_INTERACTOR) ? "engine" : "thread",
1299 ref->engine_id, ref);
1300 }
1301
1302
1303 static atom_t
load_thread(IOSTREAM * fd)1304 load_thread(IOSTREAM *fd)
1305 { (void)fd;
1306
1307 return PL_new_atom("<saved-thread-handle>");
1308 }
1309
1310
/* Blob type used for thread and engine handles.  The blob data is a
   single pointer to the thread_handle (see create_thread_handle()).
   NOTE(review): slot meanings below follow the PL_blob_t layout as
   documented for the SWI-Prolog foreign interface; confirm against
   SWI-Prolog.h. */
static PL_blob_t thread_blob =
{ PL_BLOB_MAGIC,
  PL_BLOB_UNIQUE,
  "thread",                             /* type name */
  release_thread_handle,                /* called when the atom is reclaimed */
  NULL,                                 /* presumably compare: not provided */
  write_thread_handle,                  /* prints <thread>(Id,Ptr) */
  NULL,                                 /* presumably acquire: not provided */
  save_thread,                          /* refuses to save, warns */
  load_thread                           /* yields a placeholder atom */
};
1322
1323
1324 /*******************************
1325 * PROLOG BINDING *
1326 *******************************/
1327
1328 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
1329 Resizing max-threads. Note that we do *not* deallocate the old structure
1330 to ensure we can access GD->thread.threads[i] at any time without
1331 locking.
1332
1333 TBD: remember the old ones, such that PL_cleanup() can remove them.
1334 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
1335
1336 static int
resizeThreadMax(void)1337 resizeThreadMax(void)
1338 { int newmax = GD->thread.thread_max*2;
1339 PL_thread_info_t **newinfo, **oldinfo;
1340 size_t dsize = GD->thread.thread_max * sizeof(*GD->thread.threads);
1341
1342 oldinfo = GD->thread.threads;
1343 newinfo = allocHeapOrHalt(newmax * sizeof(*GD->thread.threads));
1344 memset(addPointer(newinfo,dsize), 0, dsize);
1345 memcpy(newinfo, oldinfo, dsize);
1346 GD->thread.threads = newinfo;
1347 GD->thread.thread_max = newmax;
1348 GC_LINGER(oldinfo);
1349
1350 return TRUE;
1351 }
1352
1353
1354 /* MT: thread-safe */
1355
/* Reserve a thread-info slot and pair it with a freshly allocated,
   zeroed PL_local_data_t.

   A slot is taken from the GD->thread.free list if one is available
   (keeping its pl_tid); otherwise a new info structure is allocated,
   numbered with the next id and entered into GD->thread.threads, which
   is enlarged when the id reaches thread_max.  Both branches leave
   L_THREAD locked; it is released after the slot is marked
   PL_THREAD_RESERVED and highest_id has been updated.

   Returns the reserved info structure.
*/

static PL_thread_info_t *
alloc_thread(void)
{ PL_thread_info_t *info;
  PL_local_data_t *ld;

  ld = allocHeapOrHalt(sizeof(PL_local_data_t));
  memset(ld, 0, sizeof(PL_local_data_t));

  do                                    /* pop a recycled slot, if any */
  { info = GD->thread.free;
  } while ( info && !COMPARE_AND_SWAP_PTR(&GD->thread.free, info, info->next_free) );

  if ( info )
  { int i = info->pl_tid;               /* id survives the wipe below */
    assert(info->status == PL_THREAD_UNUSED);
    memset(info, 0, sizeof(*info));
    info->pl_tid = i;
    PL_LOCK(L_THREAD);                  /* unlocked at the end */
  } else
  { int i;

    info = allocHeapOrHalt(sizeof(*info));
    memset(info, 0, sizeof(*info));

    PL_LOCK(L_THREAD);                  /* unlocked at the end */
    i = info->pl_tid = ++GD->thread.highest_allocated;
    if ( i == GD->thread.thread_max )   /* vector is full: double it */
      resizeThreadMax();
    if ( i > GD->thread.peak_id )
      GD->thread.peak_id = i;

    assert(GD->thread.threads[i] == NULL);
    GD->thread.threads[i] = info;
  }

  ld->thread.info = info;               /* link local data and info */
  ld->thread.magic = PL_THREAD_MAGIC;
  info->thread_data = ld;
  info->status = PL_THREAD_RESERVED;
  info->debug = TRUE;

  if ( info->pl_tid > GD->thread.highest_id )
    GD->thread.highest_id = info->pl_tid;
  PL_UNLOCK(L_THREAD);

  ATOMIC_INC(&GD->statistics.threads_created);

  return info;
}
1405
1406
1407 int
PL_thread_self(void)1408 PL_thread_self(void)
1409 { GET_LD
1410 PL_local_data_t *ld = LD;
1411 PL_thread_info_t *info;
1412
1413 if ( ld && (info=ld->thread.info) )
1414 return info->pl_tid;
1415
1416 return -1; /* thread has no Prolog thread */
1417 }
1418
1419
1420 int
PL_unify_thread_id(term_t t,int i)1421 PL_unify_thread_id(term_t t, int i)
1422 { if ( i < 1 ||
1423 i > GD->thread.highest_id ||
1424 GD->thread.threads[i]->status == PL_THREAD_UNUSED ||
1425 GD->thread.threads[i]->status == PL_THREAD_RESERVED )
1426 return -1; /* error */
1427
1428 return unify_thread_id(t, GD->thread.threads[i]);
1429 }
1430
1431
1432 int
PL_get_thread_id_ex(term_t t,int * idp)1433 PL_get_thread_id_ex(term_t t, int *idp)
1434 { PL_thread_info_t *info;
1435
1436 if ( !get_thread(t, &info, TRUE) )
1437 return FALSE;
1438
1439 *idp = info->pl_tid;
1440
1441 return TRUE;
1442 }
1443
1444
1445
1446 #ifdef __WINDOWS__
1447
1448 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
1449 PL_w32thread_raise(DWORD id, int sig)
1450 Sets the signalled mask for a specific Win32 thread. This is a
1451 partial work-around for the lack of proper asynchronous signal
1452 handling in the Win32 platform.
1453 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
1454
1455 int
PL_w32thread_raise(DWORD id,int sig)1456 PL_w32thread_raise(DWORD id, int sig)
1457 { int i;
1458
1459 if ( sig < 0 || sig > MAXSIGNAL )
1460 return FALSE; /* illegal signal */
1461
1462 PL_LOCK(L_THREAD);
1463 for(i = 1; i <= GD->thread.highest_id; i++)
1464 { PL_thread_info_t *info = GD->thread.threads[i];
1465
1466 if ( info && info->w32id == id && info->thread_data )
1467 { raiseSignal(info->thread_data, sig);
1468 if ( info->w32id )
1469 PostThreadMessage(info->w32id, WM_SIGNALLED, 0, 0L);
1470 PL_UNLOCK(L_THREAD);
1471 DEBUG(MSG_THREAD, Sdprintf("Signalled %d to thread %d\n", sig, i));
1472 return TRUE;
1473 }
1474 }
1475 PL_UNLOCK(L_THREAD);
1476
1477 return FALSE; /* can't find thread */
1478 }
1479
1480 #endif /*__WINDOWS__*/
1481
1482 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
1483 Make a thread stop processing blocking operations such that it can check
1484 whether it is signalled. Returns TRUE when successful, FALSE if the
1485 thread no longer exists and -1 if alerting is disabled.
1486 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
1487
1488 static int
alertThread(PL_thread_info_t * info)1489 alertThread(PL_thread_info_t *info)
1490 { PL_local_data_t *ld = info->thread_data;
1491
1492 if ( ld->thread.alert.type )
1493 { int done = FALSE;
1494
1495 PL_LOCK(L_ALERT);
1496 switch(ld->thread.alert.type)
1497 { case ALERT_QUEUE_RD:
1498 cv_broadcast(&ld->thread.alert.obj.queue->cond_var);
1499 done = TRUE;
1500 break;
1501 case ALERT_QUEUE_WR:
1502 cv_broadcast(&ld->thread.alert.obj.queue->drain_var);
1503 done = TRUE;
1504 break;
1505 }
1506 PL_UNLOCK(L_ALERT);
1507 if ( done )
1508 return TRUE;
1509 }
1510
1511 #ifdef __WINDOWS__
1512 if ( info->w32id )
1513 { PostThreadMessage(info->w32id, WM_SIGNALLED, 0, 0L);
1514 return TRUE; /* NOTE: PostThreadMessage() can */
1515 /* fail if thread is being created */
1516 }
1517 #else
1518 if ( info->has_tid && GD->signals.sig_alert )
1519 return pthread_kill(info->tid, GD->signals.sig_alert) == 0;
1520 #endif
1521 return -1;
1522 }
1523
1524 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
1525 PL_thread_raise() raises a synchronous signal in (usually) another
1526 thread.
1527
1528 (*) HACK. We should not lock L_THREAD in this function but we do need to
1529 prevent ld from dropping while we process it. Possibly we should move
1530 the signal mask to the info structure? This patch is badly needed as the
1531 system now crashes on any alrm from library(time). We need something
1532 better though.
1533 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
1534
1535 int
PL_thread_raise(int tid,int sig)1536 PL_thread_raise(int tid, int sig)
1537 { if ( tid >= 1 && tid <= GD->thread.highest_id )
1538 { PL_thread_info_t *info = GD->thread.threads[tid];
1539
1540 if ( info &&
1541 info->status != PL_THREAD_UNUSED &&
1542 info->status != PL_THREAD_RESERVED )
1543 { GET_LD
1544 PL_local_data_t *ld;
1545 int rc;
1546
1547 if ( LD ) /* See (*) */
1548 ld = acquire_ldata(info);
1549 else
1550 ld = info->thread_data;
1551
1552 rc = ( ld &&
1553 ld->magic == LD_MAGIC &&
1554 raiseSignal(ld, sig) &&
1555 (alertThread(info) != FALSE) );
1556
1557 if ( LD )
1558 release_ldata(ld);
1559
1560 return rc;
1561 } else
1562 { return FALSE;
1563 }
1564 }
1565
1566 return FALSE;
1567 }
1568
1569
1570 int
thread_wait_signal(ARG1_LD)1571 thread_wait_signal(ARG1_LD)
1572 { int i;
1573
1574 while( !is_signalled(PASS_LD1) )
1575 {
1576 #ifdef __WINDOWS__
1577 MSG msg;
1578 if ( !GetMessage(&msg, (HWND)-1, WM_SIGNALLED, WM_SIGNALLED) )
1579 return -1;
1580 #else
1581 sigset_t set;
1582 int sig;
1583
1584 sigemptyset(&set);
1585 sigaddset(&set, GD->signals.sig_alert);
1586 sigwait(&set, &sig);
1587 #endif
1588 }
1589
1590 for(i=0; i<2; i++)
1591 { while( LD->signal.pending[i] )
1592 { int sig = 1+32*i;
1593 int mask = 1;
1594
1595 for( ; mask ; mask <<= 1, sig++ )
1596 { if ( LD->signal.pending[i] & mask )
1597 { __atomic_and_fetch(&LD->signal.pending[i], ~mask, __ATOMIC_SEQ_CST);
1598
1599 if ( sig == SIG_THREAD_SIGNAL )
1600 { dispatch_signal(sig, TRUE);
1601 if ( exception_term )
1602 return -1;
1603 } else
1604 { return sig;
1605 }
1606 }
1607 }
1608 }
1609 }
1610
1611 return -1; /* cannot happen */
1612 }
1613
1614 static
1615 PRED_IMPL("$thread_sigwait", 1, thread_sigwait, 0)
1616 { PRED_LD
1617 int sig;
1618
1619 if ( (sig = thread_wait_signal(PASS_LD1)) >= 0 )
1620 return PL_unify_atom_chars(A1, signal_name(sig));
1621
1622 return FALSE;
1623 }
1624
1625
1626
1627 const char *
threadName(int id)1628 threadName(int id)
1629 { PL_thread_info_t *info;
1630 thread_handle *th;
1631 char tmp[16];
1632
1633 if ( id == 0 )
1634 id = PL_thread_self();
1635 if ( id < 0 )
1636 return "[Not a prolog thread]";
1637
1638 info = GD->thread.threads[id];
1639 if ( info->symbol &&
1640 (th=symbol_thread_handle(info->symbol)) &&
1641 th->alias )
1642 return PL_atom_chars(th->alias);
1643
1644 sprintf(tmp, "%d", id);
1645 return buffer_string(tmp, BUF_STACK);
1646 }
1647
1648
1649 intptr_t
system_thread_id(PL_thread_info_t * info)1650 system_thread_id(PL_thread_info_t *info)
1651 { if ( !info )
1652 { GET_LD
1653 if ( LD )
1654 info = LD->thread.info;
1655 else
1656 return -1;
1657 }
1658 #ifdef PID_IDENTIFIES_THREAD
1659 return info->pid;
1660 #else
1661 #ifdef __WINDOWS__
1662 return info->w32id;
1663 #else
1664 return (intptr_t)info->tid;
1665 #endif
1666 #endif
1667 }
1668
/* Record the operating-system identity of the calling thread in `info`:
   always the pthread id (flagging it as valid through has_tid) and, in
   addition, whichever platform specific id the configuration selects.
*/

static void
set_system_thread_id(PL_thread_info_t *info)
{
  info->tid = pthread_self();
  info->has_tid = TRUE;                 /* info->tid is now meaningful */
#if defined(HAVE_GETTID_SYSCALL)
  info->pid = syscall(__NR_gettid);
#elif defined(HAVE_GETTID_MACRO)
  info->pid = gettid();
#elif defined(__CYGWIN__)
  info->pid = getpid();
#elif defined(__WINDOWS__)
  info->w32id = GetCurrentThreadId();
#endif
}
1684
1685
1686 static int
set_os_thread_name_from_charp(const char * s)1687 set_os_thread_name_from_charp(const char *s)
1688 {
1689 #ifdef HAVE_PTHREAD_SETNAME_NP
1690 char name[16];
1691
1692 strncpy(name, s, 15);
1693 name[15] = EOS;
1694
1695 #ifdef HAVE_PTHREAD_SETNAME_NP_WITH_TID
1696 if ( pthread_setname_np(pthread_self(), name) == 0 )
1697 return TRUE;
1698 #elif HAVE_PTHREAD_SETNAME_NP_WITH_TID_AND_ARG
1699 if ( pthread_setname_np(pthread_self(), "%s", (void *)name) == 0 )
1700 return TRUE;
1701 #else
1702 if ( pthread_setname_np(name) == 0 )
1703 return TRUE;
1704 #endif
1705 #endif
1706 return FALSE;
1707 }
1708
1709
1710 static int
set_os_thread_name(atom_t alias)1711 set_os_thread_name(atom_t alias)
1712 {
1713 #ifdef HAVE_PTHREAD_SETNAME_NP
1714 GET_LD
1715 term_t t = PL_new_term_ref();
1716 PL_put_atom(t, alias);
1717 char *s;
1718
1719 if ( PL_get_chars(t, &s, CVT_ATOM|REP_MB|BUF_DISCARDABLE) )
1720 return set_os_thread_name_from_charp(s);
1721 #endif
1722 return FALSE;
1723 }
1724
1725
/* Option specification for thread_create/3.  The order of the entries
   must match the order of the result pointers passed to scan_options()
   in pl_thread_create(). */
static const opt_spec make_thread_options[] =
{ { ATOM_alias,          OPT_ATOM },    /* -> alias */
  { ATOM_debug,          OPT_BOOL },    /* -> debug */
  { ATOM_detached,       OPT_BOOL },    /* -> detached */
  { ATOM_stack_limit,    OPT_SIZE },    /* -> stack */
  { ATOM_c_stack,        OPT_SIZE },    /* -> c_stack */
  { ATOM_at_exit,        OPT_TERM },    /* -> at_exit */
  { ATOM_inherit_from,   OPT_TERM },    /* -> inherit_from */
  { ATOM_affinity,       OPT_TERM },    /* -> affinity */
  { ATOM_queue_max_size, OPT_SIZE },    /* -> queue_max_size */
  { NULL_ATOM,           0 }            /* end of table */
};
1738
1739
1740 static void
set_thread_completion(PL_thread_info_t * info,int rc,term_t ex)1741 set_thread_completion(PL_thread_info_t *info, int rc, term_t ex)
1742 { PL_LOCK(L_THREAD);
1743 if ( rc )
1744 { info->status = PL_THREAD_SUCCEEDED;
1745 } else
1746 { if ( ex )
1747 { if ( info->detached )
1748 info->return_value = 0;
1749 else
1750 info->return_value = PL_record(ex);
1751 info->status = PL_THREAD_EXCEPTION;
1752 } else
1753 { info->status = PL_THREAD_FAILED;
1754 }
1755 }
1756 PL_UNLOCK(L_THREAD);
1757 }
1758
1759
/* Start routine handed to pthread_create() by pl_thread_create().
   `closure` is the PL_thread_info_t of the new thread.

   The routine blocks SIGINT, records the system thread id, initialises
   the Prolog engine, marks the thread as running, runs the thread
   initialisation goal followed by the user's goal and finally stores
   the completion status.  For detached threads a failure or exception
   is reported as a warning because nobody can join the thread.

   Returns (void *)FALSE if the engine could not be initialised and
   (void *)TRUE otherwise.
*/

static void *
start_thread(void *closure)
{ PL_thread_info_t *info = closure;
  thread_handle *th;
  term_t ex, goal;
  int rval;

  assert(info->goal);
  blockSignal(SIGINT);                  /* only the main thread processes */
                                        /* Control-C */
  set_system_thread_id(info);           /* early to get exit code ok */

  if ( !initialise_thread(info) )
    return (void *)FALSE;

  { GET_LD

    /* free_prolog_thread() runs when the matching pop below executes */
    pthread_cleanup_push(free_prolog_thread, info->thread_data);

    PL_LOCK(L_THREAD);
    info->status = PL_THREAD_RUNNING;
    PL_UNLOCK(L_THREAD);

    if ( info->symbol &&                /* propagate the alias to the OS */
         (th=symbol_thread_handle(info->symbol)) &&
         th->alias )
      set_os_thread_name(th->alias);

    goal = PL_new_term_ref();
    PL_put_atom(goal, ATOM_dthread_init);

    /* first run the initialisation goal in module system */
    rval = callProlog(MODULE_system, goal, PL_Q_CATCH_EXCEPTION, &ex);

    if ( rval )
    { if ( !PL_recorded(info->goal, goal) )     /* cannot rebuild the goal */
      { rval = raiseStackOverflow(GLOBAL_OVERFLOW);
        ex = exception_term;
      } else
      { rval = callProlog(info->module, goal, PL_Q_CATCH_EXCEPTION, &ex);
      }
    }

    if ( !rval && info->detached )      /* nobody will join: report here */
    { if ( ex )
      { int print = TRUE;

        if ( LD->exit_requested )       /* abort while exiting: stay silent */
        { if ( classify_exception(ex) == EXCEPT_ABORT )
            print = FALSE;
        }

        if ( print )
        { if ( !printMessage(ATOM_warning,
                             PL_FUNCTOR_CHARS, "abnormal_thread_completion", 2,
                               PL_TERM, goal,
                               PL_FUNCTOR, FUNCTOR_exception1,
                                 PL_TERM, ex) )
            PL_clear_exception();       /* The thread is dead anyway */
        }
      } else
      { if ( !printMessage(ATOM_warning,
                           PL_FUNCTOR_CHARS, "abnormal_thread_completion", 2,
                             PL_TERM, goal,
                             PL_ATOM, ATOM_fail) )
          PL_clear_exception();         /* The thread is dead anyway */
      }
    }

    set_thread_completion(info, rval, ex);
    pthread_cleanup_pop(1);             /* 1: execute the cleanup handler */
  }

  return (void *)TRUE;
}
1834
1835
/* Initialise the local data `ldnew` of a thread that is being created
   from the local data `ldold` of the thread it inherits from.

   Copied are the prompt, module context, I/O streams (with empty
   input/output stacks), encoding, locale (if enabled), debugger status,
   string-buffer tripwire, Prolog flags, arithmetic settings and the
   tabling restraint.  The new thread gets its own tabling node pool
   (same name and limit), its own copy of the flag table and a message
   queue limited to `max_queue_size`.

   If the calling thread has no engine, `ldnew` is installed as its
   local data first.
*/

static void
copy_local_data(PL_local_data_t *ldnew, PL_local_data_t *ldold,
                size_t max_queue_size)
{ GET_LD

  if ( !LD )
    TLD_set_LD(ldnew);

  PL_register_atom(ldold->prompt.current);      /* new reference from ldnew */
  ldnew->prompt                = ldold->prompt;
  if ( ldold->prompt.first )
  { ldnew->prompt.first        = ldold->prompt.first;
    PL_register_atom(ldnew->prompt.first);
  }
  ldnew->modules               = ldold->modules;
  ldnew->IO                    = ldold->IO;
  ldnew->IO.input_stack        = NULL;          /* stream stacks are not shared */
  ldnew->IO.output_stack       = NULL;
  ldnew->encoding              = ldold->encoding;
#ifdef O_LOCALE
  ldnew->locale.current        = acquireLocale(ldold->locale.current);
#endif
  ldnew->_debugstatus          = ldold->_debugstatus;
  ldnew->_debugstatus.retryFrame = 0;
  ldnew->_debugstatus.suspendTrace= 0;
  if ( ldold->_debugstatus.skiplevel != SKIP_VERY_DEEP )
  { ldnew->_debugstatus.debugging = DBG_OFF;    /* do not inherit an */
    ldnew->_debugstatus.tracing = FALSE;        /* active trace session */
    ldnew->_debugstatus.skiplevel = SKIP_VERY_DEEP;
  }

  alloc_pool *pool = ldold->tabling.node_pool;
  if ( pool )                                   /* fresh pool, same settings */
    ldnew->tabling.node_pool = new_alloc_pool(pool->name, pool->limit);
  ldnew->fli.string_buffers.tripwire
                               = ldold->fli.string_buffers.tripwire;
  ldnew->statistics.start_time = WallTime();
  ldnew->prolog_flag.mask      = ldold->prolog_flag.mask;
  ldnew->prolog_flag.occurs_check = ldold->prolog_flag.occurs_check;
  ldnew->prolog_flag.access_level = ldold->prolog_flag.access_level;
#ifdef O_GMP
  ldnew->arith.rat             = ldold->arith.rat;
#endif
  ldnew->arith.f               = ldold->arith.f;
  if ( ldold->prolog_flag.table )
  { PL_LOCK(L_PLFLAG);                          /* copy under the flag lock */
    ldnew->prolog_flag.table   = copyHTable(ldold->prolog_flag.table);
    PL_UNLOCK(L_PLFLAG);
  }
  ldnew->tabling.restraint     = ldold->tabling.restraint;
  if ( !ldnew->thread.info->debug )             /* debug(false) was requested */
  { ldnew->_debugstatus.tracing = FALSE;
    ldnew->_debugstatus.debugging = DBG_OFF;
    set(&ldnew->prolog_flag.mask, PLFLAG_LASTCALL);
  }
  init_message_queue(&ldnew->thread.messages, max_queue_size);
  init_predicate_references(ldnew);
}
1894
1895
1896 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
1897 The pthread stacksize must be a multiple of the page size on some
1898 systems. We do the rounding here. If we do not know the page size we use
1899 8192, which should typically be a multiple of the page size.
1900 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
1901
/* Round `n` up to a multiple of the page size.  The page size is
   obtained from sysconf() where available; 8192 is used if that fails
   or cannot be asked.
*/

static size_t
round_pages(size_t n)
{ size_t page = 8192;                   /* fallback */

#if defined(HAVE_SYSCONF) && defined(_SC_PAGESIZE)
  { size_t probed = sysconf(_SC_PAGESIZE);

    if ( probed != (size_t)-1 )
      page = probed;
  }
#endif

  return ROUND(n, page);
}
1915
1916
1917 #if defined(HAVE_PTHREAD_ATTR_SETAFFINITY_NP) || defined(HAVE_SCHED_SETAFFINITY)
1918
1919 static int
get_cpuset(term_t affinity,cpu_set_t * set)1920 get_cpuset(term_t affinity, cpu_set_t *set)
1921 { GET_LD
1922 term_t head, tail;
1923 int n=0;
1924 int cpu_count = CpuCount();
1925
1926 if ( !(tail = PL_copy_term_ref(affinity)) ||
1927 !(head = PL_new_term_ref()) )
1928 return FALSE;
1929
1930 CPU_ZERO(set);
1931 while(PL_get_list_ex(tail, head, tail))
1932 { int i;
1933
1934 if ( !PL_get_integer_ex(head, &i) )
1935 return FALSE;
1936 if ( i < 0 )
1937 return PL_domain_error("not_less_than_zero", head);
1938 if ( i >= cpu_count )
1939 return PL_existence_error("cpu", head);
1940
1941 CPU_SET(i, set);
1942
1943 if ( n++ == 100 && !PL_is_acyclic(tail) )
1944 return PL_type_error("list", tail);
1945 }
1946 if ( !PL_get_nil_ex(tail) )
1947 return FALSE;
1948
1949 if ( n == 0 )
1950 return PL_domain_error("cpu_affinity", affinity);
1951
1952 return TRUE;
1953 }
1954
1955 #endif /*defined(HAVE_PTHREAD_ATTR_SETAFFINITY_NP) || defined(HAVE_SCHED_SETAFFINITY)*/
1956
1957 static int
set_affinity(term_t affinity,pthread_attr_t * attr)1958 set_affinity(term_t affinity, pthread_attr_t *attr)
1959 {
1960 #ifdef HAVE_PTHREAD_ATTR_SETAFFINITY_NP
1961 cpu_set_t cpuset;
1962
1963 if ( !get_cpuset(affinity, &cpuset) )
1964 return EINVAL;
1965
1966 return pthread_attr_setaffinity_np(attr, sizeof(cpuset), &cpuset);
1967 #endif
1968
1969 return 0;
1970 }
1971
1972
1973 word
pl_thread_create(term_t goal,term_t id,term_t options)1974 pl_thread_create(term_t goal, term_t id, term_t options)
1975 { GET_LD
1976 PL_thread_info_t *info;
1977 thread_handle *th;
1978 PL_local_data_t *ldnew, *ldold = LD;
1979 atom_t alias = NULL_ATOM, idname;
1980 pthread_attr_t attr;
1981 size_t stack = 0;
1982 size_t c_stack = 0;
1983 term_t inherit_from = 0;
1984 term_t at_exit = 0;
1985 term_t affinity = 0;
1986 size_t queue_max_size = 0;
1987 int rc = 0;
1988 const char *func;
1989 int debug = -1;
1990 int detached = FALSE;
1991
1992 if ( !PL_is_callable(goal) )
1993 return PL_error(NULL, 0, NULL, ERR_TYPE, ATOM_callable, goal);
1994
1995 if ( !GD->thread.enabled || GD->cleaning != CLN_NORMAL )
1996 return PL_error(NULL, 0, "threading disabled",
1997 ERR_PERMISSION,
1998 ATOM_create, ATOM_thread, goal);
1999
2000 if ( !(info = alloc_thread()) )
2001 return PL_error(NULL, 0, NULL, ERR_RESOURCE, ATOM_threads);
2002
2003 ldnew = info->thread_data;
2004
2005 if ( !scan_options(options, 0, /*OPT_ALL,*/
2006 ATOM_thread_option, make_thread_options,
2007 &alias,
2008 &debug,
2009 &detached,
2010 &stack, /* stack */
2011 &c_stack, /* c_stack */
2012 &at_exit,
2013 &inherit_from,
2014 &affinity,
2015 &queue_max_size) )
2016 { free_thread_info(info);
2017 fail;
2018 }
2019 info->detached = detached;
2020 if ( at_exit && !PL_is_callable(at_exit) )
2021 { free_thread_info(info);
2022 return PL_error(NULL, 0, NULL, ERR_TYPE, ATOM_callable, at_exit);
2023 }
2024 if ( inherit_from )
2025 { PL_thread_info_t *oinfo;
2026
2027 if ( get_thread(inherit_from, &oinfo, TRUE) )
2028 { ldold = oinfo->thread_data;
2029 } else
2030 { free_thread_info(info);
2031 return FALSE;
2032 }
2033 }
2034 if ( debug >= 0 )
2035 info->debug = debug;
2036 else
2037 info->debug = ldold->thread.info->debug;
2038
2039 if ( !PL_is_variable(id) &&
2040 !(PL_get_atom(id, &idname) && idname == alias) )
2041 { free_thread_info(info);
2042 return PL_error("thread_create", 3, NULL, ERR_UNINSTANTIATION, 2, id);
2043 }
2044
2045 if ( stack )
2046 info->stack_limit = stack;
2047 else
2048 info->stack_limit = ldold->stacks.limit;
2049
2050 th = create_thread_handle(info);
2051 if ( alias )
2052 { if ( !aliasThread(info->pl_tid, ATOM_thread, alias) )
2053 { free_thread_info(info);
2054 fail;
2055 }
2056 }
2057 if ( !unify_thread_id(id, info) )
2058 { free_thread_info(info);
2059
2060 if ( !PL_exception(0) )
2061 return PL_uninstantiation_error(id);
2062
2063 fail;
2064 }
2065 if ( !info->detached )
2066 PL_unregister_atom(th->symbol);
2067
2068 info->goal = PL_record(goal);
2069 info->module = PL_context();
2070 copy_local_data(ldnew, ldold, queue_max_size);
2071 if ( at_exit )
2072 register_event_hook(&ldnew->event.hook.onthreadexit, FALSE, at_exit, 0);
2073
2074 pthread_attr_init(&attr);
2075 if ( info->detached )
2076 { func = "pthread_attr_setdetachstate";
2077 rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
2078 }
2079 if ( rc == 0 && affinity )
2080 rc = set_affinity(affinity, &attr);
2081 if ( rc == 0 )
2082 {
2083 #ifdef USE_COPY_STACK_SIZE
2084 struct rlimit rlim;
2085 if ( !stack && getrlimit(RLIMIT_STACK, &rlim) == 0 )
2086 { if ( rlim.rlim_cur != RLIM_INFINITY )
2087 stack = rlim.rlim_cur;
2088 /* What is an infinite stack!? */
2089 }
2090 #endif
2091 if ( c_stack )
2092 { stack = round_pages(c_stack);
2093 func = "pthread_attr_setstacksize";
2094 rc = pthread_attr_setstacksize(&attr, c_stack);
2095 info->c_stack_size = c_stack;
2096 } else
2097 { pthread_attr_getstacksize(&attr, &info->c_stack_size);
2098 }
2099 }
2100 if ( rc == 0 )
2101 { PL_LOCK(L_THREAD);
2102 info->status = PL_THREAD_CREATED;
2103 assert(info->goal);
2104 func = "pthread_create";
2105 rc = pthread_create(&info->tid, &attr, start_thread, info);
2106 PL_UNLOCK(L_THREAD);
2107 }
2108 pthread_attr_destroy(&attr);
2109
2110 if ( rc != 0 )
2111 { free_thread_info(info);
2112 if ( !PL_exception(0) )
2113 PL_error(NULL, 0, ThError(rc),
2114 ERR_SYSCALL, func);
2115 return FALSE;
2116 }
2117
2118 return TRUE;
2119 }
2120
2121
2122 static thread_handle *
symbol_thread_handle(atom_t a)2123 symbol_thread_handle(atom_t a)
2124 { void *data;
2125 size_t len;
2126 PL_blob_t *type;
2127
2128 if ( a && (data=PL_blob_data(a, &len, &type)) && type == &thread_blob )
2129 { thread_handle **erd = data;
2130
2131 return *erd;
2132 }
2133
2134 return NULL;
2135 }
2136
2137
2138 static int
get_thread(term_t t,PL_thread_info_t ** info,int warn)2139 get_thread(term_t t, PL_thread_info_t **info, int warn)
2140 { GET_LD
2141 int i = -1;
2142 atom_t a;
2143
2144 if ( PL_get_atom(t, &a) )
2145 { thread_handle *er;
2146 atom_t symbol = a;
2147
2148 from_symbol:
2149 if ( (er=symbol_thread_handle(symbol)) )
2150 { if ( er->info && !THREAD_STATUS_INVALID(er->info->status) )
2151 { i = er->engine_id;
2152 } else
2153 { no_thread:
2154 if ( warn )
2155 PL_existence_error("thread", t);
2156 return FALSE;
2157 }
2158 } else if ( isTextAtom(symbol) ) /* alias name? */
2159 { word w;
2160
2161 if ( (w = (word)lookupHTable(threadTable, (void *)symbol)) )
2162 { symbol = w;
2163 goto from_symbol;
2164 } else
2165 goto no_thread;
2166 } else
2167 { if ( warn )
2168 PL_type_error("thread", t);
2169 return FALSE;
2170 }
2171 } else if ( !PL_get_integer(t, &i) )
2172 { if ( warn )
2173 PL_type_error("thread", t);
2174 return FALSE;
2175 }
2176
2177 if ( i < 1 ||
2178 i > GD->thread.highest_id ||
2179 THREAD_STATUS_INVALID(GD->thread.threads[i]->status) )
2180 { goto no_thread;
2181 }
2182
2183 *info = GD->thread.threads[i];
2184
2185 return TRUE;
2186 }
2187
2188
2189 static thread_handle *
create_thread_handle(PL_thread_info_t * info)2190 create_thread_handle(PL_thread_info_t *info)
2191 { atom_t symbol;
2192
2193 if ( (symbol=info->symbol) )
2194 { return symbol_thread_handle(symbol);
2195 } else
2196 { thread_handle *ref;
2197
2198 if ( info->is_engine )
2199 ref = PL_malloc(sizeof(*ref)+sizeof(simpleMutex));
2200 else
2201 ref = PL_malloc(sizeof(*ref));
2202
2203 if ( ref )
2204 { int new;
2205
2206 memset(ref, 0, sizeof(*ref));
2207 if ( info->is_engine )
2208 { ref->interactor.mutex = (simpleMutex*)(ref+1);
2209 simpleMutexInit(ref->interactor.mutex);
2210 }
2211
2212 ref->info = info;
2213 ref->engine_id = info->pl_tid;
2214 ref->symbol = lookupBlob((char*)&ref, sizeof(ref), &thread_blob, &new);
2215 assert(new);
2216 info->symbol = ref->symbol;
2217 }
2218
2219 return ref;
2220 }
2221 }
2222
2223
2224 int
unify_thread_id(term_t id,PL_thread_info_t * info)2225 unify_thread_id(term_t id, PL_thread_info_t *info)
2226 { GET_LD
2227 thread_handle *th;
2228
2229 if ( (th = create_thread_handle(info)) )
2230 { atom_t name = th->alias ? th->alias : th->symbol;
2231
2232 return PL_unify_atom(id, name);
2233 } else /* during destruction? */
2234 return PL_unify_integer(id, info->pl_tid);
2235 }
2236
2237
/* If lock = TRUE, unify_thread_status() is used from thread_property/2
   and we must be careful that the thread may vanish during the process
   if it is a detached thread.  Note that we only avoid crashes.  The
   fact that the value may no longer be true at the moment it is
   reported is simply a limitation of status polling.
*/
2244
2245 static int
unify_engine_status(term_t status,PL_thread_info_t * info ARG_LD)2246 unify_engine_status(term_t status, PL_thread_info_t *info ARG_LD)
2247 { return PL_unify_atom(status, ATOM_suspended);
2248 }
2249
2250
2251 static int
unify_thread_status(term_t status,PL_thread_info_t * info,thread_status stat,int lock)2252 unify_thread_status(term_t status, PL_thread_info_t *info,
2253 thread_status stat, int lock)
2254 { GET_LD
2255
2256 switch(stat)
2257 { case PL_THREAD_CREATED:
2258 case PL_THREAD_RUNNING:
2259 { int rc = FALSE;
2260 if ( info->is_engine )
2261 { if ( lock ) PL_LOCK(L_THREAD);
2262 if ( !info->has_tid )
2263 rc = unify_engine_status(status, info PASS_LD);
2264 if ( lock ) PL_UNLOCK(L_THREAD);
2265 }
2266 return rc || PL_unify_atom(status, ATOM_running);
2267 }
2268 case PL_THREAD_EXITED:
2269 { term_t tmp = PL_new_term_ref();
2270 int rc = TRUE;
2271
2272 if ( info->return_value )
2273 { if ( lock ) PL_LOCK(L_THREAD);
2274 if ( info->return_value )
2275 rc = PL_recorded(info->return_value, tmp);
2276 if ( lock ) PL_UNLOCK(L_THREAD);
2277 }
2278
2279 if ( !rc )
2280 return raiseStackOverflow(GLOBAL_OVERFLOW);
2281 else
2282 return PL_unify_term(status,
2283 PL_FUNCTOR, FUNCTOR_exited1,
2284 PL_TERM, tmp);
2285 }
2286 case PL_THREAD_SUCCEEDED:
2287 return PL_unify_atom(status, ATOM_true);
2288 case PL_THREAD_FAILED:
2289 return PL_unify_atom(status, ATOM_false);
2290 case PL_THREAD_EXCEPTION:
2291 { term_t tmp = PL_new_term_ref();
2292 int rc = TRUE;
2293
2294 if ( lock ) PL_LOCK(L_THREAD);
2295 if ( info->return_value )
2296 rc = PL_recorded(info->return_value, tmp);
2297 if ( lock ) PL_UNLOCK(L_THREAD);
2298 if ( !rc )
2299 return raiseStackOverflow(GLOBAL_OVERFLOW);
2300 else
2301 return PL_unify_term(status,
2302 PL_FUNCTOR, FUNCTOR_exception1,
2303 PL_TERM, tmp);
2304 }
2305 case PL_THREAD_NOMEM:
2306 { return PL_unify_term(status,
2307 PL_FUNCTOR, FUNCTOR_exception1,
2308 PL_FUNCTOR, FUNCTOR_error2,
2309 PL_FUNCTOR, FUNCTOR_resource_error1,
2310 PL_ATOM, ATOM_memory,
2311 PL_VARIABLE);
2312 }
2313 default:
2314 DEBUG(MSG_THREAD, Sdprintf("info->status = %d\n", info->status));
2315 fail; /* can happen in current_thread/2 */
2316 }
2317 }
2318
2319
2320 word
pl_thread_self(term_t self)2321 pl_thread_self(term_t self)
2322 { GET_LD
2323
2324 return unify_thread_id(self, LD->thread.info);
2325 }
2326
2327 static void
unalias_thread(thread_handle * th)2328 unalias_thread(thread_handle *th)
2329 { atom_t name;
2330
2331 if ( (name=th->alias) )
2332 { atom_t symbol;
2333
2334 if ( (symbol=(word)deleteHTable(threadTable, (void *)th->alias)) )
2335 { th->alias = NULL_ATOM;
2336 PL_unregister_atom(name);
2337 PL_unregister_atom(symbol);
2338 }
2339 }
2340 }
2341
2342
2343 static void
free_thread_info(PL_thread_info_t * info)2344 free_thread_info(PL_thread_info_t *info)
2345 { record_t rec_rv, rec_g;
2346 PL_thread_info_t *freelist;
2347
2348 assert(info->status != PL_THREAD_UNUSED);
2349 info->status = PL_THREAD_UNUSED;
2350
2351 if ( info->thread_data )
2352 { info->detached = FALSE; /* avoid recursion and we are dead anyway */
2353 free_prolog_thread(info->thread_data);
2354 }
2355
2356 PL_LOCK(L_THREAD);
2357 if ( info->symbol )
2358 { thread_handle *th;
2359
2360 if ( (th=symbol_thread_handle(info->symbol)) )
2361 { th->info = NULL;
2362 if ( th->alias && !info->is_engine )
2363 unalias_thread(th);
2364 }
2365
2366 if ( info->detached && !info->is_engine )
2367 PL_unregister_atom(info->symbol);
2368 }
2369
2370 if ( (rec_rv=info->return_value) ) /* sync with unify_thread_status() */
2371 info->return_value = NULL;
2372 if ( (rec_g=info->goal) )
2373 info->goal = NULL;
2374
2375 if ( info->pl_tid == GD->thread.highest_id )
2376 { int i;
2377
2378 for(i=info->pl_tid-1; i>1; i--)
2379 { PL_thread_info_t *ih = GD->thread.threads[i];
2380 if ( ih && ih->status != PL_THREAD_UNUSED )
2381 break;
2382 }
2383
2384 GD->thread.highest_id = i;
2385 }
2386 PL_UNLOCK(L_THREAD);
2387
2388 do
2389 { freelist = GD->thread.free;
2390 info->next_free = freelist;
2391 } while( !COMPARE_AND_SWAP_PTR(&GD->thread.free, freelist, info) );
2392
2393 if ( rec_rv ) PL_erase(rec_rv);
2394 if ( rec_g ) PL_erase(rec_g);
2395 }
2396
2397
/* Join `thread`, storing its exit value in *retval.  Where
   pthread_timedjoin_np() is available the join is retried with a
   deadline 250,000,000 nanoseconds ahead and PL_handle_signals() is
   called after each timeout; EINTR is returned if that call returns a
   negative value.  Otherwise this is a plain pthread_join().
   Returns 0 or an error code.
*/

static int
pthread_join_interruptible(pthread_t thread, void **retval)
{
#ifdef HAVE_PTHREAD_TIMEDJOIN_NP
  int rc;

  do
  { struct timespec deadline;

    get_current_timespec(&deadline);
    deadline.tv_nsec += 250000000;
    carry_timespec_nanos(&deadline);

    rc = pthread_timedjoin_np(thread, retval, &deadline);
  } while ( rc == ETIMEDOUT && PL_handle_signals() >= 0 );

  return rc == ETIMEDOUT ? EINTR : rc;
#else
  return pthread_join(thread, retval);
#endif
}
2420
2421
2422
2423 static
2424 PRED_IMPL("thread_join", 2, thread_join, 0)
2425 { PRED_LD
2426 PL_thread_info_t *info;
2427 void *r;
2428 word rval;
2429 int rc;
2430 thread_status status;
2431
2432 term_t thread = A1;
2433 term_t retcode = A2;
2434
2435 if ( !get_thread(thread, &info, TRUE) )
2436 return FALSE;
2437
2438 if ( info == LD->thread.info || info->detached )
2439 { return PL_error("thread_join", 2,
2440 info->detached ? "Cannot join detached thread"
2441 : "Cannot join self",
2442 ERR_PERMISSION, ATOM_join, ATOM_thread, thread);
2443 }
2444
2445 rc = pthread_join_interruptible(info->tid, &r);
2446
2447 switch(rc)
2448 { case 0:
2449 break;
2450 case EINTR:
2451 return FALSE;
2452 case ESRCH:
2453 Sdprintf("Join %s: ESRCH from %d\n",
2454 threadName(info->pl_tid), info->tid);
2455 return PL_error("thread_join", 2, NULL,
2456 ERR_EXISTENCE, ATOM_thread, thread);
2457 default:
2458 return PL_error("thread_join", 2, ThError(rc),
2459 ERR_SYSCALL, "pthread_join");
2460 }
2461
2462 status = info->status;
2463 if ( !THREAD_STATUS_INVALID(status) &&
2464 COMPARE_AND_SWAP_INT((int*)&info->status, (int)status, (int)PL_THREAD_JOINED) )
2465 { rval = unify_thread_status(retcode, info, status, FALSE);
2466
2467 free_thread_info(info);
2468 } else
2469 { rval = PL_error(NULL, 0, "already joined",
2470 ERR_EXISTENCE, ATOM_thread, thread);
2471 }
2472
2473 return rval;
2474 }
2475
2476
2477 word
pl_thread_exit(term_t retcode)2478 pl_thread_exit(term_t retcode)
2479 { GET_LD
2480 PL_thread_info_t *info = LD->thread.info;
2481
2482 PL_LOCK(L_THREAD);
2483 info->status = PL_THREAD_EXITED;
2484 info->return_value = PL_record(retcode);
2485 PL_UNLOCK(L_THREAD);
2486
2487 DEBUG(MSG_THREAD, Sdprintf("thread_exit(%d)\n", info->pl_tid));
2488
2489 pthread_exit(NULL);
2490 assert(0);
2491 fail;
2492 }
2493
2494
2495 static
2496 PRED_IMPL("thread_detach", 1, thread_detach, 0)
2497 { PL_thread_info_t *info;
2498 PL_thread_info_t *release = NULL;
2499
2500 PL_LOCK(L_THREAD);
2501 if ( !get_thread(A1, &info, TRUE) )
2502 { PL_UNLOCK(L_THREAD);
2503 fail;
2504 }
2505
2506 if ( !info->detached )
2507 { int rc;
2508
2509 if ( (rc=pthread_detach(info->tid)) )
2510 { assert(rc == ESRCH);
2511
2512 release = info;
2513 } else
2514 { PL_register_atom(info->symbol);
2515 info->detached = TRUE;
2516 }
2517 }
2518
2519 PL_UNLOCK(L_THREAD);
2520
2521 if ( release )
2522 free_thread_info(release);
2523
2524 succeed;
2525 }
2526
2527
2528 static
2529 PRED_IMPL("thread_alias", 1, thread_alias, 0)
2530 { PRED_LD
2531 PL_thread_info_t *info = LD->thread.info;
2532 thread_handle *th;
2533 atom_t alias;
2534
2535 if ( (th = create_thread_handle(info)) &&
2536 th->alias )
2537 { term_t ex = PL_new_term_ref();
2538
2539 return ( unify_thread_id(ex, info) &&
2540 PL_permission_error("re-alias", "thread", ex) );
2541 }
2542
2543 return ( PL_get_atom_ex(A1, &alias) &&
2544 aliasThread(PL_thread_self(), ATOM_thread, alias) );
2545 }
2546
2547
2548 static size_t
sizeof_thread(PL_thread_info_t * info)2549 sizeof_thread(PL_thread_info_t *info)
2550 { size_t size = sizeof(*info);
2551 struct PL_local_data *ld = info->thread_data;
2552
2553 if ( info->status != PL_THREAD_RUNNING )
2554 return 0;
2555
2556 if ( ld )
2557 { size += sizeof(*ld);
2558 size += sizeStackP(&ld->stacks.global) + ld->stacks.global.spare;
2559 size += sizeStackP(&ld->stacks.local) + ld->stacks.local.spare;
2560 size += sizeStackP(&ld->stacks.trail) + ld->stacks.trail.spare;
2561 size += sizeStackP(&ld->stacks.argument);
2562
2563 size += sizeof_message_queue(&ld->thread.messages);
2564 size += sizeof_local_definitions(ld);
2565
2566 if ( ld->tabling.node_pool )
2567 size += ld->tabling.node_pool->size;
2568 }
2569
2570 return size;
2571 }
2572
2573
2574 /*******************************
2575 * THREAD PROPERTY *
2576 *******************************/
2577
2578 static atom_t
symbol_alias(atom_t symbol)2579 symbol_alias(atom_t symbol)
2580 { thread_handle *th;
2581
2582 if ( (th=symbol_thread_handle(symbol)) )
2583 return th->alias;
2584
2585 return NULL_ATOM;
2586 }
2587
2588 static int
thread_id_propery(PL_thread_info_t * info,term_t prop ARG_LD)2589 thread_id_propery(PL_thread_info_t *info, term_t prop ARG_LD)
2590 { return PL_unify_integer(prop, info->pl_tid);
2591 }
2592
2593 static int
thread_alias_propery(PL_thread_info_t * info,term_t prop ARG_LD)2594 thread_alias_propery(PL_thread_info_t *info, term_t prop ARG_LD)
2595 { atom_t symbol, alias;
2596
2597 if ( (symbol=info->symbol) &&
2598 (alias=symbol_alias(symbol)) )
2599 return PL_unify_atom(prop, alias);
2600
2601 fail;
2602 }
2603
2604 static int
thread_status_propery(PL_thread_info_t * info,term_t prop ARG_LD)2605 thread_status_propery(PL_thread_info_t *info, term_t prop ARG_LD)
2606 { IGNORE_LD
2607
2608 return unify_thread_status(prop, info, info->status, TRUE);
2609 }
2610
2611 static int
thread_detached_propery(PL_thread_info_t * info,term_t prop ARG_LD)2612 thread_detached_propery(PL_thread_info_t *info, term_t prop ARG_LD)
2613 { IGNORE_LD
2614
2615 return PL_unify_bool_ex(prop, info->detached);
2616 }
2617
2618 static int
thread_debug_propery(PL_thread_info_t * info,term_t prop ARG_LD)2619 thread_debug_propery(PL_thread_info_t *info, term_t prop ARG_LD)
2620 { IGNORE_LD
2621
2622 return PL_unify_bool_ex(prop, info->debug);
2623 }
2624
2625 static int
thread_engine_propery(PL_thread_info_t * info,term_t prop ARG_LD)2626 thread_engine_propery(PL_thread_info_t *info, term_t prop ARG_LD)
2627 { IGNORE_LD
2628
2629 return PL_unify_bool_ex(prop, info->is_engine);
2630 }
2631
2632 static int
thread_thread_propery(PL_thread_info_t * info,term_t prop ARG_LD)2633 thread_thread_propery(PL_thread_info_t *info, term_t prop ARG_LD)
2634 { if ( info->is_engine )
2635 { thread_handle *th = symbol_thread_handle(info->symbol);
2636
2637 if ( th->interactor.thread )
2638 { atom_t alias = symbol_alias(th->interactor.thread);
2639
2640 return PL_unify_atom(prop, alias ? alias : th->interactor.thread);
2641 }
2642 }
2643
2644 return FALSE;
2645 }
2646
2647 static int
thread_tid_propery(PL_thread_info_t * info,term_t prop ARG_LD)2648 thread_tid_propery(PL_thread_info_t *info, term_t prop ARG_LD)
2649 { IGNORE_LD
2650
2651 if ( info->has_tid )
2652 { intptr_t tid = system_thread_id(info);
2653
2654 if ( tid != -1 )
2655 return PL_unify_integer(prop, system_thread_id(info));
2656 }
2657
2658 return FALSE;
2659 }
2660
2661 static int
thread_size_propery(PL_thread_info_t * info,term_t prop ARG_LD)2662 thread_size_propery(PL_thread_info_t *info, term_t prop ARG_LD)
2663 { size_t size;
2664
2665 PL_LOCK(L_THREAD);
2666 size = sizeof_thread(info);
2667 PL_UNLOCK(L_THREAD);
2668
2669 if ( size )
2670 return PL_unify_int64(prop, size);
2671
2672 return FALSE;
2673 }
2674
2675 static const tprop tprop_list [] =
2676 { { FUNCTOR_id1, thread_id_propery },
2677 { FUNCTOR_alias1, thread_alias_propery },
2678 { FUNCTOR_status1, thread_status_propery },
2679 { FUNCTOR_detached1, thread_detached_propery },
2680 { FUNCTOR_debug1, thread_debug_propery },
2681 { FUNCTOR_engine1, thread_engine_propery },
2682 { FUNCTOR_thread1, thread_thread_propery },
2683 { FUNCTOR_system_thread_id1, thread_tid_propery },
2684 { FUNCTOR_size1, thread_size_propery },
2685 { 0, NULL }
2686 };
2687
2688
2689 typedef struct
2690 { int tid;
2691 const tprop *p;
2692 int enum_threads;
2693 int enum_properties;
2694 } tprop_enum;
2695
2696
2697 static int
advance_state(tprop_enum * state)2698 advance_state(tprop_enum *state)
2699 { if ( state->enum_properties )
2700 { state->p++;
2701 if ( state->p->functor )
2702 succeed;
2703
2704 state->p = tprop_list;
2705 }
2706 if ( state->enum_threads )
2707 { do
2708 { state->tid++;
2709 if ( state->tid > GD->thread.highest_id )
2710 fail;
2711 } while ( GD->thread.threads[state->tid]->status == PL_THREAD_UNUSED ||
2712 GD->thread.threads[state->tid]->status == PL_THREAD_RESERVED );
2713
2714 succeed;
2715 }
2716
2717 fail;
2718 }
2719
2720
2721 static
2722 PRED_IMPL("thread_property", 2, thread_property, PL_FA_NONDETERMINISTIC)
2723 { PRED_LD
2724 term_t thread = A1;
2725 term_t property = A2;
2726 tprop_enum statebuf;
2727 tprop_enum *state;
2728
2729 switch( CTX_CNTRL )
2730 { case FRG_FIRST_CALL:
2731 { PL_thread_info_t *info;
2732
2733 memset(&statebuf, 0, sizeof(statebuf));
2734 state = &statebuf;
2735
2736 if ( PL_is_variable(thread) )
2737 { switch( get_prop_def(property, ATOM_thread_property,
2738 tprop_list, &statebuf.p) )
2739 { case 1:
2740 state->tid = 1;
2741 state->enum_threads = TRUE;
2742 goto enumerate;
2743 case 0:
2744 state->p = tprop_list;
2745 state->tid = 1;
2746 state->enum_threads = TRUE;
2747 state->enum_properties = TRUE;
2748 goto enumerate;
2749 case -1:
2750 fail;
2751 }
2752 } else if ( get_thread(thread, &info, TRUE) )
2753 { state->tid = info->pl_tid;
2754
2755 switch( get_prop_def(property, ATOM_thread_property,
2756 tprop_list, &statebuf.p) )
2757 { case 1:
2758 goto enumerate;
2759 case 0:
2760 state->p = tprop_list;
2761 state->enum_properties = TRUE;
2762 goto enumerate;
2763 case -1:
2764 fail;
2765 }
2766 } else
2767 { fail;
2768 }
2769 }
2770 case FRG_REDO:
2771 state = CTX_PTR;
2772 break;
2773 case FRG_CUTTED:
2774 state = CTX_PTR;
2775 freeForeignState(state, sizeof(*state));
2776 succeed;
2777 default:
2778 assert(0);
2779 fail;
2780 }
2781
2782 enumerate:
2783 { term_t arg = PL_new_term_ref();
2784
2785 if ( !state->enum_properties )
2786 _PL_get_arg(1, property, arg);
2787
2788 for(;;)
2789 { PL_thread_info_t *info = GD->thread.threads[state->tid];
2790
2791 if ( info && (*state->p->function)(info, arg PASS_LD) )
2792 { if ( state->enum_properties )
2793 { if ( !PL_unify_term(property,
2794 PL_FUNCTOR, state->p->functor,
2795 PL_TERM, arg) )
2796 goto error;
2797 }
2798 if ( state->enum_threads )
2799 { if ( !unify_thread_id(thread, info) )
2800 goto error;
2801 }
2802
2803 if ( advance_state(state) )
2804 { if ( state == &statebuf )
2805 { tprop_enum *copy = allocForeignState(sizeof(*copy));
2806
2807 *copy = *state;
2808 state = copy;
2809 }
2810
2811 ForeignRedoPtr(state);
2812 }
2813
2814 if ( state != &statebuf )
2815 freeForeignState(state, sizeof(*state));
2816 succeed;
2817 }
2818
2819 if ( !advance_state(state) )
2820 { error:
2821 if ( state != &statebuf )
2822 freeForeignState(state, sizeof(*state));
2823 fail;
2824 }
2825 }
2826 }
2827 }
2828
2829
2830 static
2831 PRED_IMPL("is_thread", 1, is_thread, 0)
2832 { PL_thread_info_t *info;
2833
2834 return get_thread(A1, &info, FALSE);
2835 }
2836
2837
2838 static
2839 PRED_IMPL("thread_setconcurrency", 2, thread_setconcurrency, 0)
2840 { PRED_LD
2841
2842 #ifdef HAVE_PTHREAD_SETCONCURRENCY
2843 int val = pthread_getconcurrency();
2844 int rc;
2845
2846 if ( PL_unify_integer(A1, val) )
2847 { if ( PL_compare(A1, A2) != 0 )
2848 { if ( PL_get_integer_ex(A2, &val) )
2849 { if ( (rc=pthread_setconcurrency(val)) != 0 )
2850 return PL_error(NULL, 0, ThError(rc),
2851 ERR_SYSCALL, "pthread_setconcurrency");
2852 }
2853 }
2854 }
2855
2856 succeed;
2857 #else
2858 return PL_unify_integer(A1, 0);
2859 #endif
2860 }
2861
#if defined(HAVE_SCHED_SETAFFINITY) && defined(PID_IDENTIFIES_THREAD)
#define HAVE_PRED_THREAD_AFFINITY 1
/* thread_affinity(+Thread, -Old, +New): unify Old with the list of CPU
   numbers in the affinity mask of Thread and, if New differs from Old,
   install New as the mask.  L_THREAD is held for the whole operation.

   sched_getaffinity() and sched_setaffinity() report failure through
   errno, so that is the value handed to ThError().
*/

static
PRED_IMPL("thread_affinity", 3, thread_affinity, 0)
{ PRED_LD
  PL_thread_info_t *info;
  int rc;

  PL_LOCK(L_THREAD);
  if ( (rc=get_thread(A1, &info, TRUE)) )
  { cpu_set_t cpuset;

    if ( sched_getaffinity(info->pid, sizeof(cpuset), &cpuset) == 0 )
    { int count = CPU_COUNT(&cpuset);
      int i, n;
      term_t tail = PL_copy_term_ref(A2);
      term_t head = PL_new_term_ref();

      rc = FALSE;			/* result if building Old fails */
      for(i=0, n=0; n<count; i++)
      { if ( CPU_ISSET(i, &cpuset) )
        { n++;
          if ( !PL_unify_list_ex(tail, head, tail) ||
               !PL_unify_integer(head, i) )
            goto error;
        }
      }
      if ( !PL_unify_nil_ex(tail) )
        goto error;
    } else
    { rc = PL_error(NULL, 0, ThError(errno),
                    ERR_SYSCALL, "sched_getaffinity");
      goto error;
    }

    if ( PL_compare(A2, A3) != 0 )
    { if ( (rc=get_cpuset(A3, &cpuset)) )
      { if ( sched_setaffinity(info->pid, sizeof(cpuset), &cpuset) == 0 )
        { rc = TRUE;
        } else
        { rc = PL_error(NULL, 0, ThError(errno),
                        ERR_SYSCALL, "sched_setaffinity");
        }
      }
    } else
    { rc = TRUE;
    }
  }
error:
  PL_UNLOCK(L_THREAD);

  return rc;
}
#endif /*HAVE_SCHED_SETAFFINITY*/
2915
2916
2917 /*******************************
2918 * CLEANUP *
2919 *******************************/
2920
2921 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
2922 Request a function to run when the Prolog thread is about to detach, but
2923 still capable of running Prolog queries.
2924 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
2925
2926 int
PL_thread_at_exit(void (* function)(void *),void * closure,int global)2927 PL_thread_at_exit(void (*function)(void *), void *closure, int global)
2928 { GET_LD
2929 event_list **list = global ? &GD->event.hook.onthreadexit
2930 : &LD->event.hook.onthreadexit;
2931 int (*func)() = (void *)function;
2932
2933 return register_event_function(list, FALSE, func, closure, 0);
2934 }
2935
2936 /*******************************
2937 * THREAD SIGNALS *
2938 *******************************/
2939
2940 typedef struct _thread_sig
2941 { struct _thread_sig *next; /* Next in queue */
2942 Module module; /* Module for running goal */
2943 record_t goal; /* Goal to run */
2944 } thread_sig;
2945
2946
2947 static int
is_alive(int status)2948 is_alive(int status)
2949 { switch(status)
2950 { case PL_THREAD_CREATED:
2951 case PL_THREAD_RUNNING:
2952 succeed;
2953 default:
2954 fail;
2955 }
2956 }
2957
2958
2959 foreign_t
pl_thread_signal(term_t thread,term_t goal)2960 pl_thread_signal(term_t thread, term_t goal)
2961 { GET_LD
2962 Module m = NULL;
2963 thread_sig *sg;
2964 PL_thread_info_t *info;
2965 PL_local_data_t *ld;
2966
2967 if ( !PL_strip_module(goal, &m, goal) )
2968 return FALSE;
2969
2970 PL_LOCK(L_THREAD);
2971 if ( !get_thread(thread, &info, TRUE) )
2972 { PL_UNLOCK(L_THREAD);
2973 fail;
2974 }
2975 if ( !is_alive(info->status) )
2976 { error:
2977 PL_error(NULL, 0, NULL, ERR_EXISTENCE, ATOM_thread, thread);
2978 PL_UNLOCK(L_THREAD);
2979 fail;
2980 }
2981
2982 sg = allocHeapOrHalt(sizeof(*sg));
2983 sg->next = NULL;
2984 sg->module = m;
2985 sg->goal = PL_record(goal);
2986
2987 ld = info->thread_data;
2988 if ( !ld->thread.sig_head )
2989 ld->thread.sig_head = ld->thread.sig_tail = sg;
2990 else
2991 { ld->thread.sig_tail->next = sg;
2992 ld->thread.sig_tail = sg;
2993 }
2994 raiseSignal(ld, SIG_THREAD_SIGNAL);
2995 if ( info->has_tid && !alertThread(info) )
2996 goto error;
2997
2998 PL_UNLOCK(L_THREAD);
2999
3000 succeed;
3001 }
3002
3003
3004 void
executeThreadSignals(int sig)3005 executeThreadSignals(int sig)
3006 { GET_LD
3007 thread_sig *sg, *next;
3008 fid_t fid;
3009 (void)sig;
3010
3011 if ( !is_alive(LD->thread.info->status) )
3012 return;
3013
3014 PL_LOCK(L_THREAD);
3015 sg = LD->thread.sig_head;
3016 LD->thread.sig_head = LD->thread.sig_tail = NULL;
3017 PL_UNLOCK(L_THREAD);
3018
3019 fid = PL_open_foreign_frame();
3020
3021 for( ; sg; sg = next)
3022 { term_t goal = PL_new_term_ref();
3023 Module gm;
3024 term_t ex;
3025 int rval;
3026
3027 next = sg->next;
3028 rval = PL_recorded(sg->goal, goal);
3029 PL_erase(sg->goal);
3030 gm = sg->module;
3031 freeHeap(sg, sizeof(*sg));
3032
3033 DEBUG(MSG_THREAD,
3034 Sdprintf("[%d] Executing thread signal\n", PL_thread_self()));
3035 if ( rval )
3036 {
3037 #ifdef O_LIMIT_DEPTH
3038 uintptr_t olimit = depth_limit;
3039 depth_limit = DEPTH_NO_LIMIT;
3040 #endif
3041 rval = callProlog(gm, goal, PL_Q_CATCH_EXCEPTION, &ex);
3042 #ifdef O_LIMIT_DEPTH
3043 depth_limit = olimit;
3044 #endif
3045 } else
3046 { rval = raiseStackOverflow(GLOBAL_OVERFLOW);
3047 ex = exception_term;
3048 }
3049
3050 if ( !rval && ex )
3051 { PL_raise_exception(ex);
3052 PL_close_foreign_frame(fid);
3053
3054 DEBUG(MSG_THREAD,
3055 { print_trace(8);
3056 Sdprintf("[%d]: Prolog backtrace:\n", PL_thread_self());
3057 PL_backtrace(5, 0);
3058 Sdprintf("[%d]: end Prolog backtrace:\n", PL_thread_self());
3059 });
3060
3061 for(sg = next; sg; sg=next)
3062 { next = sg->next;
3063 PL_erase(sg->goal);
3064 freeHeap(sg, sizeof(*sg));
3065 }
3066
3067 return;
3068 }
3069
3070 PL_rewind_foreign_frame(fid);
3071 }
3072
3073 PL_discard_foreign_frame(fid);
3074 }
3075
3076
3077 static void
freeThreadSignals(PL_local_data_t * ld)3078 freeThreadSignals(PL_local_data_t *ld)
3079 { thread_sig *sg;
3080 thread_sig *next;
3081
3082 for( sg = ld->thread.sig_head; sg; sg = next )
3083 { next = sg->next;
3084
3085 PL_erase(sg->goal);
3086 freeHeap(sg, sizeof(*sg));
3087 }
3088 }
3089
3090
3091 /*******************************
3092 * INTERACTORS *
3093 *******************************/
3094
3095 static int
get_interactor(term_t t,thread_handle ** thp,int warn ARG_LD)3096 get_interactor(term_t t, thread_handle **thp, int warn ARG_LD)
3097 { atom_t a;
3098
3099 if ( PL_get_atom(t, &a) )
3100 { thread_handle *th;
3101 atom_t symbol = a;
3102
3103 from_symbol:
3104 if ( (th=symbol_thread_handle(symbol)) &&
3105 true(th, TH_IS_INTERACTOR) )
3106 { if ( th->info || true(th, (TH_INTERACTOR_NOMORE|TH_INTERACTOR_DONE)) )
3107 { *thp = th;
3108 return TRUE;
3109 }
3110 if ( warn )
3111 PL_existence_error("engine", t);
3112 } else if ( isTextAtom(symbol) )
3113 { word w;
3114
3115 if ( (w = (word)lookupHTable(threadTable, (void *)symbol)) )
3116 { symbol = w;
3117 goto from_symbol;
3118 }
3119 if ( warn )
3120 PL_existence_error("engine", t);
3121 } else
3122 { if ( warn )
3123 PL_type_error("engine", t);
3124 }
3125 } else
3126 { if ( warn )
3127 PL_type_error("engine", t);
3128 }
3129
3130 return FALSE;
3131 }
3132
3133
3134 /** '$engine_create'(-Handle, +GoalAndTemplate, +Options)
3135 */
3136
3137 static const opt_spec make_engine_options[] =
3138 { { ATOM_stack_limit, OPT_SIZE|OPT_INF },
3139 { ATOM_alias, OPT_ATOM },
3140 { ATOM_inherit_from, OPT_TERM },
3141 { NULL_ATOM, 0 }
3142 };
3143
3144
3145 static
3146 PRED_IMPL("$engine_create", 3, engine_create, 0)
3147 { PRED_LD
3148 PL_engine_t new;
3149 PL_thread_attr_t attrs;
3150 size_t stack = 0;
3151 atom_t alias = NULL_ATOM;
3152 term_t inherit_from = 0;
3153
3154 memset(&attrs, 0, sizeof(attrs));
3155 if ( !scan_options(A3, 0,
3156 ATOM_engine_option, make_engine_options,
3157 &stack,
3158 &alias,
3159 &inherit_from) )
3160 return FALSE;
3161
3162 if ( stack )
3163 attrs.stack_limit = stack;
3164 else
3165 attrs.stack_limit = LD->stacks.limit;
3166
3167 if ( (new = PL_create_engine(&attrs)) )
3168 { PL_engine_t me;
3169 static predicate_t pred = NULL;
3170 record_t r;
3171 thread_handle *th;
3172 term_t t;
3173 int rc;
3174
3175 new->thread.info->is_engine = TRUE;
3176 th = create_thread_handle(new->thread.info);
3177 set(th, TH_IS_INTERACTOR);
3178 ATOMIC_INC(&GD->statistics.engines_created);
3179
3180 if ( alias )
3181 { if ( !aliasThread(new->thread.info->pl_tid, ATOM_engine, alias) )
3182 { destroy_interactor(th, FALSE);
3183 return FALSE;
3184 }
3185 }
3186
3187 if ( !(rc = unify_thread_id(A1, new->thread.info)) )
3188 { destroy_interactor(th, FALSE);
3189 if ( !PL_exception(0) )
3190 return PL_uninstantiation_error(A1);
3191
3192 return FALSE;
3193 }
3194 PL_unregister_atom(th->symbol);
3195
3196 if ( !pred )
3197 pred = PL_predicate("call", 1, "system");
3198
3199 r = PL_record(A2);
3200 rc = PL_set_engine(new, &me);
3201 assert(rc == PL_ENGINE_SET);
3202 LOCAL_LD = new;
3203
3204 if ( (t = PL_new_term_ref()) &&
3205 (th->interactor.argv = PL_new_term_refs(2)) &&
3206 PL_recorded(r, t) &&
3207 PL_get_arg(1, t, th->interactor.argv+0) &&
3208 PL_get_arg(2, t, th->interactor.argv+1) )
3209 { th->interactor.query = PL_open_query(NULL,
3210 PL_Q_CATCH_EXCEPTION|
3211 PL_Q_ALLOW_YIELD|
3212 PL_Q_EXT_STATUS,
3213 pred, th->interactor.argv+1);
3214 PL_set_engine(me, NULL);
3215 LOCAL_LD = me;
3216 } else
3217 { assert(0); /* TBD: copy exception */
3218 }
3219
3220 PL_erase(r);
3221
3222 return TRUE;
3223 }
3224
3225 return PL_no_memory();
3226 }
3227
3228
3229 static void
destroy_interactor(thread_handle * th,int gc)3230 destroy_interactor(thread_handle *th, int gc)
3231 { if ( th->interactor.query )
3232 { PL_engine_t me;
3233
3234 PL_set_engine(th->info->thread_data, &me);
3235 PL_close_query(th->interactor.query);
3236 PL_set_engine(me, NULL);
3237 th->interactor.query = 0;
3238 }
3239 if ( th->info )
3240 { PL_destroy_engine(th->info->thread_data);
3241 ATOMIC_INC(&GD->statistics.engines_finished);
3242 assert(th->info == NULL || gc);
3243 }
3244 if ( th->interactor.package )
3245 { PL_erase(th->interactor.package);
3246 th->interactor.package = 0;
3247 }
3248 clear(th, (TH_INTERACTOR_NOMORE|TH_INTERACTOR_DONE));
3249 simpleMutexDelete(th->interactor.mutex);
3250 th->interactor.mutex = NULL;
3251 unalias_thread(th);
3252 }
3253
3254
3255 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
3256 Called when we computed the last result. This means we can destroy the
3257 Prolog engine, but the reference (and possible alias) must remain valid
3258 to deal with the engine_destroy/1 or final engine_next/1 if the last
3259 answer was deterministic.
3260 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
3261
3262 static void
done_interactor(thread_handle * th)3263 done_interactor(thread_handle *th)
3264 {
3265 #ifndef NDEBUG
3266 GET_LD
3267 PL_engine_t e = th->info->thread_data;
3268 assert(e->thread.info->open_count == 1);
3269 assert(e == LD);
3270 #endif
3271
3272 set(th, TH_INTERACTOR_DONE);
3273 PL_thread_destroy_engine();
3274 ATOMIC_INC(&GD->statistics.engines_finished);
3275 assert(th->info == NULL);
3276 }
3277
3278
3279 static
3280 PRED_IMPL("engine_destroy", 1, engine_destroy, 0)
3281 { PRED_LD
3282 thread_handle *th;
3283
3284 if ( get_interactor(A1, &th, TRUE PASS_LD) )
3285 { destroy_interactor(th, FALSE);
3286
3287 return TRUE;
3288 }
3289
3290 return FALSE;
3291 }
3292
3293
3294 static void
copy_debug_mode(PL_local_data_t * to,PL_local_data_t * from)3295 copy_debug_mode(PL_local_data_t *to, PL_local_data_t *from)
3296 { PL_local_data_t *current = PL_current_engine();
3297
3298 if ( to->_debugstatus.debugging != from->_debugstatus.debugging )
3299 { TLD_set_LD(to);
3300 debugmode(from->_debugstatus.debugging, NULL);
3301 TLD_set_LD(current);
3302 }
3303 if ( to->_debugstatus.tracing != from->_debugstatus.tracing )
3304 { TLD_set_LD(to);
3305 tracemode(from->_debugstatus.tracing, NULL);
3306 TLD_set_LD(current);
3307 }
3308 }
3309
3310
3311 static PL_local_data_t *
activate_interactor(thread_handle * th)3312 activate_interactor(thread_handle *th)
3313 { PL_local_data_t *ld = th->info->thread_data;
3314
3315 TLD_set_LD(ld);
3316 ld->thread.info->tid = pthread_self();
3317 ld->thread.info->has_tid = TRUE;
3318 set_system_thread_id(ld->thread.info);
3319
3320 return ld;
3321 }
3322
3323
3324 static PL_local_data_t *
suspend_interactor(PL_engine_t me,thread_handle * th)3325 suspend_interactor(PL_engine_t me, thread_handle *th)
3326 { if ( th->info )
3327 detach_engine(th->info->thread_data);
3328 TLD_set_LD(me);
3329
3330 return me;
3331 }
3332
3333
3334 #define YIELD_ENGINE_YIELD 256 /* keep in sync with engine_yield/1 */
3335
3336 static int
interactor_post_answer_nolock(thread_handle * th,term_t ref,term_t package,term_t term ARG_LD)3337 interactor_post_answer_nolock(thread_handle *th,
3338 term_t ref, term_t package, term_t term ARG_LD)
3339 { PL_engine_t me = LD;
3340
3341 if ( package && th->interactor.package )
3342 return PL_permission_error("post_to", "engine", ref);
3343
3344 if ( !th->interactor.query )
3345 { if ( true(th, TH_INTERACTOR_NOMORE) )
3346 { clear(th, TH_INTERACTOR_NOMORE);
3347 set(th, TH_INTERACTOR_DONE);
3348 return FALSE;
3349 }
3350 return PL_existence_error("engine", ref);
3351 }
3352
3353 if ( package )
3354 th->interactor.package = PL_record(package);
3355
3356 if ( (LOCAL_LD = activate_interactor(th)) )
3357 { term_t t;
3358 int rc;
3359 record_t r;
3360
3361 copy_debug_mode(LD, me);
3362 rc = PL_next_solution(th->interactor.query);
3363
3364 switch( rc )
3365 { case PL_S_TRUE:
3366 { r = PL_record(th->interactor.argv+0);
3367 break;
3368 }
3369 case PL_S_LAST:
3370 { r = PL_record(th->interactor.argv+0);
3371 PL_close_query(th->interactor.query);
3372 th->interactor.query = 0;
3373 done_interactor(th);
3374 set(th, TH_INTERACTOR_NOMORE);
3375 break;
3376 }
3377 case PL_S_FALSE:
3378 { PL_close_query(th->interactor.query);
3379 th->interactor.query = 0;
3380 done_interactor(th);
3381 LOCAL_LD = suspend_interactor(me, th);
3382
3383 return FALSE;
3384 }
3385 case PL_S_EXCEPTION:
3386 { record_t r = PL_record(PL_exception(th->interactor.query));
3387 term_t ex;
3388 int rc;
3389
3390 PL_close_query(th->interactor.query);
3391 th->interactor.query = 0;
3392 done_interactor(th);
3393 LOCAL_LD = suspend_interactor(me, th);
3394
3395 rc = ( (ex = PL_new_term_ref()) &&
3396 PL_recorded(r, ex) &&
3397 PL_raise_exception(ex) );
3398
3399 PL_erase(r);
3400 return rc;
3401 }
3402 case YIELD_ENGINE_YIELD: /* engine_yield/1 */
3403 { r = PL_record(PL_yielded(th->interactor.query));
3404 break;
3405 }
3406 default:
3407 { term_t ex = PL_new_term_ref();
3408
3409 return ( PL_put_integer(ex, rc) &&
3410 PL_domain_error("engine_yield_code", ex) );
3411 }
3412 }
3413
3414 LOCAL_LD = suspend_interactor(me, th);
3415 rc = ( (t=PL_new_term_ref()) &&
3416 PL_recorded(r, t) &&
3417 PL_unify(term, t) );
3418 PL_erase(r);
3419
3420 return rc;
3421 }
3422
3423 assert(0);
3424 return FALSE;
3425 }
3426
3427
3428 static atom_t
thread_symbol(const PL_local_data_t * ld)3429 thread_symbol(const PL_local_data_t *ld)
3430 { if ( ld->thread.info->is_engine )
3431 { const thread_handle *th = symbol_thread_handle(ld->thread.info->symbol);
3432 return th->interactor.thread;
3433 } else
3434 { return ld->thread.info->symbol;
3435 }
3436 }
3437
3438
3439 static int
interactor_post_answer(term_t ref,term_t package,term_t term ARG_LD)3440 interactor_post_answer(term_t ref, term_t package, term_t term ARG_LD)
3441 { thread_handle *th;
3442
3443 if ( get_interactor(ref, &th, TRUE PASS_LD) )
3444 { int rc;
3445
3446 simpleMutexLock(th->interactor.mutex);
3447 th->interactor.thread = thread_symbol(LD);
3448 rc = interactor_post_answer_nolock(th, ref, package, term PASS_LD);
3449 th->interactor.thread = NULL_ATOM;
3450 simpleMutexUnlock(th->interactor.mutex);
3451
3452 return rc;
3453 }
3454
3455 return FALSE;
3456 }
3457
3458
3459 /** engine_next(+Engine, -Next)
3460 */
3461
3462 static
3463 PRED_IMPL("engine_next", 2, engine_next, 0)
3464 { PRED_LD
3465
3466 return interactor_post_answer(A1, 0, A2 PASS_LD);
3467 }
3468
3469 /** engine_post(+Engine, +Term)
3470 */
3471
3472 static
3473 PRED_IMPL("engine_post", 2, engine_post, 0)
3474 { PRED_LD
3475 thread_handle *th;
3476
3477 if ( get_interactor(A1, &th, TRUE PASS_LD) )
3478 { int rc;
3479
3480 simpleMutexLock(th->interactor.mutex);
3481 if ( !th->interactor.package )
3482 { if ( (th->interactor.package = PL_record(A2)) )
3483 rc = TRUE;
3484 else
3485 rc = FALSE;
3486 } else
3487 { rc = PL_permission_error("post_to", "engine", A1);
3488 }
3489 simpleMutexUnlock(th->interactor.mutex);
3490
3491 return rc;
3492 }
3493
3494 return FALSE;
3495 }
3496
3497 /** engine_post(+Engine, +Term, -Next)
3498 */
3499
3500 static
3501 PRED_IMPL("engine_post", 3, engine_post, 0)
3502 { PRED_LD
3503
3504 return interactor_post_answer(A1, A2, A3 PASS_LD);
3505 }
3506
3507
3508 static
3509 PRED_IMPL("engine_fetch", 1, engine_fetch, 0)
3510 { PRED_LD
3511 PL_thread_info_t *info = LD->thread.info;
3512 term_t exv;
3513
3514 if ( info->is_engine )
3515 { thread_handle *th = symbol_thread_handle(info->symbol);
3516
3517 if ( th->interactor.package )
3518 { term_t tmp;
3519 int rc;
3520
3521 rc = ( (tmp = PL_new_term_ref()) &&
3522 PL_recorded(th->interactor.package, tmp) &&
3523 PL_unify(A1, tmp) );
3524 PL_erase(th->interactor.package);
3525 th->interactor.package = 0;
3526
3527 return rc;
3528 }
3529 }
3530
3531 return ( (exv = PL_new_term_refs(2)) &&
3532 PL_unify_atom_chars(exv+0, "delivery") &&
3533 unify_thread_id(exv+1, info) &&
3534 PL_error("engine_fetch", 1, "Use engine_post/2,3", ERR_EXISTENCE3,
3535 ATOM_term, exv+0, exv+1) );
3536 }
3537
3538
3539 static
3540 PRED_IMPL("is_engine", 1, is_engine, 0)
3541 { PRED_LD
3542 thread_handle *th;
3543
3544 return get_interactor(A1, &th, FALSE PASS_LD);
3545 }
3546
3547
3548 /*******************************
3549 * MESSAGE QUEUES *
3550 *******************************/
3551
3552 typedef enum
3553 { QUEUE_WAIT_READ, /* wait for message */
3554 QUEUE_WAIT_DRAIN /* wait for queue to drain */
3555 } queue_wait_type;
3556
3557 #define MSG_WAIT_INTR (-1)
3558 #define MSG_WAIT_TIMEOUT (-2)
3559 #define MSG_WAIT_DESTROYED (-3)
3560
3561 static int dispatch_cond_wait(message_queue *queue,
3562 queue_wait_type wait,
3563 struct timespec *deadline ARG_LD);
3564
3565 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
3566 This code deals with telling other threads something. The interface:
3567
3568 thread_get_message(-Message)
3569 thread_send_message(+Id, +Message)
3570
3571 Queues can be waited for by multiple threads using different (partly)
3572 instantiated patterns for Message. For this reason all waiting threads
3573 should be restarted using pthread_cond_broadcast(). However, if there
3574 are a large number of workers only waiting for `any' message this will
3575 cause all of them to wakeup for only one to grab the message. This is
3576 highly undesirable and therefore the queue keeps two counts: the number
3577 of waiting threads and the number waiting with a variable. Only if they
3578 are not equal and there are multiple waiters we must be using broadcast.
3579 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
3580
3581 typedef struct thread_message
3582 { struct thread_message *next; /* next in queue */
3583 record_t message; /* message in queue */
3584 word key; /* Indexing key */
3585 uint64_t sequence_id; /* Numbered sequence */
3586 } thread_message;
3587
3588
3589 static thread_message *
create_thread_message(term_t msg ARG_LD)3590 create_thread_message(term_t msg ARG_LD)
3591 { thread_message *msgp;
3592 record_t rec;
3593
3594 if ( !(rec=compileTermToHeap(msg, R_NOLOCK)) )
3595 return NULL;
3596
3597 if ( (msgp = allocHeap(sizeof(*msgp))) )
3598 { msgp->next = NULL;
3599 msgp->message = rec;
3600 msgp->key = getIndexOfTerm(msg);
3601 } else
3602 { freeRecord(rec);
3603 }
3604
3605 return msgp;
3606 }
3607
3608
3609 static void
free_thread_message(thread_message * msg)3610 free_thread_message(thread_message *msg)
3611 { if ( msg->message )
3612 freeRecord(msg->message);
3613
3614 freeHeap(msg, sizeof(*msg));
3615 }
3616
3617
3618 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
3619 queue_message() adds a message to a message queue. The caller must hold
3620 the queue-mutex.
3621 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
3622
3623 static int
queue_message(message_queue * queue,thread_message * msgp,struct timespec * deadline ARG_LD)3624 queue_message(message_queue *queue, thread_message *msgp,
3625 struct timespec *deadline ARG_LD)
3626 { if ( queue->max_size > 0 && queue->size >= queue->max_size )
3627 { queue->wait_for_drain++;
3628
3629 while ( queue->size >= queue->max_size )
3630 { switch ( dispatch_cond_wait(queue, QUEUE_WAIT_DRAIN, deadline PASS_LD) )
3631 { case CV_INTR:
3632 { if ( !LD ) /* needed for clean exit */
3633 { Sdprintf("Forced exit from queue_message()\n");
3634 exit(1);
3635 }
3636
3637 if ( is_signalled(LD) ) /* thread-signal */
3638 { queue->wait_for_drain--;
3639 return MSG_WAIT_INTR;
3640 }
3641 break;
3642 }
3643 case CV_TIMEDOUT:
3644 queue->wait_for_drain--;
3645 return MSG_WAIT_TIMEOUT;
3646 case CV_READY:
3647 case CV_MAYBE:
3648 break;
3649 default:
3650 assert(0); // should never happen
3651 }
3652 if ( queue->destroyed )
3653 { queue->wait_for_drain--;
3654 return MSG_WAIT_DESTROYED;
3655 }
3656 }
3657
3658 queue->wait_for_drain--;
3659 }
3660
3661 msgp->sequence_id = ++queue->sequence_next;
3662 if ( !queue->head )
3663 { queue->head = queue->tail = msgp;
3664 } else
3665 { queue->tail->next = msgp;
3666 queue->tail = msgp;
3667 }
3668 queue->size++;
3669
3670 if ( queue->waiting )
3671 { if ( queue->waiting > queue->waiting_var && queue->waiting > 1 )
3672 { DEBUG(MSG_QUEUE,
3673 Sdprintf("%d: %d of %d non-var waiters on %p; broadcasting\n",
3674 PL_thread_self(),
3675 queue->waiting - queue->waiting_var,
3676 queue->waiting,
3677 queue));
3678 cv_broadcast(&queue->cond_var);
3679 } else
3680 { DEBUG(MSG_QUEUE, Sdprintf("%d: %d waiters on %p; signalling\n",
3681 PL_thread_self(), queue->waiting, queue));
3682 cv_signal(&queue->cond_var);
3683 }
3684 } else
3685 { DEBUG(MSG_QUEUE, Sdprintf("%d: no waiters on %p\n",
3686 PL_thread_self(), queue));
3687 }
3688
3689 return TRUE;
3690 }
3691
3692
3693 /*******************************
3694 * READING FROM A QUEUE *
3695 *******************************/
3696
3697 #if TIME_WITH_SYS_TIME
3698 # include <sys/time.h>
3699 # include <time.h>
3700 #else
3701 # if HAVE_SYS_TIME_H
3702 # include <sys/time.h>
3703 # else
3704 # include <time.h>
3705 # endif
3706 #endif
3707 #ifdef HAVE_FTIME
3708 #include <sys/timeb.h>
3709 #endif
3710
/* Store a - b in *diff.  A negative nanosecond difference is fixed by
   borrowing one second, so for normalised inputs the resulting
   tv_nsec lies in [0, 1000000000).
*/
static void
timespec_diff(struct timespec *diff,
              const struct timespec *a, const struct timespec *b)
{ time_t seconds = a->tv_sec  - b->tv_sec;
  long   nanos   = a->tv_nsec - b->tv_nsec;

  if ( nanos < 0 )
  { seconds--;
    nanos += 1000000000;
  }

  diff->tv_sec  = seconds;
  diff->tv_nsec = nanos;
}
3721
3722
/* Sign of a timespec: 1 if positive, -1 if tv_sec is negative and 0
   otherwise.  Note that only tv_sec is examined for negativity; with
   tv_sec == 0 the result is 1 for a positive tv_nsec and 0 for any
   other tv_nsec.
*/
static int
timespec_sign(const struct timespec *t)
{ if ( t->tv_sec > 0 )
    return 1;
  if ( t->tv_sec < 0 )
    return -1;
  if ( t->tv_nsec > 0 )
    return 1;

  return 0;
}
3731
3732
/* Three-way comparison of two timespecs: returns 1 if a is later
   than b, -1 if it is earlier and 0 if both are equal.  Computed as
   the sign of a - b, borrowing one second when the nanosecond
   difference is negative.
*/
static int
timespec_cmp(const struct timespec *a, const struct timespec *b)
{ time_t dsec  = a->tv_sec  - b->tv_sec;
  long   dnsec = a->tv_nsec - b->tv_nsec;

  if ( dnsec < 0 )
  { dsec--;
    dnsec += 1000000000;
  }

  if ( dsec > 0 )
    return 1;
  if ( dsec < 0 )
    return -1;

  return dnsec > 0 ? 1 : 0;
}
3741
3742
3743 void
get_current_timespec(struct timespec * time)3744 get_current_timespec(struct timespec *time)
3745 {
3746 #ifdef HAVE_CLOCK_GETTIME
3747 clock_gettime(CLOCK_REALTIME, time);
3748 #else
3749 #ifdef HAVE_GETTIMEOFDAY
3750 struct timeval now;
3751
3752 gettimeofday(&now, NULL);
3753 time->tv_sec = now.tv_sec;
3754 time->tv_nsec = now.tv_usec*1000;
3755 #else /* HAVE_FTIME */
3756 struct timeb now;
3757
3758 ftime(&now);
3759 time->tv_sec = now.time;
3760 time->tv_nsec = now.millitm*1000000;
3761 #endif
3762 #endif
3763 }
3764
3765
/* Normalise a timespec whose tv_nsec is one or more whole seconds too
   large by moving those seconds into tv_sec.  Values of tv_nsec below
   one second (including negative ones) are left untouched.
*/
void
carry_timespec_nanos(struct timespec *time)
{ if ( time->tv_nsec >= 1000000000 )
  { long whole_seconds = time->tv_nsec / 1000000000;

    time->tv_sec  += whole_seconds;
    time->tv_nsec -= whole_seconds * 1000000000;
  }
}
3773
3774 #ifdef __WINDOWS__
3775
3776 int
cv_timedwait(message_queue * queue,CONDITION_VARIABLE * cond,CRITICAL_SECTION * mutex,struct timespec * deadline)3777 cv_timedwait(message_queue *queue,
3778 CONDITION_VARIABLE *cond, CRITICAL_SECTION *mutex,
3779 struct timespec *deadline)
3780 { GET_LD
3781 struct timespec tmp_timeout;
3782 DWORD api_timeout = 250;
3783 int last = FALSE;
3784 int rc;
3785
3786 for(;;)
3787 { get_current_timespec(&tmp_timeout);
3788 tmp_timeout.tv_nsec += 250000000;
3789 carry_timespec_nanos(&tmp_timeout);
3790
3791 if ( deadline && timespec_cmp(&tmp_timeout, deadline) >= 0 )
3792 { struct timespec d;
3793
3794 get_current_timespec(&tmp_timeout);
3795 timespec_diff(&d, deadline, &tmp_timeout);
3796 if ( timespec_sign(&d) > 0 )
3797 api_timeout = (d.tv_nsec + 1000000 - 1) / 1000000;
3798 else
3799 return CV_TIMEDOUT;
3800
3801 last = TRUE;
3802 }
3803
3804 rc = SleepConditionVariableCS(cond, mutex, api_timeout);
3805
3806 if ( is_signalled(LD) )
3807 return CV_INTR;
3808 if ( !rc )
3809 return last ? CV_TIMEDOUT : CV_MAYBE;
3810 if ( rc )
3811 return CV_READY;
3812 }
3813 }
3814
3815 #else /*__WINDOWS__*/
3816
3817 /* return: 0: ok, EINTR: interrupted, ETIMEDOUT: timeout
3818 */
3819
3820 int
cv_timedwait(message_queue * queue,pthread_cond_t * cond,pthread_mutex_t * mutex,struct timespec * deadline)3821 cv_timedwait(message_queue *queue,
3822 pthread_cond_t *cond, pthread_mutex_t *mutex,
3823 struct timespec *deadline)
3824 { GET_LD
3825 struct timespec tmp_timeout;
3826 struct timespec *api_timeout = &tmp_timeout;
3827 int rc;
3828
3829 for(;;)
3830 { get_current_timespec(&tmp_timeout);
3831 tmp_timeout.tv_nsec += 250000000;
3832 carry_timespec_nanos(&tmp_timeout);
3833
3834 if ( deadline && timespec_cmp(&tmp_timeout, deadline) >= 0 )
3835 api_timeout = deadline;
3836
3837 rc = pthread_cond_timedwait(cond, mutex, api_timeout);
3838 DEBUG(MSG_QUEUE_WAIT,
3839 Sdprintf("%d: wait on %p returned %d; size = %ld (%s)\n",
3840 PL_thread_self(), queue, rc,
3841 queue ? (long)queue->size : 0,
3842 api_timeout == deadline ? "final" : "0.25sec"));
3843
3844 switch( rc )
3845 { case ETIMEDOUT:
3846 if ( is_signalled(LD) )
3847 return CV_INTR;
3848 if ( api_timeout == deadline )
3849 return CV_TIMEDOUT;
3850 return CV_MAYBE;
3851 case EINTR: /* can not happen in POSIX, but can in */
3852 case 0: /* legacy systems */
3853 if ( is_signalled(LD) )
3854 return CV_INTR;
3855 return CV_READY;
3856 default:
3857 assert(0);
3858 return CV_READY;
3859 }
3860 }
3861 }
3862
3863 #endif /*__WINDOWS__*/
3864
3865 static int
dispatch_cond_wait(message_queue * queue,queue_wait_type wait,struct timespec * deadline ARG_LD)3866 dispatch_cond_wait(message_queue *queue, queue_wait_type wait,
3867 struct timespec *deadline ARG_LD)
3868 { int rc;
3869
3870 LD->thread.alert.obj.queue = queue;
3871 LD->thread.alert.type = wait == QUEUE_WAIT_READ ? ALERT_QUEUE_RD
3872 : ALERT_QUEUE_WR;
3873
3874 rc = cv_timedwait(queue,
3875 (wait == QUEUE_WAIT_READ ? &queue->cond_var
3876 : &queue->drain_var),
3877 &queue->mutex,
3878 deadline);
3879
3880 PL_LOCK(L_ALERT);
3881 LD->thread.alert.type = 0;
3882 LD->thread.alert.obj.queue = NULL;
3883 PL_UNLOCK(L_ALERT);
3884
3885 return rc;
3886 }
3887
#ifdef O_QUEUE_STATS
/* Counters that are bumped through QSTAT() when O_QUEUE_STATS is
   defined.  They are plain (non-atomic) variables.
*/
static uint64_t getmsg = 0;             /* QSTAT(getmsg) hits */
static uint64_t unified = 0;            /* QSTAT(unified) hits */
static uint64_t skipped = 0;            /* QSTAT(skipped) hits */

/* Print the counters.  The values are cast to long long such that
   the arguments match the %lld conversions regardless of how
   uint64_t is defined on the platform.
*/
static void
msg_statistics(void)
{ Sdprintf("get_message: %lld, unified: %lld, skipped: %lld\n",
           (long long)getmsg, (long long)unified, (long long)skipped);
}
#define QSTAT(n) (n++)
#else
#define QSTAT(n) ((void)0)
#endif
3902
3903 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
3904 get_message() reads the next message from the message queue. It must be
3905 called with queue->mutex locked. It returns one of
3906
3907 * TRUE
3908 * FALSE
3909 * MSG_WAIT_INTR
3910 Got EINTR while waiting.
3911 * MSG_WAIT_TIMEOUT
3912 Got timeout while waiting
3913 * MSG_WAIT_DESTROYED
3914 Queue was destroyed while waiting
3915
3916 (*) We need to lock because AGC marks our atoms while the thread is
3917 running. The thread may pick a message containing an atom from the
3918 queue, which now has not been marked and is no longer part of the
3919 queue, so it isn't marked in the queue either.
3920
3921 Originally, I thought we only need this for queues that are not
3922 associated to threads. Those associated with a thread mark both the
3923 stacks and the queue in one pass, marking the atoms in either. However,
3924 we also need to lock to avoid get_message() destroying the record while
3925 markAtomsMessageQueue() scans it. This fixes the reopened Bug#142.
3926 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
3927
3928 static int
get_message(message_queue * queue,term_t msg,struct timespec * deadline ARG_LD)3929 get_message(message_queue *queue, term_t msg, struct timespec *deadline ARG_LD)
3930 { int isvar = PL_is_variable(msg) ? 1 : 0;
3931 word key = (isvar ? 0L : getIndexOfTerm(msg));
3932 fid_t fid = PL_open_foreign_frame();
3933 uint64_t seen = 0;
3934
3935 QSTAT(getmsg);
3936
3937 DEBUG(MSG_QUEUE,
3938 { Sdprintf("%d: get_message(%p) key 0x%lx for ",
3939 PL_thread_self(), queue, key);
3940 PL_write_term(Serror, msg, 1200, PL_WRT_QUOTED|PL_WRT_NEWLINE);
3941 });
3942
3943 for(;;)
3944 { int rc;
3945 thread_message *msgp = queue->head;
3946 thread_message *prev = NULL;
3947
3948 if ( queue->destroyed )
3949 return MSG_WAIT_DESTROYED;
3950
3951 DEBUG(MSG_QUEUE,
3952 Sdprintf("%d: queue size=%ld\n",
3953 PL_thread_self(), (long)queue->size));
3954
3955 for( ; msgp; prev = msgp, msgp = msgp->next )
3956 { term_t tmp;
3957
3958 if ( msgp->sequence_id < seen )
3959 { QSTAT(skipped);
3960 DEBUG(MSG_QUEUE, Sdprintf("Already seen %ld (<%ld)\n",
3961 (long)msgp->sequence_id, (long)seen));
3962 continue;
3963 }
3964 seen = msgp->sequence_id;
3965
3966 if ( key && msgp->key && key != msgp->key )
3967 { DEBUG(MSG_QUEUE, Sdprintf("Message key mismatch\n"));
3968 continue; /* fast search */
3969 }
3970
3971 QSTAT(unified);
3972 tmp = PL_new_term_ref();
3973 if ( !PL_recorded(msgp->message, tmp) )
3974 { PL_discard_foreign_frame(fid);
3975 return raiseStackOverflow(GLOBAL_OVERFLOW);
3976 }
3977 DEBUG(MSG_QUEUE,
3978 { Sdprintf("%d: found term ", PL_thread_self());
3979 PL_write_term(Serror, tmp, 1200, PL_WRT_QUOTED|PL_WRT_NEWLINE);
3980 });
3981
3982 rc = PL_unify(msg, tmp);
3983
3984 if ( rc )
3985 { term_t ex = PL_new_term_ref();
3986
3987 if ( !(rc=foreignWakeup(ex PASS_LD)) )
3988 { if ( !isVar(*valTermRef(ex)) )
3989 PL_raise_exception(ex);
3990 }
3991 }
3992
3993 if ( rc )
3994 { DEBUG(MSG_QUEUE, Sdprintf("%d: match\n", PL_thread_self()));
3995
3996 if (GD->atoms.gc_active)
3997 markAtomsRecord(msgp->message);
3998
3999 simpleMutexLock(&queue->gc_mutex); /* see (*) */
4000 if ( prev )
4001 { if ( !(prev->next = msgp->next) )
4002 queue->tail = prev;
4003 } else
4004 { if ( !(queue->head = msgp->next) )
4005 queue->tail = NULL;
4006 }
4007 simpleMutexUnlock(&queue->gc_mutex);
4008
4009 free_thread_message(msgp);
4010 queue->size--;
4011 if ( queue->wait_for_drain )
4012 { DEBUG(MSG_QUEUE, Sdprintf("Queue drained. wakeup writers\n"));
4013 cv_signal(&queue->drain_var);
4014 }
4015
4016 PL_close_foreign_frame(fid);
4017 return TRUE;
4018 } else if ( exception_term )
4019 { PL_close_foreign_frame(fid);
4020 return FALSE;
4021 }
4022
4023 PL_rewind_foreign_frame(fid);
4024 }
4025
4026 queue->waiting++;
4027 queue->waiting_var += isvar;
4028 DEBUG(MSG_QUEUE_WAIT, Sdprintf("%d: waiting on queue\n", PL_thread_self()));
4029 rc = dispatch_cond_wait(queue, QUEUE_WAIT_READ, deadline PASS_LD);
4030 switch ( rc )
4031 { case CV_INTR:
4032 { DEBUG(MSG_QUEUE_WAIT, Sdprintf("%d: CV_INTR\n", PL_thread_self()));
4033
4034 if ( !LD ) /* needed for clean exit */
4035 { Sdprintf("Forced exit from get_message()\n");
4036 exit(1);
4037 }
4038
4039 if ( is_signalled(LD) ) /* thread-signal */
4040 { queue->waiting--;
4041 queue->waiting_var -= isvar;
4042 PL_discard_foreign_frame(fid);
4043 return MSG_WAIT_INTR;
4044 }
4045 break;
4046 }
4047 case CV_TIMEDOUT:
4048 { DEBUG(MSG_QUEUE_WAIT, Sdprintf("%d: CV_TIMEDOUT\n", PL_thread_self()));
4049
4050 queue->waiting--;
4051 queue->waiting_var -= isvar;
4052 PL_discard_foreign_frame(fid);
4053 return MSG_WAIT_TIMEOUT;
4054 }
4055 case CV_READY:
4056 case CV_MAYBE:
4057 DEBUG(MSG_QUEUE_WAIT,
4058 Sdprintf("%d: wakeup (%d) on queue\n",
4059 PL_thread_self(), rc));
4060 break;
4061 default:
4062 assert(0);
4063 }
4064 queue->waiting--;
4065 queue->waiting_var -= isvar;
4066 }
4067 }
4068
4069
4070 static int
peek_message(message_queue * queue,term_t msg ARG_LD)4071 peek_message(message_queue *queue, term_t msg ARG_LD)
4072 { thread_message *msgp;
4073 term_t tmp = PL_new_term_ref();
4074 word key = getIndexOfTerm(msg);
4075 fid_t fid = PL_open_foreign_frame();
4076
4077 msgp = queue->head;
4078
4079 for( msgp = queue->head; msgp; msgp = msgp->next )
4080 { if ( key && msgp->key && key != msgp->key )
4081 continue;
4082
4083 if ( !PL_recorded(msgp->message, tmp) )
4084 return raiseStackOverflow(GLOBAL_OVERFLOW);
4085
4086 if ( PL_unify(msg, tmp) )
4087 return TRUE;
4088 else if ( exception_term )
4089 return FALSE;
4090
4091 PL_rewind_foreign_frame(fid);
4092 }
4093
4094 return FALSE;
4095 }
4096
4097
4098 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
4099 Deletes the contents of the message-queue as well as the queue itself.
4100 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
4101
4102 static void
destroy_message_queue(message_queue * queue)4103 destroy_message_queue(message_queue *queue)
4104 { thread_message *msgp;
4105 thread_message *next;
4106
4107 if ( GD->cleaning || !queue->initialized )
4108 return; /* deallocation is centralised */
4109 queue->initialized = FALSE;
4110
4111 assert(!queue->waiting && !queue->wait_for_drain);
4112
4113 for( msgp = queue->head; msgp; msgp = next )
4114 { next = msgp->next;
4115
4116 free_thread_message(msgp);
4117 }
4118
4119 simpleMutexDelete(&queue->gc_mutex);
4120 cv_destroy(&queue->cond_var);
4121 if ( queue->max_size > 0 )
4122 cv_destroy(&queue->drain_var);
4123 if ( !queue->anonymous )
4124 simpleMutexDelete(&queue->mutex);
4125 }
4126
4127
4128 /* destroy the input queue of a thread. We have to take care of the case
4129 where our input queue is been waited for by another thread. This is
4130 similar to message_queue_destroy/1.
4131 */
4132
4133 static void
destroy_thread_message_queue(message_queue * q)4134 destroy_thread_message_queue(message_queue *q)
4135 { int done = FALSE;
4136
4137 if (!q->initialized )
4138 return;
4139
4140 while(!done)
4141 { simpleMutexLock(&q->mutex);
4142 q->destroyed = TRUE;
4143 if ( q->waiting || q->wait_for_drain )
4144 { if ( q->waiting )
4145 cv_broadcast(&q->cond_var);
4146 if ( q->wait_for_drain )
4147 cv_broadcast(&q->drain_var);
4148 } else
4149 done = TRUE;
4150 simpleMutexUnlock(&q->mutex);
4151 }
4152
4153 destroy_message_queue(q);
4154 }
4155
4156
4157 static void
init_message_queue(message_queue * queue,size_t max_size)4158 init_message_queue(message_queue *queue, size_t max_size)
4159 { memset(queue, 0, sizeof(*queue));
4160 simpleMutexInit(&queue->mutex);
4161 simpleMutexInit(&queue->gc_mutex);
4162 cv_init(&queue->cond_var, NULL);
4163 queue->max_size = max_size;
4164 if ( queue->max_size != 0 )
4165 cv_init(&queue->drain_var, NULL);
4166 queue->initialized = TRUE;
4167 }
4168
4169
4170 static size_t
sizeof_message_queue(message_queue * queue)4171 sizeof_message_queue(message_queue *queue)
4172 { size_t size = 0;
4173 thread_message *msgp;
4174
4175 simpleMutexLock(&queue->gc_mutex);
4176 for( msgp = queue->head; msgp; msgp = msgp->next )
4177 { size += sizeof(*msgp);
4178 size += msgp->message->size;
4179 }
4180 simpleMutexUnlock(&queue->gc_mutex);
4181
4182 return size;
4183 }
4184
4185 /* Prolog predicates */
4186
4187 static const opt_spec thread_get_message_options[] =
4188 { { ATOM_timeout, OPT_DOUBLE },
4189 { ATOM_deadline, OPT_DOUBLE },
4190 { NULL_ATOM, 0 }
4191 };
4192
4193 #ifndef DBL_MAX
4194 #define DBL_MAX 1.7976931348623158e+308
4195 #endif
4196
4197 /* This function is shared between thread_get_message/3 and
4198 thread_send_message/3.
4199
4200 It extracts a deadline from the deadline/1 and timeout/1 options.
4201 In both cases, the deadline is passed through to dispatch_cond_wait().
4202 Semantics are relatively simple:
4203
4204 1. If neither option is given, the deadline is NULL, which
4205 corresponds to an indefinite wait, or a deadline in the
4206 infinite future.
4207 2. A timeout is _exactly_ like a deadline of Now + Timeout,
4208 where Now is evaluated near the beginning of this function.
4209 3. If both deadline and a timeout options are given, the
4210 earlier deadline is effective.
4211 4. If the effective deadline is before Now, then return
4212 FALSE (leading to failure).
4213 */
4214
4215 static int
process_deadline_options(term_t options,struct timespec * ts,struct timespec ** pts)4216 process_deadline_options(term_t options,
4217 struct timespec *ts, struct timespec **pts)
4218 { struct timespec now;
4219 struct timespec deadline;
4220 struct timespec timeout;
4221 struct timespec *dlop=NULL;
4222 double tmo = DBL_MAX;
4223 double dlo = DBL_MAX;
4224
4225 if ( !scan_options(options, 0,
4226 ATOM_thread_get_message_option, thread_get_message_options,
4227 &tmo, &dlo) )
4228 return FALSE;
4229
4230 get_current_timespec(&now);
4231
4232 if ( dlo != DBL_MAX )
4233 { double ip, fp;
4234
4235 fp = modf(dlo, &ip);
4236 deadline.tv_sec = (time_t)ip;
4237 deadline.tv_nsec = (long)(fp*1000000000.0);
4238 dlop = &deadline;
4239
4240 if ( timespec_cmp(&deadline,&now) < 0 ) // if deadline in the past...
4241 return FALSE; // ... then FAIL
4242 }
4243
4244 // timeout option is processed exactly as if the deadline
4245 // was set to now + timeout.
4246 if ( tmo != DBL_MAX )
4247 { if ( tmo > 0.0 )
4248 { double ip, fp=modf(tmo,&ip);
4249
4250 timeout.tv_sec = now.tv_sec + (time_t)ip;
4251 timeout.tv_nsec = now.tv_nsec + (long)(fp*1000000000.0);
4252 carry_timespec_nanos(&timeout);
4253 if ( dlop==NULL || timespec_cmp(&timeout,&deadline) < 0 )
4254 dlop = &timeout;
4255 } else if ( tmo == 0.0 )
4256 { dlop = &now; /* scan once */
4257 } else // if timeout is negative ...
4258 return FALSE; // ... then FAIL
4259 }
4260 if (dlop)
4261 { *ts = *dlop;
4262 *pts = ts;
4263 } else
4264 { *pts=NULL;
4265 }
4266
4267 return TRUE;
4268 }
4269
4270
4271 static int
wait_queue_message(term_t qterm,message_queue * q,thread_message * msg,struct timespec * deadline ARG_LD)4272 wait_queue_message(term_t qterm, message_queue *q, thread_message *msg,
4273 struct timespec *deadline ARG_LD)
4274 { int rc;
4275
4276 for(;;)
4277 { rc = queue_message(q, msg, deadline PASS_LD);
4278
4279 switch(rc)
4280 { case MSG_WAIT_INTR:
4281 { if ( PL_handle_signals() >= 0 )
4282 continue;
4283 rc = FALSE;
4284 break;
4285 }
4286 case MSG_WAIT_DESTROYED:
4287 { if ( qterm )
4288 PL_existence_error("message_queue", qterm);
4289 rc = FALSE;
4290 break;
4291 }
4292 case MSG_WAIT_TIMEOUT:
4293 rc = FALSE;
4294 break;
4295 case TRUE:
4296 break;
4297 default:
4298 assert(0);
4299 }
4300
4301 break;
4302 }
4303
4304 return rc;
4305 }
4306
4307 static int
thread_send_message__LD(term_t queue,term_t msgterm,struct timespec * deadline ARG_LD)4308 thread_send_message__LD(term_t queue, term_t msgterm,
4309 struct timespec *deadline ARG_LD)
4310 { message_queue *q;
4311 thread_message *msg;
4312 int rc;
4313
4314 if ( !(msg = create_thread_message(msgterm PASS_LD)) )
4315 return PL_no_memory();
4316
4317 if ( !get_message_queue__LD(queue, &q PASS_LD) )
4318 { free_thread_message(msg);
4319 return FALSE;
4320 }
4321
4322 rc = wait_queue_message(queue, q, msg, deadline PASS_LD);
4323 release_message_queue(q);
4324
4325 if ( rc == FALSE )
4326 free_thread_message(msg);
4327
4328 return rc;
4329 }
4330
4331 static
4332 PRED_IMPL("thread_send_message", 2, thread_send_message, PL_FA_ISO)
4333 { PRED_LD
4334
4335 return thread_send_message__LD(A1, A2, NULL PASS_LD);
4336 }
4337
4338 static
4339 PRED_IMPL("thread_send_message", 3, thread_send_message, 0)
4340 { PRED_LD
4341 struct timespec deadline;
4342 struct timespec *dlop=NULL;
4343
4344 return process_deadline_options(A3,&deadline,&dlop)
4345 && thread_send_message__LD(A1, A2, dlop PASS_LD);
4346 }
4347
4348
4349
4350 static
4351 PRED_IMPL("thread_get_message", 1, thread_get_message, PL_FA_ISO)
4352 { PRED_LD
4353 int rc;
4354
4355 for(;;)
4356 { simpleMutexLock(&LD->thread.messages.mutex);
4357 rc = get_message(&LD->thread.messages, A1, NULL PASS_LD);
4358 simpleMutexUnlock(&LD->thread.messages.mutex);
4359
4360 if ( rc == MSG_WAIT_INTR )
4361 { if ( PL_handle_signals() >= 0 )
4362 continue;
4363 rc = FALSE;
4364 }
4365
4366 break;
4367 }
4368
4369 return rc;
4370 }
4371
4372
4373 static
4374 PRED_IMPL("thread_peek_message", 1, thread_peek_message_1, PL_FA_ISO)
4375 { PRED_LD
4376 int rc;
4377
4378 simpleMutexLock(&LD->thread.messages.mutex);
4379 rc = peek_message(&LD->thread.messages, A1 PASS_LD);
4380 simpleMutexUnlock(&LD->thread.messages.mutex);
4381
4382 return rc;
4383 }
4384
4385
4386 /*******************************
4387 * USER MESSAGE QUEUES *
4388 *******************************/
4389
4390 typedef struct mqref
4391 { message_queue *queue;
4392 } mqref;
4393
4394 static int
write_message_queue_ref(IOSTREAM * s,atom_t aref,int flags)4395 write_message_queue_ref(IOSTREAM *s, atom_t aref, int flags)
4396 { mqref *ref = PL_blob_data(aref, NULL, NULL);
4397 (void)flags;
4398
4399 Sfprintf(s, "<message_queue>(%p)", ref->queue);
4400 return TRUE;
4401 }
4402
4403
4404 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
4405 GC a message queue from the atom garbage collector. This should be fine
4406 because atoms in messages do not have locked atoms, so we are not
4407 calling atom functions.
4408 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
4409
4410 static int
release_message_queue_ref(atom_t aref)4411 release_message_queue_ref(atom_t aref)
4412 { mqref *ref = PL_blob_data(aref, NULL, NULL);
4413 message_queue *q;
4414
4415 DEBUG(MSG_QUEUE_GC,
4416 Sdprintf("GC message_queue %p\n", ref->queue));
4417
4418 if ( (q=ref->queue) )
4419 { destroy_message_queue(q); /* can be called twice */
4420 if ( !q->destroyed )
4421 deleteHTable(queueTable, (void *)q->id);
4422 simpleMutexDelete(&q->mutex);
4423 PL_free(q);
4424 }
4425
4426 return TRUE;
4427 }
4428
4429
4430 static int
save_message_queue(atom_t aref,IOSTREAM * fd)4431 save_message_queue(atom_t aref, IOSTREAM *fd)
4432 { mqref *ref = PL_blob_data(aref, NULL, NULL);
4433 (void)fd;
4434
4435 return PL_warning("Cannot save reference to <message_queue>(%p)", ref->queue);
4436 }
4437
4438
4439 static atom_t
load_message_queue(IOSTREAM * fd)4440 load_message_queue(IOSTREAM *fd)
4441 { (void)fd;
4442
4443 return PL_new_atom("<saved-message-queue-ref>");
4444 }
4445
4446
4447 static PL_blob_t message_queue_blob =
4448 { PL_BLOB_MAGIC,
4449 PL_BLOB_UNIQUE,
4450 "message_queue",
4451 release_message_queue_ref,
4452 NULL,
4453 write_message_queue_ref,
4454 NULL,
4455 save_message_queue,
4456 load_message_queue
4457 };
4458
4459
4460 static void
initMessageQueues(void)4461 initMessageQueues(void)
4462 { message_queue_blob.atom_name = ATOM_message_queue;
4463 PL_register_blob_type(&message_queue_blob);
4464 }
4465
4466 static int
unify_queue(term_t t,message_queue * q)4467 unify_queue(term_t t, message_queue *q)
4468 { GET_LD
4469
4470 return PL_unify_atom(t, q->id);
4471 }
4472
4473
4474 static void
free_queue_symbol(void * name,void * value)4475 free_queue_symbol(void *name, void *value)
4476 { message_queue *q = value;
4477
4478 destroy_message_queue(q); /* must this be synced? */
4479 PL_free(q);
4480 }
4481
4482
4483 static message_queue *
unlocked_message_queue_create(term_t queue,long max_size)4484 unlocked_message_queue_create(term_t queue, long max_size)
4485 { GET_LD
4486 atom_t name = NULL_ATOM;
4487 message_queue *q;
4488 word id;
4489
4490 if ( !queueTable )
4491 { simpleMutexInit(&queueTable_mutex);
4492 queueTable = newHTable(16);
4493 queueTable->free_symbol = free_queue_symbol;
4494 }
4495
4496 if ( PL_get_atom(queue, &name) )
4497 { if ( lookupHTable(queueTable, (void *)name) ||
4498 lookupHTable(threadTable, (void *)name) )
4499 { PL_error("message_queue_create", 1, NULL, ERR_PERMISSION,
4500 ATOM_create, ATOM_message_queue, queue);
4501 return NULL;
4502 }
4503 id = name;
4504 } else if ( PL_is_variable(queue) )
4505 { id = 0;
4506 } else
4507 { PL_error("message_queue_create", 1, NULL,
4508 ERR_TYPE, ATOM_message_queue, queue);
4509 return NULL;
4510 }
4511
4512 q = PL_malloc(sizeof(*q));
4513 init_message_queue(q, max_size);
4514 q->type = QTYPE_QUEUE;
4515 if ( !id )
4516 { mqref ref;
4517 int new;
4518
4519 ref.queue = q;
4520 q->id = lookupBlob((void*)&ref, sizeof(ref), &message_queue_blob, &new);
4521 q->anonymous = TRUE;
4522 } else
4523 { q->id = id;
4524 }
4525 addNewHTable(queueTable, (void *)q->id, q);
4526
4527 if ( unify_queue(queue, q) )
4528 { if ( q->anonymous )
4529 PL_unregister_atom(q->id); /* reclaim on GC */
4530 else
4531 PL_register_atom(q->id); /* protect against reclaim */
4532 return q;
4533 }
4534
4535 return NULL;
4536 }
4537
4538
4539 /* MT: Caller must hold the L_THREAD mutex
4540
4541 Note that this version does not deal with anonymous queues. High
4542 level code must use get_message_queue__LD();
4543 */
4544
4545 static int
get_message_queue_unlocked__LD(term_t t,message_queue ** queue ARG_LD)4546 get_message_queue_unlocked__LD(term_t t, message_queue **queue ARG_LD)
4547 { atom_t name;
4548 word id = 0;
4549 int tid = 0;
4550
4551 if ( PL_get_atom(t, &name) )
4552 { thread_handle *er;
4553
4554 if ( (er=symbol_thread_handle(name)) )
4555 { if ( er->info )
4556 tid = er->engine_id;
4557 else
4558 return PL_existence_error("thread", t);
4559 } else
4560 id = name;
4561 } else if ( !PL_get_integer(t, &tid) )
4562 { return PL_type_error("message_queue", t);
4563 }
4564
4565 if ( tid > 0 )
4566 { have_tid:
4567 if ( tid >= 1 && tid <= GD->thread.highest_id )
4568 { PL_thread_info_t *info = GD->thread.threads[tid];
4569
4570 if ( info->status == PL_THREAD_UNUSED ||
4571 info->status == PL_THREAD_RESERVED ||
4572 !info->thread_data )
4573 return PL_existence_error("thread", t);
4574
4575 *queue = &info->thread_data->thread.messages;
4576 return TRUE;
4577 }
4578
4579 return PL_type_error("thread", t);
4580 }
4581
4582 if ( !id )
4583 return PL_error(NULL, 0, NULL, ERR_TYPE, ATOM_message_queue, t);
4584
4585 if ( queueTable )
4586 { message_queue *q;
4587 if ( (q = lookupHTable(queueTable, (void *)id)) )
4588 { *queue = q;
4589 return TRUE;
4590 }
4591 }
4592 if ( threadTable )
4593 { word w;
4594
4595 if ( (w = (word)lookupHTable(threadTable, (void *)id)) )
4596 { thread_handle *th;
4597
4598 if ( (th=symbol_thread_handle(w)) )
4599 { tid = th->engine_id;
4600 goto have_tid;
4601 }
4602 }
4603 }
4604
4605 return PL_error(NULL, 0, NULL, ERR_EXISTENCE, ATOM_message_queue, t);
4606 }
4607
4608
4609 /* Get a message queue and lock it
4610 */
4611
4612 static int
get_message_queue__LD(term_t t,message_queue ** queue ARG_LD)4613 get_message_queue__LD(term_t t, message_queue **queue ARG_LD)
4614 { int rc;
4615 message_queue *q;
4616 PL_blob_t *type;
4617 void *data;
4618
4619 if ( PL_get_blob(t, &data, NULL, &type) && type == &message_queue_blob )
4620 { mqref *ref = data;
4621
4622 q = ref->queue;
4623 simpleMutexLock(&q->mutex);
4624 if ( !q->destroyed )
4625 { *queue = q;
4626 return TRUE;
4627 }
4628 simpleMutexUnlock(&q->mutex);
4629 return PL_error(NULL, 0, NULL, ERR_EXISTENCE, ATOM_message_queue, t);
4630 }
4631
4632 PL_LOCK(L_THREAD);
4633 rc = get_message_queue_unlocked__LD(t, queue PASS_LD);
4634 if ( rc )
4635 { message_queue *q = *queue;
4636
4637 simpleMutexLock(&q->mutex);
4638 if ( q->destroyed )
4639 { rc = PL_error(NULL, 0, NULL, ERR_EXISTENCE, ATOM_message_queue, t);
4640 simpleMutexUnlock(&q->mutex);
4641 }
4642 }
4643 PL_UNLOCK(L_THREAD);
4644
4645 return rc;
4646 }
4647
4648
4649 /* Release a message queue, deleting it if it is no longer needed
4650 */
4651
4652 static void
release_message_queue(message_queue * queue)4653 release_message_queue(message_queue *queue)
4654 { int del = ( queue->destroyed &&
4655 !(queue->waiting || queue->wait_for_drain) &&
4656 queue->type != QTYPE_THREAD );
4657
4658 simpleMutexUnlock(&queue->mutex);
4659
4660 if ( del )
4661 { destroy_message_queue(queue);
4662 if ( !queue->anonymous )
4663 PL_free(queue);
4664 }
4665 }
4666
4667
4668 static
4669 PRED_IMPL("message_queue_create", 1, message_queue_create, 0)
4670 { int rval;
4671
4672 PL_LOCK(L_THREAD);
4673 rval = (unlocked_message_queue_create(A1, 0) ? TRUE : FALSE);
4674 PL_UNLOCK(L_THREAD);
4675
4676 return rval;
4677 }
4678
4679
/* Option table for message_queue_create/2.  The entries appear in the
   same order as the output pointers passed to scan_options() there:
   alias(Atom) first, max_size(Size) second.  NULL_ATOM ends the table.
*/
static const opt_spec message_queue_options[] =
{ { ATOM_alias,		OPT_ATOM },
  { ATOM_max_size,	OPT_SIZE },
  { NULL_ATOM,		0 }
};
4685
4686
4687 static
4688 PRED_IMPL("message_queue_create", 2, message_queue_create2, 0)
4689 { PRED_LD
4690 atom_t alias = 0;
4691 size_t max_size = 0; /* to be processed */
4692 message_queue *q;
4693
4694 if ( !scan_options(A2, 0,
4695 ATOM_queue_option, message_queue_options,
4696 &alias,
4697 &max_size) )
4698 fail;
4699
4700 if ( alias )
4701 { if ( !PL_unify_atom(A1, alias) )
4702 return PL_error(NULL, 0, NULL, ERR_TYPE, ATOM_variable, A1);
4703 }
4704
4705 PL_LOCK(L_THREAD);
4706 q = unlocked_message_queue_create(A1, max_size);
4707 PL_UNLOCK(L_THREAD);
4708
4709 return q ? TRUE : FALSE;
4710 }
4711
4712
/* message_queue_destroy(+Queue)

   Obtains the queue locked through get_message_queue__LD().  Queues of
   type QTYPE_THREAD are refused with a permission error.  Otherwise the
   queue is removed from queueTable, flagged as destroyed, all waiters
   are woken and the queue is handed to release_message_queue().
*/
static
PRED_IMPL("message_queue_destroy", 1, message_queue_destroy, 0)
{ PRED_LD
  message_queue *q;

  if ( !get_message_queue__LD(A1, &q PASS_LD) )
    return FALSE;

  if ( q->type == QTYPE_THREAD )
  { release_message_queue(q);		/* unlock before raising the error */

    return PL_error(NULL, 0, "is a thread-queue", ERR_PERMISSION,
		    ATOM_destroy, ATOM_message_queue, A1);
  }

  simpleMutexLock(&queueTable_mutex);	/* see markAtomsMessageQueues() */
  deleteHTable(queueTable, (void*)q->id);
  simpleMutexUnlock(&queueTable_mutex);

  if ( !q->anonymous )			/* for these queues q->id is a registered atom */
    PL_unregister_atom(q->id);

  simpleMutexLock(&q->gc_mutex);	/* see markAtomsMessageQueues() */
  q->destroyed = TRUE;
  simpleMutexUnlock(&q->gc_mutex);

  /* Wake everybody blocked on the queue, both on the message condition
     variable and on the drain condition variable */
  if ( q->waiting )
    cv_broadcast(&q->cond_var);
  if ( q->wait_for_drain )
    cv_broadcast(&q->drain_var);

  /* Unlocks q->mutex; reclaims the queue only if nobody is waiting */
  release_message_queue(q);

  return TRUE;
}
4748
4749
4750 /*******************************
4751 * MESSAGE QUEUE PROPERTY *
4752 *******************************/
4753
4754 static int /* message_queue_property(Queue, alias(Name)) */
message_queue_alias_property(message_queue * q,term_t prop ARG_LD)4755 message_queue_alias_property(message_queue *q, term_t prop ARG_LD)
4756 { if ( !q->anonymous )
4757 return PL_unify_atom(prop, q->id);
4758
4759 fail;
4760 }
4761
4762
4763 static int /* message_queue_property(Queue, size(Size)) */
message_queue_size_property(message_queue * q,term_t prop ARG_LD)4764 message_queue_size_property(message_queue *q, term_t prop ARG_LD)
4765 { return PL_unify_integer(prop, q->size);
4766 }
4767
4768
4769 static int /* message_queue_property(Queue, max_size(Size)) */
message_queue_max_size_property(message_queue * q,term_t prop ARG_LD)4770 message_queue_max_size_property(message_queue *q, term_t prop ARG_LD)
4771 { size_t ms;
4772
4773 if ( (ms=q->max_size) > 0 )
4774 return PL_unify_integer(prop, ms);
4775
4776 fail;
4777 }
4778
4779 static int /* message_queue_property(Queue, waiting(Count)) */
message_queue_waiting_property(message_queue * q,term_t prop ARG_LD)4780 message_queue_waiting_property(message_queue *q, term_t prop ARG_LD)
4781 { int waiting;
4782
4783 if ( (waiting=q->waiting) > 0 )
4784 return PL_unify_integer(prop, waiting);
4785
4786 fail;
4787 }
4788
/* Property table for message_queue_property/2: maps each arity-1
   property functor to the function that unifies its argument.  The
   entry with functor 0 ends the table (see advance_qstate()).
*/
static const tprop qprop_list [] =
{ { FUNCTOR_alias1,	    message_queue_alias_property },
  { FUNCTOR_size1,	    message_queue_size_property },
  { FUNCTOR_max_size1,	    message_queue_max_size_property },
  { FUNCTOR_waiting1,	    message_queue_waiting_property },
  { 0,			    NULL }
};
4796
4797
/* Enumeration state of message_queue_property/2 across redo calls.
   `e` is only set when the queue argument was unbound on the first
   call; `enum_properties` is only set when all properties of
   qprop_list are to be enumerated.
*/
typedef struct
{ TableEnum e;				/* Enumerator on queue-table */
  message_queue *q;			/* current queue */
  const tprop *p;			/* Pointer in properties */
  int enum_properties;			/* Enumerate the properties */
} qprop_enum;
4804
4805
4806 static int
advance_qstate(qprop_enum * state)4807 advance_qstate(qprop_enum *state)
4808 { if ( state->enum_properties )
4809 { state->p++;
4810 if ( state->p->functor )
4811 succeed;
4812
4813 state->p = qprop_list;
4814 }
4815 if ( state->e )
4816 { message_queue *q;
4817
4818 if ( advanceTableEnum(state->e, NULL, (void**)&q) )
4819 { state->q = q;
4820
4821 succeed;
4822 }
4823 }
4824
4825 fail;
4826 }
4827
4828
4829 static void
free_qstate(qprop_enum * state)4830 free_qstate(qprop_enum *state)
4831 { if ( state->e )
4832 freeTableEnum(state->e);
4833
4834 freeForeignState(state, sizeof(*state));
4835 }
4836
4837
/* message_queue_property(?Queue, ?Property) is nondet.

   The enumeration state lives in the stack variable `statebuf` for as
   long as no choice point is needed.  It is copied to memory from
   allocForeignState() just before ForeignRedoPtr() and released by
   free_qstate() on the last solution, on failure, or on FRG_CUTTED.
*/
static
PRED_IMPL("message_queue_property", 2, message_property, PL_FA_NONDETERMINISTIC)
{ PRED_LD
  term_t queue = A1;
  term_t property = A2;
  qprop_enum statebuf;
  qprop_enum *state;

  switch( CTX_CNTRL )
  { case FRG_FIRST_CALL:
    { memset(&statebuf, 0, sizeof(statebuf));
      state = &statebuf;

      if ( PL_is_variable(queue) )	/* unbound queue: walk queueTable */
      { if ( !queueTable )
	  return FALSE;

	/* The cases below handle return values 1, 0 and -1 of
	   get_prop_def().  NOTE(review): any other value would fall
	   through into the FRG_REDO case -- confirm it cannot occur. */
	switch( get_prop_def(property, ATOM_message_queue_property,
			     qprop_list, &state->p) )
	{ case 1:			/* state->p was set by get_prop_def() */
	    state->e = newTableEnum(queueTable);
	    goto enumerate;
	  case 0:			/* enumerate every entry of qprop_list */
	    state->e = newTableEnum(queueTable);
	    state->p = qprop_list;
	    state->enum_properties = TRUE;
	    goto enumerate;
	  case -1:
	    fail;
	}
      } else if ( get_message_queue__LD(queue, &state->q PASS_LD) )
      { /* The queue is unlocked right away, so the property functions
	   below read it without holding q->mutex */
	release_message_queue(state->q); /* FIXME: we need some form of locking! */

	switch( get_prop_def(property, ATOM_message_queue_property,
			     qprop_list, &state->p) )
	{ case 1:
	    goto enumerate;
	  case 0:
	    state->p = qprop_list;
	    state->enum_properties = TRUE;
	    goto enumerate;
	  case -1:
	    fail;
	}
      } else
      { fail;
      }
    }
    case FRG_REDO:
      state = CTX_PTR;
      break;
    case FRG_CUTTED:			/* choice point dropped: just clean up */
      state = CTX_PTR;
      free_qstate(state);
      succeed;
    default:
      assert(0);
      fail;
  }

enumerate:
  if ( !state->q )			/* first time, enumerating queues */
  { message_queue *q;

    assert(state->e);
    if ( advanceTableEnum(state->e, NULL, (void**)&q) )
    { state->q = q;
    } else				/* table is empty */
    { freeTableEnum(state->e);
      assert(state == &statebuf);
      fail;
    }
  }


  { term_t a1 = PL_new_term_ref();

    /* With a given property, a1 is its argument; when enumerating
       properties a1 stays a fresh term that is filled by the property
       function and wrapped into the functor afterwards */
    if ( !state->enum_properties )
      _PL_get_arg(1, property, a1);

    for(;;)
    { if ( (*state->p->function)(state->q, a1 PASS_LD) )
      { if ( state->enum_properties )
	{ if ( !PL_unify_term(property,
			      PL_FUNCTOR, state->p->functor,
				PL_TERM, a1) )
	    goto error;
	}
	if ( state->e )			/* enumerating queues: bind Queue too */
	{ if ( !unify_queue(queue, state->q) )
	    goto error;
	}

	if ( advance_qstate(state) )	/* more candidates: leave a choice point */
	{ if ( state == &statebuf )
	  { qprop_enum *copy = allocForeignState(sizeof(*copy));

	    *copy = *state;
	    state = copy;
	  }

	  /* NOTE(review): ForeignRedoPtr() must leave the function here;
	     otherwise the cleanup below would free the state that was
	     just handed out -- macro definition not visible in this chunk */
	  ForeignRedoPtr(state);
	}

	/* Last solution: release the state and succeed deterministically */
	if ( state != &statebuf )
	  free_qstate(state);
	else if ( state->e )
	  freeTableEnum(state->e);
	succeed;
      }

      /* Current combination did not match: try the next one or give up */
      if ( !advance_qstate(state) )
      { error:
	if ( state != &statebuf )
	  free_qstate(state);
	else if ( state->e )
	  freeTableEnum(state->e);
	fail;
      }
    }
  }
}
4960
4961 static
4962 PRED_IMPL("message_queue_set", 2, message_queue_set, 0)
4963 { PRED_LD
4964 message_queue *q;
4965 atom_t name;
4966 size_t arity;
4967 int rc;
4968
4969 if ( !get_message_queue__LD(A1, &q PASS_LD) )
4970 return FALSE;
4971
4972 if ( PL_get_name_arity(A2, &name, &arity) && arity == 1 )
4973 { term_t a = PL_new_term_ref();
4974
4975 _PL_get_arg(1, A2, a);
4976 if ( name == ATOM_max_size )
4977 { size_t mx;
4978
4979 if ( (rc=PL_get_size_ex(a, &mx)) )
4980 { size_t omax = q->max_size;
4981
4982 q->max_size = mx;
4983
4984 if ( mx > omax && q->wait_for_drain )
4985 { DEBUG(MSG_QUEUE, Sdprintf("Queue drained. wakeup writers\n"));
4986 cv_signal(&q->drain_var);
4987 }
4988
4989 rc = TRUE;
4990 }
4991 } else
4992 { rc = PL_domain_error("message_queue_property", A2);
4993 }
4994 } else
4995 rc = PL_type_error("compound", A2);
4996
4997 release_message_queue(q);
4998
4999 return rc;
5000 }
5001
5002
5003 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
5004 thread_get_message(+Queue, -Message)
5005 thread_get_message(-Message)
5006 Get a message from a message queue. If the queue is not provided get
5007 a message from the queue implicitly associated to the thread.
5008 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
5009
5010 static int
thread_get_message__LD(term_t queue,term_t msg,struct timespec * deadline ARG_LD)5011 thread_get_message__LD(term_t queue, term_t msg, struct timespec *deadline ARG_LD)
5012 { int rc;
5013
5014 for(;;)
5015 { message_queue *q;
5016
5017 if ( !get_message_queue__LD(queue, &q PASS_LD) )
5018 return FALSE;
5019
5020 rc = get_message(q, msg, deadline PASS_LD);
5021 release_message_queue(q);
5022
5023 switch(rc)
5024 { case MSG_WAIT_INTR:
5025 if ( PL_handle_signals() >= 0 )
5026 continue;
5027 rc = FALSE;
5028 break;
5029 case MSG_WAIT_DESTROYED:
5030 rc = PL_error(NULL, 0, NULL, ERR_EXISTENCE, ATOM_message_queue, queue);
5031 break;
5032 case MSG_WAIT_TIMEOUT:
5033 rc = FALSE;
5034 break;
5035 default:
5036 ;
5037 }
5038
5039 break;
5040 }
5041
5042 return rc;
5043 }
5044
5045
5046 static
5047 PRED_IMPL("thread_get_message", 2, thread_get_message, 0)
5048 { PRED_LD
5049
5050 return thread_get_message__LD(A1, A2, NULL PASS_LD);
5051 }
5052
5053
5054 static
5055 PRED_IMPL("thread_get_message", 3, thread_get_message, 0)
5056 { PRED_LD
5057 struct timespec deadline;
5058 struct timespec *dlop=NULL;
5059
5060 return process_deadline_options(A3,&deadline,&dlop)
5061 && thread_get_message__LD(A1, A2, dlop PASS_LD);
5062 }
5063
5064
5065 static
5066 PRED_IMPL("thread_peek_message", 2, thread_peek_message_2, 0)
5067 { PRED_LD
5068 message_queue *q;
5069 int rc;
5070
5071 if ( !get_message_queue__LD(A1, &q PASS_LD) )
5072 fail;
5073
5074 rc = peek_message(q, A2 PASS_LD);
5075 release_message_queue(q);
5076 return rc;
5077 }
5078
5079
5080 /*******************************
5081 * MUTEX PRIMITIVES *
5082 *******************************/
5083
5084 #ifdef NEED_RECURSIVE_MUTEX_INIT
5085
5086 #ifdef RECURSIVE_MUTEXES
5087 static int
recursive_attr(pthread_mutexattr_t ** ap)5088 recursive_attr(pthread_mutexattr_t **ap)
5089 { static int done;
5090 static pthread_mutexattr_t attr;
5091 int rc;
5092
5093 if ( done )
5094 { *ap = &attr;
5095 return 0;
5096 }
5097
5098 PL_LOCK(L_THREAD);
5099 if ( done )
5100 { PL_UNLOCK(L_THREAD);
5101
5102 *ap = &attr;
5103 return 0;
5104 }
5105 if ( (rc=pthread_mutexattr_init(&attr)) )
5106 goto error;
5107 #ifdef HAVE_PTHREAD_MUTEXATTR_SETTYPE
5108 if ( (rc=pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE)) )
5109 goto error;
5110 #else
5111 #ifdef HAVE_PTHREAD_MUTEXATTR_SETKIND_NP
5112 if ( (rc=pthread_mutexattr_setkind_np(&attr, PTHREAD_MUTEX_RECURSIVE_NP)) )
5113 goto error;
5114 #endif
5115 #endif
5116
5117 done = TRUE;
5118 PL_UNLOCK(L_THREAD);
5119 *ap = &attr;
5120
5121 return 0;
5122
5123 error:
5124 PL_UNLOCK(L_THREAD);
5125 return rc;
5126 }
5127 #endif
5128
5129 int
recursiveMutexInit(recursiveMutex * m)5130 recursiveMutexInit(recursiveMutex *m)
5131 {
5132 #ifdef RECURSIVE_MUTEXES
5133 pthread_mutexattr_t *attr = NULL;
5134 int rc;
5135
5136 if ( (rc=recursive_attr(&attr)) )
5137 return rc;
5138
5139 return pthread_mutex_init(m, attr);
5140
5141 #else /*RECURSIVE_MUTEXES*/
5142
5143 m->owner = 0;
5144 m->count = 0;
5145 return pthread_mutex_init(&(m->lock), NULL);
5146
5147 #endif /* RECURSIVE_MUTEXES */
5148 }
5149
5150 #endif /*NEED_RECURSIVE_MUTEX_INIT*/
5151
5152 #ifdef NEED_RECURSIVE_MUTEX_DELETE
5153
5154 int
recursiveMutexDelete(recursiveMutex * m)5155 recursiveMutexDelete(recursiveMutex *m)
5156 { if ( m->owner != 0 )
5157 return EBUSY;
5158
5159 return pthread_mutex_destroy(&(m->lock));
5160 }
5161
5162 #endif /*NEED_RECURSIVE_MUTEX_DELETE*/
5163
5164 #ifndef RECURSIVE_MUTEXES
5165 int
recursiveMutexLock(recursiveMutex * m)5166 recursiveMutexLock(recursiveMutex *m)
5167 { int result = 0;
5168 pthread_t self = pthread_self();
5169
5170 if ( pthread_equal(self, m->owner) )
5171 m->count++;
5172 else
5173 { result = pthread_mutex_lock(&(m->lock));
5174 m->owner = self;
5175 m->count = 1;
5176 }
5177
5178 return result;
5179 }
5180
5181
5182 int
recursiveMutexTryLock(recursiveMutex * m)5183 recursiveMutexTryLock(recursiveMutex *m)
5184 { int result = 0;
5185 pthread_t self = pthread_self();
5186
5187 if ( pthread_equal(self, m->owner) )
5188 m->count++;
5189 else
5190 { result = pthread_mutex_trylock(&(m->lock));
5191 if ( result == 0 )
5192 { m->owner = self;
5193 m->count = 1;
5194 }
5195 }
5196
5197 return result;
5198 }
5199
5200
5201 int
recursiveMutexUnlock(recursiveMutex * m)5202 recursiveMutexUnlock(recursiveMutex *m)
5203 { int result = 0;
5204 pthread_t self = pthread_self();
5205
5206 if ( pthread_equal(self,m->owner) )
5207 { if ( --m->count < 1 )
5208 { m->owner = 0;
5209 result = pthread_mutex_unlock(&(m->lock));
5210 }
5211 } else if ( !pthread_equal(m->owner, 0) )
5212 { Sdprintf("unlocking unowned mutex %p,not done!!\n", m);
5213 Sdprintf("\tlocking thread was %u , unlocking is %u\n",m->owner,self);
5214
5215 result = -1;
5216 }
5217
5218 return result;
5219 }
5220
5221 #endif /*RECURSIVE_MUTEXES*/
5222
5223 void
initSimpleMutex(counting_mutex * m,const char * name)5224 initSimpleMutex(counting_mutex *m, const char *name)
5225 { simpleMutexInit(&m->mutex);
5226 m->count = 0;
5227 m->lock_count = 0;
5228 #ifdef O_CONTENTION_STATISTICS
5229 m->collisions = 0;
5230 #endif
5231 m->name = name ? store_string(name) : (char*)NULL;
5232 m->prev = NULL;
5233
5234 PL_LOCK(L_MUTEX);
5235 m->next = GD->thread.mutexes;
5236 GD->thread.mutexes = m;
5237 if ( m->next )
5238 m->next->prev = m;
5239 PL_UNLOCK(L_MUTEX);
5240 }
5241
5242
5243 counting_mutex *
allocSimpleMutex(const char * name)5244 allocSimpleMutex(const char *name)
5245 { counting_mutex *m = allocHeapOrHalt(sizeof(*m));
5246 initSimpleMutex(m, name);
5247
5248 return m;
5249 }
5250
5251
5252 void
freeSimpleMutex(counting_mutex * m)5253 freeSimpleMutex(counting_mutex *m)
5254 { PL_LOCK(L_MUTEX);
5255 if ( m->next )
5256 m->next->prev = m->prev;
5257 if ( m->prev )
5258 m->prev->next = m->next;
5259 else
5260 GD->thread.mutexes = m->next;
5261 PL_UNLOCK(L_MUTEX);
5262
5263 simpleMutexDelete(&m->mutex);
5264 remove_string((char *)m->name);
5265 freeHeap(m, sizeof(*m));
5266 }
5267
5268
5269 /*******************************
5270 * FOREIGN INTERFACE *
5271 *******************************/
5272
5273 int
PL_thread_attach_engine(PL_thread_attr_t * attr)5274 PL_thread_attach_engine(PL_thread_attr_t *attr)
5275 { GET_LD
5276 PL_thread_info_t *info;
5277 PL_local_data_t *ldnew;
5278 PL_local_data_t *ldmain;
5279
5280 if ( LD )
5281 { if ( LD->thread.info->open_count+1 == 0 )
5282 { errno = EINVAL;
5283 return -1;
5284 }
5285 LD->thread.info->open_count++;
5286 return LD->thread.info->pl_tid;
5287 }
5288
5289 if ( !GD->thread.enabled || GD->cleaning != CLN_NORMAL )
5290 {
5291 #ifdef EPERM /* FIXME: Better reporting */
5292 errno = EPERM;
5293 #endif
5294 return -1;
5295 }
5296
5297 if ( !(info = alloc_thread()) )
5298 return -1; /* out of threads */
5299
5300 ldmain = GD->thread.threads[1]->thread_data;
5301 ldnew = info->thread_data;
5302
5303 if ( attr )
5304 { if ( attr->stack_limit )
5305 info->stack_limit = attr->stack_limit;
5306
5307 info->cancel = attr->cancel;
5308 }
5309
5310 info->goal = NULL;
5311 info->module = MODULE_user;
5312 info->detached = attr == NULL ||
5313 (attr->flags & PL_THREAD_NOT_DETACHED) == 0;
5314 info->open_count = 1;
5315
5316 copy_local_data(ldnew, ldmain, attr ? attr->max_queue_size : 0);
5317
5318 if ( !initialise_thread(info) )
5319 { free_thread_info(info);
5320 errno = ENOMEM;
5321 return -1;
5322 }
5323 set_system_thread_id(info);
5324 PL_LOCK(L_THREAD);
5325 info->status = PL_THREAD_RUNNING;
5326 PL_UNLOCK(L_THREAD);
5327
5328 if ( attr )
5329 { if ( attr->alias )
5330 { if ( !aliasThread(info->pl_tid, ATOM_thread, PL_new_atom(attr->alias)) )
5331 { free_thread_info(info);
5332 errno = EPERM;
5333 TLD_set_LD(NULL);
5334 return -1;
5335 }
5336 }
5337 if ( true(attr, PL_THREAD_NO_DEBUG) )
5338 { ldnew->_debugstatus.tracing = FALSE;
5339 ldnew->_debugstatus.debugging = DBG_OFF;
5340 set(&ldnew->prolog_flag.mask, PLFLAG_LASTCALL);
5341 info->debug = FALSE;
5342 }
5343 }
5344
5345 updateAlerted(ldnew);
5346 PL_call_predicate(MODULE_system, PL_Q_NORMAL, PROCEDURE_dthread_init0, 0);
5347
5348 return info->pl_tid;
5349 }
5350
5351
5352 int
PL_thread_destroy_engine(void)5353 PL_thread_destroy_engine(void)
5354 { GET_LD
5355
5356 if ( LD )
5357 { if ( --LD->thread.info->open_count == 0 )
5358 { free_prolog_thread(LD);
5359 TLD_set_LD(NULL);
5360 }
5361
5362 return TRUE;
5363 }
5364
5365 return FALSE; /* we had no thread */
5366 }
5367
5368
5369 int
attachConsole(void)5370 attachConsole(void)
5371 { GET_LD
5372 fid_t fid = PL_open_foreign_frame();
5373 int rval;
5374 predicate_t pred = PL_predicate("attach_console", 0, "user");
5375
5376 rval = PL_call_predicate(NULL, PL_Q_NODEBUG, pred, 0);
5377
5378 PL_discard_foreign_frame(fid);
5379
5380 return rval;
5381 }
5382
5383
5384 /*******************************
5385 * ENGINES *
5386 *******************************/
5387
5388 static PL_engine_t
PL_current_engine(void)5389 PL_current_engine(void)
5390 { GET_LD
5391
5392 return LD;
5393 }
5394
5395
5396 static void
detach_engine(PL_engine_t e)5397 detach_engine(PL_engine_t e)
5398 { PL_thread_info_t *info = e->thread.info;
5399
5400 info->has_tid = FALSE;
5401 #ifdef __linux__
5402 info->pid = -1;
5403 #endif
5404 #ifdef __WINDOWS__
5405 info->w32id = 0;
5406 #endif
5407 memset(&info->tid, 0, sizeof(info->tid));
5408 }
5409
5410
5411 int
PL_set_engine(PL_engine_t new,PL_engine_t * old)5412 PL_set_engine(PL_engine_t new, PL_engine_t *old)
5413 { PL_engine_t current = PL_current_engine();
5414
5415 if ( new != current && new != PL_ENGINE_CURRENT )
5416 { PL_LOCK(L_THREAD);
5417
5418 if ( new )
5419 { if ( new == PL_ENGINE_MAIN )
5420 new = &PL_local_data;
5421
5422 if ( new->magic != LD_MAGIC )
5423 { PL_UNLOCK(L_THREAD);
5424 return PL_ENGINE_INVAL;
5425 }
5426 if ( new->thread.info->has_tid )
5427 { PL_UNLOCK(L_THREAD);
5428 return PL_ENGINE_INUSE;
5429 }
5430 }
5431
5432 if ( current )
5433 detach_engine(current);
5434
5435 if ( new )
5436 { TLD_set_LD(new);
5437 new->thread.info->tid = pthread_self();
5438
5439 set_system_thread_id(new->thread.info);
5440 } else
5441 { TLD_set_LD(NULL);
5442 }
5443
5444 PL_UNLOCK(L_THREAD);
5445 }
5446
5447 if ( old )
5448 { *old = current;
5449 }
5450
5451 return PL_ENGINE_SET;
5452 }
5453
5454
5455 PL_engine_t
PL_create_engine(PL_thread_attr_t * attributes)5456 PL_create_engine(PL_thread_attr_t *attributes)
5457 { PL_engine_t e, current;
5458
5459 PL_set_engine(NULL, ¤t);
5460 if ( PL_thread_attach_engine(attributes) >= 0 )
5461 { e = PL_current_engine();
5462 } else
5463 e = NULL;
5464
5465 PL_set_engine(current, NULL);
5466
5467 return e;
5468 }
5469
5470
5471 int
PL_destroy_engine(PL_engine_t e)5472 PL_destroy_engine(PL_engine_t e)
5473 { int rc;
5474
5475 if ( e == PL_current_engine() )
5476 { rc = PL_thread_destroy_engine();
5477 } else
5478 { PL_engine_t current;
5479
5480 if ( PL_set_engine(e, ¤t) == PL_ENGINE_SET )
5481 { rc = PL_thread_destroy_engine();
5482 PL_set_engine(current, NULL);
5483
5484 } else
5485 rc = FALSE;
5486 }
5487
5488 return rc;
5489 }
5490
5491
5492 /*******************************
5493 * GC THREAD *
5494 *******************************/
5495
/* Prolog thread id of the gc thread.  GCmain() sets it while the
   '$gc' predicate runs and resets it to 0 afterwards. */
static int GC_id = 0;
/* Set by GCthread() using compare-and-swap before it creates the
   thread.  Cleared when GCmain() returns or when pthread_create()
   fails. */
static int GC_starting = 0;

/* Forward declaration: GCmain() installs this as the cancel hook */
static rc_cancel cancelGCThread(int tid);
5500
5501 static void *
GCmain(void * closure)5502 GCmain(void *closure)
5503 { PL_thread_attr_t attrs = {0};
5504 #ifdef HAVE_SIGPROCMASK
5505 sigset_t set;
5506 allSignalMask(&set);
5507 if ( GD->signals.sig_alert )
5508 sigdelset(&set, GD->signals.sig_alert);
5509 pthread_sigmask(SIG_BLOCK, &set, NULL);
5510 #endif
5511
5512 attrs.alias = "gc";
5513 attrs.flags = PL_THREAD_NO_DEBUG|PL_THREAD_NOT_DETACHED;
5514 set_os_thread_name_from_charp("gc");
5515
5516 if ( PL_thread_attach_engine(&attrs) > 0 )
5517 { GET_LD
5518 PL_thread_info_t *info = LD->thread.info;
5519 static predicate_t pred = 0;
5520 int rc;
5521
5522 if ( !pred )
5523 pred = PL_predicate("$gc", 0, "system");
5524
5525 GC_id = PL_thread_self();
5526 info->cancel = cancelGCThread;
5527 rc = PL_call_predicate(NULL, PL_Q_PASS_EXCEPTION, pred, 0);
5528 GC_id = 0;
5529
5530 set_thread_completion(info, rc, exception_term);
5531 PL_thread_destroy_engine();
5532 }
5533
5534 GC_starting = FALSE;
5535
5536 return NULL;
5537 }
5538
5539
5540 static int
GCthread(void)5541 GCthread(void)
5542 { if ( GC_id <= 0 )
5543 { if ( COMPARE_AND_SWAP_INT(&GC_starting, FALSE, TRUE) )
5544 { pthread_attr_t attr;
5545 pthread_t thr;
5546 int rc;
5547
5548 if ( !GD->thread.gc.initialized )
5549 { pthread_mutex_init(&GD->thread.gc.mutex, NULL);
5550 pthread_cond_init(&GD->thread.gc.cond, NULL);
5551 GD->thread.gc.initialized = TRUE;
5552 } else
5553 { GD->thread.gc.requests = 0;
5554 }
5555
5556 pthread_attr_init(&attr);
5557 rc = pthread_create(&thr, &attr, GCmain, NULL);
5558 pthread_attr_destroy(&attr);
5559 if ( rc != 0 )
5560 GC_starting = FALSE;
5561 }
5562 }
5563
5564 return GC_id;
5565 }
5566
5567
5568 static int
gc_sig_request(int sig)5569 gc_sig_request(int sig)
5570 { switch(sig)
5571 { case SIG_ATOM_GC:
5572 return GCREQUEST_AGC;
5573 case SIG_CLAUSE_GC:
5574 return GCREQUEST_CGC;
5575 case SIG_PLABORT:
5576 return GCREQUEST_ABORT;
5577 default:
5578 assert(0);
5579 return 0;
5580 }
5581 }
5582
5583
5584 static int
signalGCThreadCond(int tid,int sig)5585 signalGCThreadCond(int tid, int sig)
5586 { (void)tid;
5587
5588 pthread_mutex_lock(&GD->thread.gc.mutex);
5589 GD->thread.gc.requests |= gc_sig_request(sig);
5590 pthread_cond_signal(&GD->thread.gc.cond);
5591 pthread_mutex_unlock(&GD->thread.gc.mutex);
5592
5593 return TRUE;
5594 }
5595
5596
5597 int
signalGCThread(int sig)5598 signalGCThread(int sig)
5599 { GET_LD
5600 int tid;
5601
5602 if ( truePrologFlag(PLFLAG_GCTHREAD) &&
5603 !GD->bootsession &&
5604 (tid = GCthread()) > 0 &&
5605 signalGCThreadCond(tid, sig) )
5606 return TRUE;
5607
5608 return raiseSignal(LD, sig);
5609 }
5610
5611
5612 static int
gc_running(void)5613 gc_running(void)
5614 { int tid;
5615 PL_thread_info_t *info;
5616
5617 if ( (tid=GC_id) > 0 && (info = GD->thread.threads[tid]) &&
5618 info->status == PL_THREAD_RUNNING )
5619 return tid;
5620
5621 return 0;
5622 }
5623
5624
5625 int
isSignalledGCThread(int sig ARG_LD)5626 isSignalledGCThread(int sig ARG_LD)
5627 { if ( gc_running() )
5628 { return (GD->thread.gc.requests & gc_sig_request(sig)) != 0;
5629 } else
5630 { return PL_pending(sig);
5631 }
5632 }
5633
5634
5635 static
5636 PRED_IMPL("$gc_wait", 1, gc_wait, 0)
5637 { PRED_LD
5638
5639 for(;;)
5640 { unsigned int req;
5641 atom_t action;
5642
5643 pthread_mutex_lock(&GD->thread.gc.mutex);
5644 req = GD->thread.gc.requests;
5645 if ( !req )
5646 pthread_cond_wait(&GD->thread.gc.cond, &GD->thread.gc.mutex);
5647 pthread_mutex_unlock(&GD->thread.gc.mutex);
5648
5649 if ( (req&GCREQUEST_ABORT) )
5650 action = ATOM_abort;
5651 else if ( (req&GCREQUEST_AGC) )
5652 action = ATOM_garbage_collect_atoms;
5653 else if ( (req&GCREQUEST_CGC) )
5654 action = ATOM_garbage_collect_clauses;
5655 else
5656 continue;
5657
5658 return PL_unify_atom(A1, action);
5659 }
5660 }
5661
5662
5663 static
5664 PRED_IMPL("$gc_clear", 1, gc_clear, 0)
5665 { PRED_LD
5666 atom_t action;
5667
5668 if ( PL_get_atom_ex(A1, &action) )
5669 { unsigned int mask;
5670
5671 if ( action == ATOM_garbage_collect_atoms )
5672 mask = GCREQUEST_AGC;
5673 else if ( action == ATOM_garbage_collect_clauses )
5674 mask = GCREQUEST_CGC;
5675 else
5676 return PL_domain_error("action", A1);
5677
5678 pthread_mutex_lock(&GD->thread.gc.mutex);
5679 GD->thread.gc.requests &= ~mask;
5680 pthread_mutex_unlock(&GD->thread.gc.mutex);
5681
5682 return TRUE;
5683 }
5684
5685 return FALSE;
5686 }
5687
5688
5689 static rc_cancel
cancelGCThread(int tid)5690 cancelGCThread(int tid)
5691 { signalGCThreadCond(tid, SIG_PLABORT);
5692 return PL_THREAD_CANCEL_MUST_JOIN;
5693 }
5694
5695 static
5696 PRED_IMPL("$gc_stop", 0, gc_stop, 0)
5697 { int tid;
5698
5699 if ( (tid=gc_running()) )
5700 return cancelGCThread(tid) != PL_THREAD_CANCEL_FAILED;
5701
5702 return FALSE;
5703 }
5704
5705
5706 /*******************************
5707 * STATISTICS *
5708 *******************************/
5709
5710 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
5711 thread_statistics(+Thread, +Key, -Value)
5712 Same as statistics(+Key, -Value) but operates on another thread.
5713
5714 statistics(heapused, X) walks along all threads and therefore locks
5715 L_THREAD. As it returns the same result on all threads we'll redirect
5716 this to run on our own thread.
5717 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
5718
5719 static
5720 PRED_IMPL("thread_statistics", 3, thread_statistics, 0)
5721 { PRED_LD
5722 PL_thread_info_t *info;
5723 PL_local_data_t *ld;
5724 int rval;
5725 atom_t k;
5726
5727 PL_LOCK(L_THREAD);
5728 if ( !get_thread(A1, &info, TRUE) )
5729 { PL_UNLOCK(L_THREAD);
5730 fail;
5731 }
5732
5733 if ( !(ld=info->thread_data) )
5734 { PL_UNLOCK(L_THREAD);
5735 return PL_error(NULL, 0, NULL,
5736 ERR_PERMISSION,
5737 ATOM_statistics, ATOM_thread, A1);
5738 }
5739
5740 if ( !PL_get_atom(A2, &k) )
5741 k = 0;
5742
5743 if ( k == ATOM_heapused )
5744 ld = LD;
5745 else if ( k == ATOM_cputime || k == ATOM_runtime )
5746 ld->statistics.user_cputime = ThreadCPUTime(ld, CPU_USER);
5747 else if ( k == ATOM_system_time )
5748 ld->statistics.system_cputime = ThreadCPUTime(ld, CPU_SYSTEM);
5749
5750 if ( LD == ld ) /* self: unlock first to avoid deadlock */
5751 { PL_UNLOCK(L_THREAD);
5752 return pl_statistics_ld(A2, A3, ld PASS_LD);
5753 }
5754
5755 rval = pl_statistics_ld(A2, A3, ld PASS_LD);
5756 PL_UNLOCK(L_THREAD);
5757
5758 return rval;
5759 }
5760
5761
5762 #ifdef __WINDOWS__
5763
5764 /* How to make the memory visible?
5765 */
5766
5767 /* see also pl-nt.c */
5768 #define nano * 0.000000001
5769 #define ntick 100.0
5770
5771 double
ThreadCPUTime(PL_local_data_t * ld,int which)5772 ThreadCPUTime(PL_local_data_t *ld, int which)
5773 { PL_thread_info_t *info = ld->thread.info;
5774 double t;
5775 FILETIME created, exited, kerneltime, usertime;
5776 HANDLE win_thread;
5777
5778 if ( !info->has_tid )
5779 return 0.0;
5780
5781 if ( !(win_thread = get_windows_thread(info)) )
5782 return 0.0;
5783
5784 if ( GetThreadTimes(win_thread,
5785 &created, &exited, &kerneltime, &usertime) )
5786 { FILETIME *p;
5787
5788 if ( which == CPU_SYSTEM )
5789 p = &kerneltime;
5790 else
5791 p = &usertime;
5792
5793 t = (double)p->dwHighDateTime * (4294967296.0 * ntick nano);
5794 t += (double)p->dwLowDateTime * (ntick nano);
5795 } else
5796 t = 0.0;
5797
5798 close_windows_thread(win_thread);
5799
5800 return t;
5801 }
5802
5803 #else /*__WINDOWS__*/
5804
/* Convert a struct timespec value to seconds as a double */
#define timespec_to_double(ts) \
	((double)(ts).tv_sec + (double)(ts).tv_nsec/1e9)
5807
5808 #ifdef PTHREAD_CPUCLOCKS
5809 #define NO_THREAD_SYSTEM_TIME 1
5810
5811 double
ThreadCPUTime(PL_local_data_t * ld,int which)5812 ThreadCPUTime(PL_local_data_t *ld, int which)
5813 { PL_thread_info_t *info = ld->thread.info;
5814
5815 if ( which == CPU_SYSTEM )
5816 return 0.0;
5817
5818 if ( info->has_tid )
5819 { clockid_t clock_id;
5820 struct timespec ts;
5821 int rc;
5822
5823 if ( (rc=pthread_getcpuclockid(info->tid, &clock_id)) == 0 )
5824 { if (clock_gettime(clock_id, &ts) == 0)
5825 return timespec_to_double(ts);
5826 } else
5827 { DEBUG(MSG_THREAD,
5828 Sdprintf("Could not get thread time: %s\n", strerror(rc)));
5829 }
5830 }
5831
5832 return 0.0;
5833 }
5834
5835 #else /*PTHREAD_CPUCLOCKS*/
5836
5837 #ifdef HAVE_MACH_THREAD_ACT_H /* MacOS and other Mach based systems */
5838
5839 #include <mach/thread_act.h>
5840
5841 double
ThreadCPUTime(PL_local_data_t * ld,int which)5842 ThreadCPUTime(PL_local_data_t *ld, int which)
5843 { PL_thread_info_t *info = ld->thread.info;
5844
5845 if ( info->has_tid )
5846 { kern_return_t error;
5847 struct thread_basic_info th_info;
5848 mach_msg_type_number_t th_info_count = THREAD_BASIC_INFO_COUNT;
5849 thread_t thread = pthread_mach_thread_np(info->tid);
5850
5851 if ((error = thread_info(thread, THREAD_BASIC_INFO,
5852 (thread_info_t)&th_info, &th_info_count))
5853 != KERN_SUCCESS)
5854 return 0.0;
5855
5856 if ( which == CPU_SYSTEM )
5857 return ( th_info.system_time.seconds +
5858 th_info.system_time.microseconds / 1e6 );
5859 else
5860 return ( th_info.user_time.seconds +
5861 th_info.user_time.microseconds / 1e6 );
5862 }
5863
5864 return 0.0;
5865 }
5866
5867 #else /*HAVE_MACH_THREAD_ACT_H*/
5868
5869 #ifdef LINUX_PROCFS
5870
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Unfortunately POSIX threads do not define a way to get the CPU time per
thread. Some systems do have mechanisms for that. POSIX does define
clock_gettime(CLOCK_THREAD_CPUTIME_ID), but it certainly doesn't work in
Linux 2.6.8.

Autoconf detects the presence of /proc and we read the values from
there. This is rather slow. To make things not too bad we use a pool of
open handles to entries in the /proc table. All this junk should really
be moved into a file trying to implement thread CPU time for as many
platforms as possible. If you happen to know such a library, please let
me know.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
5884
5885 #include <fcntl.h>
5886
/* Number of /proc stat files that are kept open at the same time */
#define CACHED_PROCPS_ENTRIES 5

/* One cached, open /proc/self/task/<tid>/stat file.  An entry whose
   tid is 0 is treated as free. */
typedef struct
{ int tid;				/* task id used in the /proc path; 0 if free */
  int fd;				/* file descriptor */
  int offset;				/* start of numbers */
  int usecoount;			/* hits since the last reclaim; the lowest is evicted */
} procps_entry;

static procps_entry procps_entries[CACHED_PROCPS_ENTRIES]; /* cached entries */
5897
5898 static void
close_procps_entry(procps_entry * e)5899 close_procps_entry(procps_entry *e)
5900 { if ( e->tid )
5901 { close(e->fd);
5902 memset(e, 0, sizeof(*e));
5903 }
5904 }
5905
5906
5907 static procps_entry*
reclaim_procps_entry()5908 reclaim_procps_entry()
5909 { procps_entry *e, *low;
5910 int i; int lowc;
5911
5912 low=e=procps_entries;
5913 lowc=low->usecoount;
5914
5915 for(e++, i=1; i<CACHED_PROCPS_ENTRIES; e++, i++)
5916 { if ( e->usecoount < lowc )
5917 { lowc = e->usecoount;
5918 low = e;
5919 }
5920 }
5921
5922 for(e=procps_entries, i=0; i<CACHED_PROCPS_ENTRIES; e++, i++)
5923 e->usecoount = 0;
5924
5925 close_procps_entry(low);
5926
5927 return low;
5928 }
5929
5930
5931 static procps_entry *
open_procps_entry(procps_entry * e,int tid)5932 open_procps_entry(procps_entry *e, int tid)
5933 { char fname[256];
5934 int fd;
5935
5936 sprintf(fname, "/proc/self/task/%d/stat", tid);
5937 if ( (fd=open(fname, O_RDONLY)) >= 0 )
5938 { char buffer[1000];
5939 int pos;
5940
5941 pos = read(fd, buffer, sizeof(buffer)-1);
5942 if ( pos > 0 )
5943 { char *bp;
5944
5945 buffer[pos] = EOS;
5946 if ( (bp=strrchr(buffer, ')')) )
5947 { e->tid = tid;
5948 e->fd = fd;
5949 e->offset = (bp-buffer)+4;
5950 e->usecoount = 1;
5951
5952 return e;
5953 }
5954 }
5955 }
5956
5957 return NULL;
5958 }
5959
5960
5961 static procps_entry*
get_procps_entry(int tid)5962 get_procps_entry(int tid)
5963 { int i;
5964 procps_entry *e;
5965
5966 for(e=procps_entries, i=0; i<CACHED_PROCPS_ENTRIES; e++, i++)
5967 { if ( e->tid == tid )
5968 { e->usecoount++;
5969
5970 return e;
5971 }
5972 }
5973
5974 for(e=procps_entries, i=0; i<CACHED_PROCPS_ENTRIES; e++, i++)
5975 { if ( e->tid == 0 )
5976 return open_procps_entry(e, tid);
5977 }
5978
5979 e = reclaim_procps_entry();
5980 return open_procps_entry(e, tid);
5981 }
5982
5983
5984 double
ThreadCPUTime(PL_local_data_t * ld,int which)5985 ThreadCPUTime(PL_local_data_t *ld, int which)
5986 { PL_thread_info_t *info = ld->thread.info;
5987 procps_entry *e;
5988
5989 if ( (e=get_procps_entry(info->pid)) )
5990 { char buffer[1000];
5991 char *s;
5992 int64_t ticks;
5993 int i, n, nth = 10; /* user time */
5994
5995 if ( which == CPU_SYSTEM )
5996 nth++;
5997
5998 lseek(e->fd, e->offset, SEEK_SET);
5999 n = read(e->fd, buffer, sizeof(buffer)-1);
6000 if ( n >= 0 )
6001 buffer[n] = EOS;
6002 /* most likely does not need reuse */
6003 if ( info->status != PL_THREAD_RUNNING )
6004 close_procps_entry(e);
6005
6006 for(s=buffer, i=0; i<nth; i++, s++)
6007 { while(*s != ' ')
6008 s++;
6009 }
6010
6011 ticks = atoll(s);
6012 return (double)ticks/100.0;
6013 }
6014
6015 return 0.0;
6016 }
6017
6018 #else /*LINUX_PROCFS*/
6019
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Systems where we can get the thread CPU time from the calling thread
only. Up to Linux 2.4, Linux threads were more like processes, and we
can get these using times(). Some POSIX systems have a clock
CLOCK_THREAD_CPUTIME_ID. Oddly enough, on Debian etch at least, fetching
this with clock_gettime() produces bogus results, but getting it through
the syscall interface works. This might be because the kernel is fairly
up-to-date, but the C library is very much out of date (glibc 2.3).
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
6029
6030 #ifndef SIG_SYNCTIME
6031 #define SIG_SYNCTIME SIGUSR1
6032 #endif
6033
6034 #ifdef USE_SEM_OPEN
6035 static sem_t *sem_synctime_ptr;
6036 #else
6037 static sem_t sem_synctime; /* used for atom-gc */
6038 #define sem_synctime_ptr (&sem_synctime)
6039 #endif
6040
6041 #ifdef LINUX_CPUCLOCKS
6042 #define NO_THREAD_SYSTEM_TIME 1
6043
6044 static void
SyncUserCPU(int sig)6045 SyncUserCPU(int sig)
6046 { GET_LD
6047
6048 if ( LD )
6049 { struct timespec ts;
6050
6051 if ( syscall(__NR_clock_gettime, CLOCK_THREAD_CPUTIME_ID, &ts) == 0 )
6052 { LD->statistics.user_cputime = timespec_to_double(ts);
6053 } else
6054 { perror("clock_gettime");
6055 }
6056 }
6057
6058 if ( sig )
6059 sem_post(sem_synctime_ptr);
6060 }
6061
6062
/* System time is not available in the LINUX_CPUCLOCKS configuration
   (NO_THREAD_SYSTEM_TIME is defined), so reaching this function is a
   programming error.
*/

static void
SyncSystemCPU(int sig)
{ (void)sig;

  assert(0);
}
6067
6068 #else /*LINUX_CPUCLOCKS*/
6069 #define THREAD_CPU_BY_PID 1
6070
6071 static void
SyncUserCPU(int sig)6072 SyncUserCPU(int sig)
6073 { GET_LD
6074
6075 if ( LD )
6076 LD->statistics.user_cputime = CpuTime(CPU_USER);
6077 if ( sig )
6078 sem_post(sem_synctime_ptr);
6079 }
6080
6081
6082 static void
SyncSystemCPU(int sig)6083 SyncSystemCPU(int sig)
6084 { GET_LD
6085
6086 if ( LD )
6087 LD->statistics.system_cputime = CpuTime(CPU_SYSTEM);
6088 if ( sig )
6089 sem_post(sem_synctime_ptr);
6090 }
6091
6092 #endif /*LINUX_CPUCLOCKS*/
6093
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
ThreadCPUTime() returns the user (CPU_USER) or system CPU time recorded
in the statistics of the thread that owns `ld`.

If that thread is the calling thread, the statistics are refreshed by
calling the Sync*CPU() function directly with sig == 0.  Otherwise the
matching Sync*CPU() function is installed as handler for SIG_SYNCTIME,
the signal is sent to the target thread and we wait on the semaphore
that the handler posts.  The previous signal disposition is restored
afterwards.

Returns 0.0 if the target has no thread id or pthread_kill() fails, and
also for CPU_SYSTEM when NO_THREAD_SYSTEM_TIME is defined.

NOTE(review): the semaphore and the signal disposition are shared state;
nothing visible here serialises concurrent callers -- confirm callers
hold a lock.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */

double
ThreadCPUTime(PL_local_data_t *ld, int which)
{ GET_LD
  PL_thread_info_t *info = ld->thread.info;

#ifdef NO_THREAD_SYSTEM_TIME
  if ( which == CPU_SYSTEM )
    return 0.0;
#endif

  if ( info->thread_data == LD )	/* asking about ourselves */
  { if ( which == CPU_USER )
      SyncUserCPU(0);
    else
      SyncSystemCPU(0);
  } else
  { struct sigaction old;
    struct sigaction new;
    sigset_t sigmask;
    sigset_t set;
    int ok;

    blockSignals(&set);
    sem_init(sem_synctime_ptr, USYNC_THREAD, 0);
    allSignalMask(&sigmask);
    memset(&new, 0, sizeof(new));
    new.sa_handler = (which == CPU_USER ? SyncUserCPU : SyncSystemCPU);
    new.sa_flags   = SA_RESTART;
    new.sa_mask    = sigmask;
    sigaction(SIG_SYNCTIME, &new, &old);

    if ( info->has_tid )
      ok = (pthread_kill(info->tid, SIG_SYNCTIME) == 0);
    else
      ok = FALSE;

    if ( ok )				/* wait for the handler to post */
    { while( sem_wait(sem_synctime_ptr) == -1 && errno == EINTR )
	;
    }
    sem_destroy(sem_synctime_ptr);
    sigaction(SIG_SYNCTIME, &old, NULL); /* restore old disposition */
    unblockSignals(&set);
    if ( !ok )
      return 0.0;
  }

					/* read what the target stored */
  if ( which == CPU_USER )
    return info->thread_data->statistics.user_cputime;
  else
    return info->thread_data->statistics.system_cputime;
}
6146
6147 #endif /*LINUX_PROCFS*/
6148 #endif /*HAVE_MACH_THREAD_ACT_H*/
6149 #endif /*PTHREAD_CPUCLOCKS*/
6150 #endif /*__WINDOWS__*/
6151
6152
6153 /*******************************
6154 * ITERATE OVER THREADS *
6155 *******************************/
6156
6157 void
forThreadLocalDataUnsuspended(void (* func)(PL_local_data_t *),unsigned flags)6158 forThreadLocalDataUnsuspended(void (*func)(PL_local_data_t *), unsigned flags)
6159 { GET_LD
6160 int me = PL_thread_self();
6161 int i;
6162
6163 for( i=1; i<=GD->thread.highest_id; i++ )
6164 { if ( i != me )
6165 { PL_thread_info_t *info = GD->thread.threads[i];
6166
6167 if ( info && info->thread_data &&
6168 ( info->status == PL_THREAD_RUNNING || info->in_exit_hooks ) )
6169 { PL_local_data_t *ld;
6170
6171 if ( (ld = acquire_ldata(info)) )
6172 { simpleMutexLock(&ld->thread.scan_lock);
6173 (*func)(ld);
6174 simpleMutexUnlock(&ld->thread.scan_lock);
6175 }
6176 }
6177 }
6178 }
6179 }
6180
6181
6182 /*******************************
6183 * ATOM MARK SUPPORT *
6184 *******************************/
6185
6186 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
We do not register atoms in message queues as the PL_register_atom()
calls seriously harm concurrency due to contention on L_ATOM. So,
instead we must sweep atoms in records in the message queues. Note that
atom-gc runs with the L_THREAD mutex locked. As both the thread mutex
queue tables are guarded by this mutex, the loops in markAtomsThreads()
are safe.
6193
6194 The marking must be synchronised with removing a message from a queue
6195 (get_message()) and destroy_message_queue(). Notably the latter is
6196 tricky as this deletes `queue->gc_mutex` and frees the queue, so we must
6197 ensure we do not get into destroy_message_queue() when AGC is marking
6198 the queue's messages. There are three types of queues:
6199
6200 - Thread queues. AGC is synced with thread creation and destruction
6201 and thus safe.
6202 - Anonymous message queues. These are reclaimed from AGC and thus
6203 safe.
6204 - Named queues. These are destroyed using message_queue_destroy/1.
6205 For this case we
6206 - Use `queueTable_mutex` to sync deletion from `queueTable` with
6207 enumeration in markAtomsMessageQueues() which ensures we safely
6208 get the queue with queue->gc_mutex locked.
6209 - We hold queue->gc_mutex during the scan, which delays
6210 message_queue_destroy/1 to set `queue->destroyed`.
6211 - If `queue->destroyed` is set, AGC cannot get the queue anymore
6212 and we can safely discard it.
6213
6214 This is implemented by Keri Harris and Jan Wielemaker. We are not really
6215 happy with the complicated locking. Alternatively we could use the
6216 linger interface where the lingering queues are reclaimed by AGC. That
6217 might be cleaner, but is a more involved patch that doesn't perform
6218 significantly better.
6219 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
6220
6221 static void
markAtomsMessageQueue(message_queue * queue)6222 markAtomsMessageQueue(message_queue *queue)
6223 { thread_message *msg;
6224
6225 for(msg=queue->head; msg; msg=msg->next)
6226 { markAtomsRecord(msg->message);
6227 }
6228 }
6229
6230
6231 void
markAtomsThreadMessageQueue(PL_local_data_t * ld)6232 markAtomsThreadMessageQueue(PL_local_data_t *ld)
6233 { message_queue *q = &ld->thread.messages;
6234
6235 simpleMutexLock(&q->gc_mutex);
6236 markAtomsMessageQueue(q);
6237 simpleMutexUnlock(&q->gc_mutex);
6238 }
6239
6240
6241 void
markAtomsMessageQueues(void)6242 markAtomsMessageQueues(void)
6243 { if ( queueTable )
6244 { message_queue *q;
6245 TableEnum e = newTableEnum(queueTable);
6246
6247 while ( TRUE )
6248 { simpleMutexLock(&queueTable_mutex);
6249 if ( !advanceTableEnum(e, NULL, (void**)&q) )
6250 { simpleMutexUnlock(&queueTable_mutex);
6251 break;
6252 }
6253 simpleMutexLock(&q->gc_mutex);
6254 simpleMutexUnlock(&queueTable_mutex);
6255 markAtomsMessageQueue(q);
6256 simpleMutexUnlock(&q->gc_mutex);
6257 }
6258 freeTableEnum(e);
6259 }
6260 }
6261
6262
6263 /*******************************
6264 * PREDICATES *
6265 *******************************/
6266
6267
6268 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
6269 localiseDefinition(Definition def)
6270 Create a thread-local definition for the predicate `def'.
6271
6272 This function is called from getProcDefinition() if the procedure is
6273 not yet `localised'. Calling this function must be guarded by the
6274 L_PREDICATE mutex.
6275 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
6276
6277 #ifndef offsetof
6278 #define offsetof(structure, field) ((int) &(((structure *)NULL)->field))
6279 #endif
6280
6281 static void
registerLocalDefinition(Definition def)6282 registerLocalDefinition(Definition def)
6283 { GET_LD
6284 DefinitionChain cell = allocHeapOrHalt(sizeof(*cell));
6285
6286 cell->definition = def;
6287 cell->next = LD->thread.local_definitions;
6288 LD->thread.local_definitions = cell;
6289 }
6290
6291
6292 static void
unregisterLocalDefinition(Definition def,PL_local_data_t * ld)6293 unregisterLocalDefinition(Definition def, PL_local_data_t *ld)
6294 { DefinitionChain cell;
6295
6296 for(cell = ld->thread.local_definitions; cell; cell = cell->next)
6297 { if ( cell->definition == def )
6298 { cell->definition = NULL;
6299 return;
6300 }
6301 }
6302 }
6303
6304
6305 LocalDefinitions
new_ldef_vector(void)6306 new_ldef_vector(void)
6307 { LocalDefinitions f = allocHeapOrHalt(sizeof(*f));
6308
6309 memset(f, 0, sizeof(*f));
6310 f->blocks[0] = f->preallocated - 1;
6311 f->blocks[1] = f->preallocated - 1;
6312 f->blocks[2] = f->preallocated - 1;
6313
6314 return f;
6315 }
6316
6317
6318 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
6319 Called from destroyDefinition() for a thread-local predicate.
6320 destroyDefinition() is called for destroying temporary modules. If
6321 thread-local predicates are defined in the temporary module these must
6322 be destroyed and the registration with module must be removed or the
6323 thread cleanup will access the destroyed predicate.
6324
6325 Now, the thread having a localization for this predicate is most likely
6326 the calling thread, but in theory other threads can be involved and thus
6327 we need to scan the entire localization array.
6328 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
6329
6330 void
destroyLocalDefinitions(Definition def)6331 destroyLocalDefinitions(Definition def)
6332 { GET_LD
6333 LocalDefinitions ldefs = def->impl.local.local;
6334 int b;
6335
6336 for(b=0; b<MAX_BLOCKS; b++)
6337 { Definition *d0 = ldefs->blocks[b];
6338
6339 if ( d0 )
6340 { size_t bs = (size_t)1<<b;
6341 size_t tid = bs;
6342 size_t end = tid+bs;
6343
6344 for(tid=bs; tid<end; tid++)
6345 { if ( d0[tid] )
6346 { PL_thread_info_t *info = GD->thread.threads[tid];
6347 PL_local_data_t *ld;
6348
6349 if ( LD ) /* See (*) */
6350 ld = acquire_ldata(info);
6351 else
6352 ld = info->thread_data;
6353
6354 DEBUG(MSG_THREAD_LOCAL,
6355 if ( ld != LD )
6356 Sdprintf("Destroying thread local predicate %s "
6357 "with active local definitions\n",
6358 predicateName(def)));
6359
6360 unregisterLocalDefinition(def, ld);
6361 destroyLocalDefinition(def, tid);
6362
6363 if ( LD )
6364 release_ldata(ld);
6365 }
6366 }
6367 }
6368 }
6369 }
6370
6371
6372 void
free_ldef_vector(LocalDefinitions ldefs)6373 free_ldef_vector(LocalDefinitions ldefs)
6374 { int i;
6375
6376 for(i=3; i<MAX_BLOCKS; i++)
6377 { size_t bs = (size_t)1<<i;
6378 Definition *d0 = ldefs->blocks[i];
6379
6380 if ( d0 )
6381 freeHeap(d0+bs, bs*sizeof(Definition));
6382 }
6383
6384 freeHeap(ldefs, sizeof(*ldefs));
6385 }
6386
6387
6388 Definition
localiseDefinition(Definition def)6389 localiseDefinition(Definition def)
6390 { Definition local = allocHeapOrHalt(sizeof(*local));
6391 size_t bytes = sizeof(arg_info)*def->functor->arity;
6392
6393 *local = *def;
6394 local->impl.any.args = allocHeapOrHalt(bytes);
6395 memcpy(local->impl.any.args, def->impl.any.args, bytes);
6396 clear(local, P_THREAD_LOCAL|P_DIRTYREG); /* remains P_DYNAMIC */
6397 local->impl.clauses.first_clause = NULL;
6398 local->impl.clauses.clause_indexes = NULL;
6399 ATOMIC_INC(&GD->statistics.predicates);
6400 ATOMIC_ADD(&local->module->code_size, sizeof(*local));
6401 DEBUG(MSG_PROC_COUNT, Sdprintf("Localise %s\n", predicateName(def)));
6402
6403 setSupervisor(local);
6404 registerLocalDefinition(def);
6405
6406 return local;
6407 }
6408
6409
6410 void
cleanupLocalDefinitions(PL_local_data_t * ld)6411 cleanupLocalDefinitions(PL_local_data_t *ld)
6412 { DefinitionChain ch = ld->thread.local_definitions;
6413 DefinitionChain next;
6414 unsigned int id = ld->thread.info->pl_tid;
6415
6416 for( ; ch; ch = next)
6417 { Definition def = ch->definition;
6418 next = ch->next;
6419
6420 if ( def )
6421 { DEBUG(MSG_CLEANUP,
6422 Sdprintf("Clean local def in thread %d for %s\n",
6423 id,
6424 predicateName(def)));
6425
6426 assert(true(def, P_THREAD_LOCAL));
6427 destroyLocalDefinition(def, id);
6428 }
6429 freeHeap(ch, sizeof(*ch));
6430 }
6431 }
6432
6433
6434 static size_t
sizeof_local_definitions(PL_local_data_t * ld)6435 sizeof_local_definitions(PL_local_data_t *ld)
6436 { DefinitionChain ch = ld->thread.local_definitions;
6437 size_t size = 0;
6438
6439 for( ; ch; ch = ch->next)
6440 { Definition def = ch->definition;
6441 Definition local;
6442
6443 assert(true(def, P_THREAD_LOCAL));
6444 if ( (local = getProcDefinitionForThread(def, ld->thread.info->pl_tid)) )
6445 size += sizeof_predicate(local);
6446 }
6447
6448 return size;
6449 }
6450
6451
6452
6453 /** '$thread_local_clause_count'(:Head, +Thread, -NumberOfClauses) is semidet.
6454
6455 True when NumberOfClauses is the number of clauses for the thread-local
6456 predicate Head in Thread. Fails silently if
6457
6458 - Head does not refer to an existing predicate
6459 - The predicate exists, but is not thread local (should
6460 this be an error?)
6461 - The thread does not exist
6462 - The definition was never localised for Thread. One
6463 could argue to return 0 for this case.
6464
6465 @author Keri Harris
6466 */
6467
6468 static
6469 PRED_IMPL("$thread_local_clause_count", 3, thread_local_clause_count, 0)
6470 { PRED_LD
6471 Procedure proc;
6472 Definition def;
6473 PL_thread_info_t *info;
6474 int number_of_clauses = 0;
6475
6476 term_t pred = A1;
6477 term_t thread = A2;
6478 term_t count = A3;
6479
6480 if ( !get_procedure(pred, &proc, 0, GP_RESOLVE) )
6481 fail;
6482
6483 def = proc->definition;
6484 if ( false(def, P_THREAD_LOCAL) )
6485 fail;
6486
6487 if ( !get_thread(thread, &info, FALSE) )
6488 fail;
6489
6490 PL_LOCK(L_THREAD);
6491 if ( (def = getProcDefinitionForThread(proc->definition, info->pl_tid)) )
6492 number_of_clauses = def->impl.clauses.number_of_clauses;
6493 PL_UNLOCK(L_THREAD);
6494
6495 return PL_unify_integer(count, number_of_clauses);
6496 }
6497
6498
6499 #else /*O_PLMT*/
6500
6501 int
signalGCThread(int sig)6502 signalGCThread(int sig)
6503 { GET_LD
6504
6505 return raiseSignal(LD, sig);
6506 }
6507
6508 int
isSignalledGCThread(int sig ARG_LD)6509 isSignalledGCThread(int sig ARG_LD)
6510 { return PL_pending(sig);
6511 }
6512
6513
/* Single-threaded version: always returns -2.  This is the value that
   PL_unify_thread_id() maps to the atom `main`.
*/

int
PL_thread_self()
{ int self = -2;

  return self;
}
6518
6519
6520 int
PL_unify_thread_id(term_t t,int i)6521 PL_unify_thread_id(term_t t, int i)
6522 { if ( i == -2 )
6523 return PL_unify_atom(t, ATOM_main);
6524 return -1;
6525 }
6526
6527
6528 int
PL_thread_at_exit(void (* function)(void *),void * closure,int global)6529 PL_thread_at_exit(void (*function)(void *), void *closure, int global)
6530 { return FALSE;
6531 }
6532
6533 int
PL_thread_attach_engine(PL_thread_attr_t * attr)6534 PL_thread_attach_engine(PL_thread_attr_t *attr)
6535 { return -2;
6536 }
6537
6538 int
PL_thread_destroy_engine()6539 PL_thread_destroy_engine()
6540 { return FALSE;
6541 }
6542
6543 #ifdef __WINDOWS__
6544 int
PL_w32thread_raise(DWORD id,int sig)6545 PL_w32thread_raise(DWORD id, int sig)
6546 { return PL_raise(sig);
6547 }
6548 #endif
6549
6550 foreign_t
pl_thread_self(term_t id)6551 pl_thread_self(term_t id)
6552 { return PL_unify_atom(id, ATOM_main);
6553 }
6554
6555 PL_engine_t
PL_current_engine(void)6556 PL_current_engine(void)
6557 { return LD;
6558 }
6559
6560 int
PL_set_engine(PL_engine_t new,PL_engine_t * old)6561 PL_set_engine(PL_engine_t new, PL_engine_t *old)
6562 { if ( new != LD && new != PL_ENGINE_MAIN )
6563 return PL_ENGINE_INVAL;
6564
6565 if ( old )
6566 { *old = LD;
6567 }
6568
6569 return PL_ENGINE_SET;
6570 }
6571
6572 PL_engine_t
PL_create_engine(PL_thread_attr_t * attributes)6573 PL_create_engine(PL_thread_attr_t *attributes)
6574 { return NULL;
6575 }
6576
6577 int
PL_destroy_engine(PL_engine_t e)6578 PL_destroy_engine(PL_engine_t e)
6579 { fail;
6580 }
6581
/* Single-threaded version: there is nothing to clean up after fork(). */

void
PL_cleanup_fork(void)
{ return;
}
6586
6587 double
ThreadCPUTime(PL_local_data_t * ld,int which)6588 ThreadCPUTime(PL_local_data_t *ld, int which) {
6589 return CpuTime(which);
6590 }
6591
/* Single-threaded version.  If O_MULTIPLE_ENGINES is defined, make the
   current engine pointer refer to PL_local_data; otherwise do nothing.
*/

void
initPrologThreads()
{
#ifdef O_MULTIPLE_ENGINES
  PL_current_engine_ptr = &PL_local_data; /* TBD: only once? */
#endif
}
6599
6600 int
PL_get_thread_alias(int tid,atom_t * alias)6601 PL_get_thread_alias(int tid, atom_t *alias)
6602 { *alias = ATOM_main;
6603 return TRUE;
6604 }
6605
6606 #endif /*O_PLMT*/
6607
6608 int
get_prop_def(term_t t,atom_t expected,const tprop * list,const tprop ** def)6609 get_prop_def(term_t t, atom_t expected, const tprop *list, const tprop **def)
6610 { GET_LD
6611 functor_t f;
6612
6613 if ( PL_get_functor(t, &f) )
6614 { const tprop *p = list;
6615
6616 for( ; p->functor; p++ )
6617 { if ( f == p->functor )
6618 { *def = p;
6619 return TRUE;
6620 }
6621 }
6622
6623 PL_error(NULL, 0, NULL, ERR_DOMAIN, expected, t);
6624 return -1;
6625 }
6626
6627 if ( PL_is_variable(t) )
6628 return 0;
6629
6630 PL_error(NULL, 0, NULL, ERR_TYPE, expected, t);
6631 return -1;
6632 }
6633
6634
6635 /*******************************
6636 * STATISTICS *
6637 *******************************/
6638
6639 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
6640 Find the summed size of the local stack. This is a measure for the CGC
6641 marking cost.
6642 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
6643
6644 int
cgc_thread_stats(cgc_stats * stats ARG_LD)6645 cgc_thread_stats(cgc_stats *stats ARG_LD)
6646 {
6647 #ifdef O_PLMT
6648 int i;
6649
6650 for(i=1; i<=GD->thread.highest_id; i++)
6651 { PL_thread_info_t *info = GD->thread.threads[i];
6652 PL_local_data_t *ld = acquire_ldata(info);
6653
6654 if ( ld )
6655 { char *ltop = (char*)ld->stacks.local.top;
6656 char *lbase = (char*)ld->stacks.local.base;
6657
6658 if ( ltop > lbase )
6659 { stats->local_size += ltop-lbase;
6660 stats->threads++;
6661 stats->erased_skipped += ld->clauses.erased_skipped;
6662 }
6663 release_ldata(ld);
6664 }
6665 if ( GD->clauses.cgc_active )
6666 return FALSE;
6667 }
6668 #else
6669 stats->local_size = usedStack(local);
6670 stats->threads = 1;
6671 stats->erased_skipped = LD->clauses.erased_skipped;
6672 #endif
6673
6674 return TRUE;
6675 }
6676
6677
6678 /*******************************
6679 * HASH-TABLE KVS IN USE *
6680 *******************************/
6681
6682 int
pl_kvs_in_use(KVS kvs)6683 pl_kvs_in_use(KVS kvs)
6684 {
6685 #ifdef O_PLMT
6686 int i;
6687
6688 for(i=1; i<=GD->thread.highest_id; i++)
6689 { PL_thread_info_t *info = GD->thread.threads[i];
6690 if ( info && info->access.kvs == kvs )
6691 { return TRUE;
6692 }
6693 }
6694 #endif
6695
6696 return FALSE;
6697 }
6698
6699
6700 /*******************************
6701 * ATOM-TABLE IN USE *
6702 *******************************/
6703
6704 int
pl_atom_table_in_use(AtomTable atom_table)6705 pl_atom_table_in_use(AtomTable atom_table)
6706 {
6707 #ifdef O_PLMT
6708 int i;
6709
6710 for(i=1; i<=GD->thread.highest_id; i++)
6711 { PL_thread_info_t *info = GD->thread.threads[i];
6712 if ( info && info->access.atom_table == atom_table )
6713 { return TRUE;
6714 }
6715 }
6716 #endif
6717
6718 return FALSE;
6719 }
6720
6721
6722 int
pl_atom_bucket_in_use(Atom * bucket)6723 pl_atom_bucket_in_use(Atom *bucket)
6724 {
6725 #ifdef O_PLMT
6726 int i;
6727
6728 for(i=1; i<=GD->thread.highest_id; i++)
6729 { PL_thread_info_t *info = GD->thread.threads[i];
6730 if ( info && info->access.atom_bucket == bucket )
6731 { return TRUE;
6732 }
6733 }
6734 #endif
6735
6736 return FALSE;
6737 }
6738
6739
6740 #ifdef O_PLMT
6741 static int
ldata_in_use(PL_local_data_t * ld)6742 ldata_in_use(PL_local_data_t *ld)
6743 { int i;
6744
6745 for(i=1; i<=GD->thread.highest_id; i++)
6746 { PL_thread_info_t *info = GD->thread.threads[i];
6747 if ( info && info->access.ldata == ld )
6748 { return TRUE;
6749 }
6750 }
6751
6752 return FALSE;
6753 }
6754 #endif
6755
6756
6757 Atom**
pl_atom_buckets_in_use(void)6758 pl_atom_buckets_in_use(void)
6759 {
6760 #ifdef O_PLMT
6761 int i, index=0;
6762 size_t sz = 32;
6763
6764 Atom **buckets = allocHeapOrHalt(sz * sizeof(Atom*));
6765 memset(buckets, 0, sz * sizeof(Atom*));
6766
6767 for(i=1; i<=GD->thread.highest_id; i++)
6768 { PL_thread_info_t *info = GD->thread.threads[i];
6769 if ( info && info->access.atom_bucket )
6770 { if ( index >= sz-1 )
6771 { int j = 0;
6772 size_t oldsz = sz;
6773 sz *= 2;
6774 Atom **newbuckets = allocHeapOrHalt(sz * sizeof(Atom*));
6775 memset(newbuckets, 0, sz * sizeof(Atom*));
6776 for ( ; j < oldsz; j++ )
6777 { newbuckets[j] = buckets[j];
6778 }
6779 PL_free(buckets);
6780 buckets = newbuckets;
6781 }
6782 buckets[index] = info->access.atom_bucket;
6783 if ( buckets[index] ) /* atom_bucket may have been released */
6784 index++;
6785 }
6786 }
6787
6788 return buckets;
6789 #endif
6790
6791 return NULL;
6792 }
6793
6794
6795 Definition*
predicates_in_use(void)6796 predicates_in_use(void)
6797 {
6798 #ifdef O_PLMT
6799 int i, index=0;
6800 size_t sz = 32;
6801
6802 Definition *buckets = allocHeapOrHalt(sz * sizeof(Definition));
6803 memset(buckets, 0, sz * sizeof(Definition*));
6804
6805 for(i=1; i<=GD->thread.highest_id; i++)
6806 { PL_thread_info_t *info = GD->thread.threads[i];
6807 if ( info && info->access.predicate )
6808 { if ( index >= sz-1 )
6809 { int j = 0;
6810 size_t oldsz = sz;
6811 sz *= 2;
6812 Definition *newbuckets = allocHeapOrHalt(sz * sizeof(Definition));
6813 memset(newbuckets, 0, sz * sizeof(Definition));
6814 for ( ; j < oldsz; j++ )
6815 { newbuckets[j] = buckets[j];
6816 }
6817 PL_free(buckets);
6818 buckets = newbuckets;
6819 }
6820 buckets[index] = info->access.predicate;
6821 if ( buckets[index] ) /* atom_bucket may have been released */
6822 index++;
6823 }
6824 }
6825
6826 return buckets;
6827 #endif
6828
6829 return NULL;
6830 }
6831
6832
6833 /*******************************
6834 * FUNCTOR-TABLE IN USE *
6835 *******************************/
6836
6837 int
pl_functor_table_in_use(FunctorTable functor_table)6838 pl_functor_table_in_use(FunctorTable functor_table)
6839 {
6840 #ifdef O_PLMT
6841 int me = PL_thread_self();
6842 int i;
6843
6844 for(i=1; i<=GD->thread.highest_id; i++)
6845 { PL_thread_info_t *info = GD->thread.threads[i];
6846 if ( i != me && info && info->access.functor_table == functor_table )
6847 { return TRUE;
6848 }
6849 }
6850 #endif
6851
6852 return FALSE;
6853 }
6854
6855 /*******************************
6856 * CLAUSE/3 PREDICATE REFERENCES*
6857 *******************************/
6858
6859 #ifdef O_PLMT
6860
6861 static void
init_predicate_references(PL_local_data_t * ld)6862 init_predicate_references(PL_local_data_t *ld)
6863 { definition_refs *refs = &ld->predicate_references;
6864
6865 memset(refs, 0, sizeof(*refs));
6866 refs->blocks[0] = refs->preallocated - 1;
6867 refs->blocks[1] = refs->preallocated - 1;
6868 refs->blocks[2] = refs->preallocated - 1;
6869 }
6870
6871 static void
free_predicate_references(PL_local_data_t * ld)6872 free_predicate_references(PL_local_data_t *ld)
6873 { definition_refs *refs = &ld->predicate_references;
6874 int i;
6875
6876 for(i=3; i<MAX_BLOCKS; i++)
6877 { size_t bs = (size_t)1<<i;
6878 definition_ref *d0 = refs->blocks[i];
6879
6880 if ( d0 )
6881 freeHeap(d0+bs, bs*sizeof(definition_ref));
6882 }
6883 }
6884 #endif /*O_PLMT*/
6885
6886 static void
cgcActivatePredicate__LD(Definition def,gen_t gen ARG_LD)6887 cgcActivatePredicate__LD(Definition def, gen_t gen ARG_LD)
6888 { DirtyDefInfo ddi;
6889
6890 if ( (ddi=lookupHTable(GD->procedures.dirty, def)) )
6891 ddi_add_access_gen(ddi, gen);
6892 }
6893
6894
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
pushPredicateAccessObj() pushes a reference to `def` on the calling
thread's stack of predicate references and returns the new entry.  The
entry records the predicate and the global generation at the time of
the push.

The stack is stored in blocks that are allocated on demand; entry `top`
lives in block MSB(top).  The block pointers are stored shifted down by
the block size, such that they can be indexed with `top` directly.

NOTE(review): the order of the stores (predicate, generation, top and
the final re-read loop) presumably matters to markAccessedPredicates(),
which reads these entries from another thread -- do not reorder without
confirming.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */

definition_ref *
pushPredicateAccessObj(Definition def ARG_LD)
{ definition_refs *refs = &LD->predicate_references;
  definition_ref *dref;
  size_t top = refs->top+1;
  size_t idx = MSB(top);

  DEBUG(MSG_CGC_PRED_REF,
	Sdprintf("pushPredicateAccess(%s)\n", predicateName(def)));

  if ( !refs->blocks[idx] )		/* block not yet allocated */
  { size_t bs = (size_t)1<<idx;
    definition_ref *newblock;

    if ( !(newblock=PL_malloc_uncollectable(bs*sizeof(definition_ref))) )
      outOfCore();

    memset(newblock, 0, bs*sizeof(definition_ref));
					/* install; drop ours if we lost */
    if ( !COMPARE_AND_SWAP_PTR(&refs->blocks[idx], NULL, newblock-bs) )
      PL_free(newblock);
  }

  enterDefinition(def);			/* probably not needed in the end */
  dref = &refs->blocks[idx][top];
  dref->predicate  = def;
  dref->generation = global_generation();
  refs->top = top;			/* entry is now within the stack */
  do					/* until the generation is stable */
  { dref->generation = global_generation();
  } while ( dref->generation != global_generation() );

  return dref;
}
6928
6929
6930 gen_t
pushPredicateAccess__LD(Definition def ARG_LD)6931 pushPredicateAccess__LD(Definition def ARG_LD)
6932 { return pushPredicateAccessObj(def PASS_LD)->generation;
6933 }
6934
6935
6936 static definition_ref *
def_ref(const definition_refs * refs,size_t top)6937 def_ref(const definition_refs *refs, size_t top)
6938 { size_t idx = MSB(top);
6939
6940 return &refs->blocks[idx][top];
6941 }
6942
6943 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
6944 Note that out-of-order access is possible. Matt Lilley created a case
6945 where the term_expansion/2 rule for end_of_file loads source files using
6946 forall(clause(chain_modules(M)), use_module(M)). This locks
6947 chain_modules/1. The created system:'$load_context_module/2' fact gets
6948 asserted to the load context and is popped when loading the file
6949 completes, while the reference to chain_modules/1 is popped when the
6950 enumeration completes.
6951 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
6952
/* popPredicateAccess__LD() removes a reference to `def` from the
   calling thread's stack of predicate references.  If the top entry
   refers to `def` it is cleared.  Otherwise the stack is searched
   downwards for the most recently pushed entry for `def`, which is
   removed by moving all entries above it one place down.  It is an
   error (assert) if no entry for `def` exists.  In all cases the
   definition is left and the stack shrinks by one.
*/

void
popPredicateAccess__LD(Definition def ARG_LD)
{ definition_refs *refs = &LD->predicate_references;
  definition_ref *dref;

  DEBUG(MSG_CGC_PRED_REF,
	Sdprintf("popPredicateAccess(%s)\n", predicateName(def)));

  dref = def_ref(refs, refs->top);
  if ( dref->predicate == def )		/* common case: in order */
  { dref->predicate  = NULL;
    dref->generation = 0;
  } else
  { size_t top;

    DEBUG(MSG_CGC_PRED_REF, Sdprintf("  Out or order!\n"));
    for(top = refs->top; top > 0; top-- )
    { dref = def_ref(refs, top);
      if ( dref->predicate == def )
      { for(; top < refs->top; top++)	/* close the gap */
	{ definition_ref *dr2 = def_ref(refs, top+1);

	  *dref = *dr2;
	  dref = dr2;
	}
	goto out;
      }
    }
    assert(0);				/* no entry for def */
  }
out:
  leaveDefinition(def);			/* probably not needed in the end */

  refs->top--;
}
6988
6989 size_t
popNPredicateAccess__LD(size_t n ARG_LD)6990 popNPredicateAccess__LD(size_t n ARG_LD)
6991 { definition_refs *refs = &LD->predicate_references;
6992
6993 DEBUG(MSG_CGC_PRED_REF,
6994 Sdprintf("popNPredicateAccess(%d)\n", (int)n));
6995
6996 while(n-- > 0)
6997 { definition_ref *dref;
6998 size_t top = refs->top;
6999 size_t idx = MSB(top);
7000
7001 dref = &refs->blocks[idx][top];
7002 DEBUG(MSG_CGC_PRED_REF,
7003 Sdprintf(" -- %s\n", predicateName(dref->predicate)));
7004 leaveDefinition(dref->predicate);
7005 dref->predicate = NULL;
7006 dref->generation = 0;
7007
7008 refs->top--;
7009 }
7010
7011 return refs->top;
7012 }
7013
7014
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
is_pointer_like() is true if `ptr` is not NULL and its address is a
multiple of the size of a pointer, i.e., the low 2 bits are zero if
pointers are 4 bytes and the low 3 bits are zero if pointers are 8
bytes.  The mask is derived from sizeof(void*) and thus does not depend
on a configuration macro.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */

static inline int
is_pointer_like(void *ptr)
{ const intptr_t mask = (intptr_t)sizeof(void*) - 1;

  return ptr && ((intptr_t)ptr&mask) == 0;
}
7027
7028 void
markAccessedPredicates(PL_local_data_t * ld)7029 markAccessedPredicates(PL_local_data_t *ld)
7030 { GET_LD
7031 definition_refs *refs = &ld->predicate_references;
7032 size_t i;
7033
7034 for(i=1; i<=refs->top; i++)
7035 { int idx = MSB(i);
7036 volatile definition_ref *drefp = &refs->blocks[idx][i];
7037 definition_ref dref = *drefp; /* struct copy */
7038
7039 if ( is_pointer_like(dref.predicate) )
7040 cgcActivatePredicate__LD(dref.predicate, dref.generation PASS_LD);
7041 }
7042 }
7043
7044 /*******************************
7045 * PUBLISH PREDICATES *
7046 *******************************/
7047
7048 #define NDET PL_FA_NONDETERMINISTIC
7049
/* Registration table for the predicates of this file.  Each PRED_DEF()
   entry lists the Prolog name, the arity, the base name of the
   implementing C function and the registration flags.  The table is
   empty if O_PLMT is not defined.
*/

BeginPredDefs(thread)
#ifdef O_PLMT
					/* threads */
  PRED_DEF("thread_alias",	     1, thread_alias,	       0)
  PRED_DEF("thread_detach",	     1, thread_detach,	       PL_FA_ISO)
  PRED_DEF("thread_join",	     2, thread_join,	       0)
  PRED_DEF("thread_statistics",	     3, thread_statistics,     0)
  PRED_DEF("thread_property",	     2, thread_property,       NDET|PL_FA_ISO)
  PRED_DEF("is_thread",		     1, is_thread,	       0)
  PRED_DEF("$thread_sigwait",	     1, thread_sigwait,	       0)
#ifdef HAVE_PRED_THREAD_AFFINITY
  PRED_DEF("thread_affinity",	     3, thread_affinity,       0)
#endif

					/* message queues */
  PRED_DEF("message_queue_create",   1, message_queue_create,  0)
  PRED_DEF("message_queue_create",   2, message_queue_create2, PL_FA_ISO)
  PRED_DEF("message_queue_property", 2, message_property,      NDET|PL_FA_ISO)
  PRED_DEF("message_queue_set",	     2, message_queue_set,     0)

  PRED_DEF("thread_send_message",    2, thread_send_message,   PL_FA_ISO)
  PRED_DEF("thread_send_message",    3, thread_send_message,   0)
  PRED_DEF("thread_get_message",     1, thread_get_message,    PL_FA_ISO)
  PRED_DEF("thread_get_message",     2, thread_get_message,    PL_FA_ISO)
  PRED_DEF("thread_get_message",     3, thread_get_message,    PL_FA_ISO)
  PRED_DEF("thread_peek_message",    1, thread_peek_message_1, PL_FA_ISO)
  PRED_DEF("thread_peek_message",    2, thread_peek_message_2, PL_FA_ISO)
  PRED_DEF("message_queue_destroy",  1, message_queue_destroy, PL_FA_ISO)
  PRED_DEF("thread_setconcurrency",  2, thread_setconcurrency, 0)

					/* engines */
  PRED_DEF("$engine_create",	     3, engine_create,	       0)
  PRED_DEF("engine_destroy",	     1, engine_destroy,	       0)
  PRED_DEF("engine_next",	     2, engine_next,	       0)
  PRED_DEF("engine_post",	     2, engine_post,	       0)
  PRED_DEF("engine_post",	     3, engine_post,	       0)
  PRED_DEF("engine_fetch",	     1, engine_fetch,	       0)
  PRED_DEF("is_engine",		     1, is_engine,	       0)

  PRED_DEF("mutex_statistics",	     0, mutex_statistics,      0)

					/* internal support */
  PRED_DEF("$thread_local_clause_count", 3, thread_local_clause_count, 0)
  PRED_DEF("$gc_wait",		     1, gc_wait,	       0)
  PRED_DEF("$gc_clear",		     1, gc_clear,	       0)
  PRED_DEF("$gc_stop",		     0, gc_stop,	       0)
#endif
EndPredDefs
7094