1 /*  Part of SWI-Prolog
2 
3     Author:        Jan Wielemaker
4     E-mail:        J.Wielemaker@vu.nl
5     WWW:           http://www.swi-prolog.org
6     Copyright (c)  1999-2020, University of Amsterdam,
7                               VU University Amsterdam
8 			      CWI, Amsterdam
9     All rights reserved.
10 
11     Redistribution and use in source and binary forms, with or without
12     modification, are permitted provided that the following conditions
13     are met:
14 
15     1. Redistributions of source code must retain the above copyright
16        notice, this list of conditions and the following disclaimer.
17 
18     2. Redistributions in binary form must reproduce the above copyright
19        notice, this list of conditions and the following disclaimer in
20        the documentation and/or other materials provided with the
21        distribution.
22 
23     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24     "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25     LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26     FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27     COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
28     INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
29     BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
30     LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
31     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
32     LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
33     ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34     POSSIBILITY OF SUCH DAMAGE.
35 */
36 
37 /* #define O_DEBUG 1 */
38 
39 #define _GNU_SOURCE 1			/* get recursive mutex stuff to */
40 					/* compile clean with glibc.  Can */
41 					/* this do any harm? */
42 #ifdef __WINDOWS__
43 #include <winsock2.h>
44 #include <windows.h>
45 #include <errno.h>			/* must be before pl-incl.h */
46 #endif
47 
48 #if __MINGW32__
49 #define __try
50 #define __except(_) if (0)
51 #define __finally
52 #endif
53 
54 #include "pl-incl.h"
55 #include "pl-tabling.h"
56 #include "os/pl-cstack.h"
57 #include "pl-prof.h"
58 #include "pl-event.h"
59 #include <stdio.h>
60 #include <math.h>
61 
62 #if __WINDOWS__				/* this is a stub.  Should be detected */
63 #undef HAVE_PTHREAD_SETNAME_NP		/* in configure.ac */
64 #endif
65 
66 #ifdef O_PLMT
67 
68 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
69 			   MULTITHREADING SUPPORT
70 
71 APPROACH
72 ========
73 
74     * Prolog threads are C-threads
75       Prolog multi-threading is based upon a C thread library.  At first,
76       we will concentrate on the Posix pthread standard, due to its wide
77       availability especially on Unix systems.
78 
79       This approach has some clear advantages: clean mixing of thread-based
80       foreign code and portability.
81 
82     * Data
83       All global data that cannot be removed is split into three large
84       structures:
85 
86 	+ PL_code_data
87 	  This structure contains `code' data: data that is set up at
88 	  initialisation time and never changed afterwards.
89 	  PL_initialise() initialises this and no further precautions
90 	  are needed.
91 
92 	+ PL_global_data
93 	  This structure contains all global data required for the
94 	  Prolog `heap'.  This data is shared between threads and
95 	  access should be properly synchronised.
96 
97 	+ PL_local_data
98 	  This structure contains the thread-local data.  If a new
99 	  Prolog engine is initialised in a thread, a new copy of this
100 	  structure is allocated and initialised.
101 
102 	  For compatibility reasons, we cannot pass this pointer around
103 	  as an argument between all functions in the system.  We will
104 	  locate it through the thread-id using a function.  Any function
105 	  requiring frequent access can fetch this pointer once at
106 	  start-up.  Cooperating functions can pass this pointer.
107 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
108 
109 #include <errno.h>
110 #if defined(__linux__)
111 #include <syscall.h>
112 #ifdef HAVE_GETTID_MACRO
113 _syscall0(pid_t,gettid)
114 #endif
115 #endif
116 
117 #ifdef HAVE_SYS_RESOURCE_H
118 #include <sys/resource.h>
119 #endif
120 
121 #ifdef HAVE_SYS_SYSCALL_H
122 #include <sys/syscall.h>
123 #endif
124 
125 #ifdef HAVE_SYS_CPUSET_H
126 #include <sys/param.h>         /* pulls sys/cdefs.h and sys/types.h for sys/cpuset.h */
127 #include <sys/cpuset.h>        /* CPU_ZERO(), CPU_SET, cpuset_t */
128 #endif
129 #ifdef HAVE_PTHREAD_NP_H
130 #include <pthread_np.h>        /* pthread_*_np */
131 #endif
132 #ifdef HAVE_CPUSET_T
133 typedef cpuset_t cpu_set_t;
134 #endif
135 
136 #ifdef HAVE_SEMA_INIT			/* Solaris */
137 #include <synch.h>
138 
139 typedef sema_t sem_t;
140 #define sem_trywait(s)	sema_trywait(s)
141 #define sem_destroy(s)	sema_destroy(s)
142 #define sem_post(s)	sema_post(s)
143 #define sem_init(s, type, cnt) sema_init(s, cnt, type, NULL)
144 
145 #else /*HAVE_SEMA_INIT*/
146 #include <semaphore.h>
147 
148 #ifndef USYNC_THREAD
149 #define USYNC_THREAD 0
150 #endif
151 
152 #endif /*HAVE_SEMA_INIT*/
153 
154 #ifdef USE_SEM_OPEN			/* see below */
155 static sem_t *sem_canceled_ptr;
156 #else
157 static sem_t sem_canceled;		/* used on halt */
158 #define sem_canceled_ptr (&sem_canceled)
159 #endif
160 
161 #ifndef __WINDOWS__
162 #include <signal.h>
163 
164 #ifdef USE_SEM_OPEN
165 
166 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
167 Apple Darwin (6.6) only contains the sem_init()   function as a stub. It
168 only provides named semaphores  through   sem_open().  These defines and
169 my_sem_open() try to hide the details of   this as much as possible from
170 the rest of the code. Note  that   we  unlink  the semaphore right after
171 creating it, using the common Unix trick to keep access to it as long as
172 we do not close it. We assume  the   OS  will close the semaphore as the
173 application terminates. All this is highly   undesirable, but it will do
174 for now. The USE_SEM_OPEN define  is  set   by  configure  based  on the
175 substring "darwin" in the architecture identifier.
176 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
177 
178 #define sem_init(ptr, flags, val) my_sem_open(&ptr, val)
179 #define sem_destroy(ptr)	  ((void)0)
180 
181 static int
my_sem_open(sem_t ** ptr,unsigned int val)182 my_sem_open(sem_t **ptr, unsigned int val)
183 { if ( !*ptr )
184   { sem_t *sem = sem_open("pl", O_CREAT|O_EXCL, 0600, val);
185 
186     DEBUG(MSG_THREAD, Sdprintf("sem = %p\n", sem));
187 
188     if ( sem == NULL )
189     { perror("sem_open");
190       exit(1);
191     }
192 
193     *ptr = sem;
194 
195     sem_unlink("pl");
196   }
197 
198   return 0;
199 }
200 
201 #endif /*USE_SEM_OPEN*/
202 
203 #ifndef SA_RESTART
204 #define SA_RESTART 0
205 #endif
206 #endif /*!__WINDOWS__*/
207 
208 #ifdef __WINDOWS__
209 /* Deal with different versions of the windows thread library
210 */
211 
212 static HANDLE
get_windows_thread(PL_thread_info_t * info)213 get_windows_thread(PL_thread_info_t *info)
214 {
215 #ifdef HAVE_PTHREAD_GETW32THREADHANDLE_NP
216   HANDLE wt = NULL;
217   __try
218   { wt = pthread_getw32threadhandle_np(info->tid);
219   } __except(EXCEPTION_EXECUTE_HANDLER)
220   return wt;
221 #else
222   return OpenThread(THREAD_ALL_ACCESS, FALSE, info->w32id);
223 #endif
224 }
225 
226 static void
close_windows_thread(HANDLE wt)227 close_windows_thread(HANDLE wt)
228 {
229 #ifndef HAVE_PTHREAD_GETW32THREADHANDLE_NP
230   CloseHandle(wt);
231 #endif
232 }
233 #endif /*__WINDOWS__*/
234 
235 
236 		 /*******************************
237 		 *	    GLOBAL DATA		*
238 		 *******************************/
239 
240 static Table threadTable;		/* name --> reference symbol */
241 static int threads_ready = FALSE;	/* Prolog threads available */
242 static Table queueTable;		/* name --> queue */
243 static simpleMutex queueTable_mutex;	/* GC synchronization */
244 static int will_exec;			/* process will exec soon */
245 
246 #ifdef HAVE___THREAD
247 __thread PL_local_data_t *GLOBAL_LD;
248 #else
249 TLD_KEY PL_ldata;			/* key for thread PL_local_data */
250 #endif
251 
252 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
The global mutexes. Most are used within a single module and their name
is simply the module-name. The idea is that a module holds a coherent
bit of data that needs a mutex for all operations.
256 
257 Some remarks:
258 
259     L_MISC
260 	General-purpose mutex.  Should only be used for simple, very
261 	local tasks and may not be used to lock anything significant.
262 
263     __WINDOWS__
264 	We use native windows CRITICAL_SECTIONS for mutexes here to
265 	get the best performance, notably on single-processor hardware.
266 	This is selected in pl-mutex.h based on the macro
267 	USE_CRITICAL_SECTIONS
268 
269 	Unfortunately critical sections have no static initialiser,
270 	so we need something called before anything else happens.  This
271 	can only be DllMain().
272 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
273 
274 #ifdef USE_CRITICAL_SECTIONS
275 #undef PTHREAD_MUTEX_INITIALIZER
276 #define PTHREAD_MUTEX_INITIALIZER {0}
277 #endif
278 
/* Static initialiser for one counting_mutex: the underlying mutex, the
   name of the mutex and 0L for the third member (presumably the lock
   counter -- confirm against the counting_mutex declaration).  Members
   that are not listed are zero-initialised.
*/
#define COUNT_MUTEX_INITIALIZER(name) \
 { PTHREAD_MUTEX_INITIALIZER, \
   name, \
   0L \
 }

/* NOTE: These must be kept in sequence such that they align with
   the #defines for L_* in pl-thread.h

   The table of global mutexes.  Entries are indexed by the L_*
   constants (see e.g. _PL_mutexes[L_MUTEX] in mutex_statistics()), so
   new mutexes must be added at the matching position.  The last two
   entries only exist on Windows.
*/

counting_mutex _PL_mutexes[] =
{ COUNT_MUTEX_INITIALIZER("L_MISC"),		/* 0 */
  COUNT_MUTEX_INITIALIZER("L_ALLOC"),
  COUNT_MUTEX_INITIALIZER("L_REHASH_ATOMS"),
  COUNT_MUTEX_INITIALIZER("L_FLAG"),
  COUNT_MUTEX_INITIALIZER("L_FUNCTOR"),
  COUNT_MUTEX_INITIALIZER("L_RECORD"),
  COUNT_MUTEX_INITIALIZER("L_THREAD"),
  COUNT_MUTEX_INITIALIZER("L_MUTEX"),
  COUNT_MUTEX_INITIALIZER("L_PREDICATE"),
  COUNT_MUTEX_INITIALIZER("L_MODULE"),
  COUNT_MUTEX_INITIALIZER("L_SRCFILE"),		/* 10 */
  COUNT_MUTEX_INITIALIZER("L_TABLE"),
  COUNT_MUTEX_INITIALIZER("L_BREAK"),
  COUNT_MUTEX_INITIALIZER("L_FILE"),
  COUNT_MUTEX_INITIALIZER("L_SEETELL"),
  COUNT_MUTEX_INITIALIZER("L_PLFLAG"),
  COUNT_MUTEX_INITIALIZER("L_OP"),
  COUNT_MUTEX_INITIALIZER("L_INIT"),
  COUNT_MUTEX_INITIALIZER("L_TERM"),
  COUNT_MUTEX_INITIALIZER("L_FOREIGN"),
  COUNT_MUTEX_INITIALIZER("L_OS"),		/* 20 */
  COUNT_MUTEX_INITIALIZER("L_LOCALE"),
  COUNT_MUTEX_INITIALIZER("L_SORTR"),
  COUNT_MUTEX_INITIALIZER("L_UMUTEX"),
  COUNT_MUTEX_INITIALIZER("L_INIT_ATOMS"),
  COUNT_MUTEX_INITIALIZER("L_CGCGEN"),
  COUNT_MUTEX_INITIALIZER("L_EVHOOK"),
  COUNT_MUTEX_INITIALIZER("L_OSDIR"),
  COUNT_MUTEX_INITIALIZER("L_ALERT")		/* 28 */
#ifdef __WINDOWS__
, COUNT_MUTEX_INITIALIZER("L_DDE")
, COUNT_MUTEX_INITIALIZER("L_CSTACK")
#endif
};
324 
325 
326 static void
link_mutexes(void)327 link_mutexes(void)
328 { counting_mutex *m;
329   int n = sizeof(_PL_mutexes)/sizeof(*m);
330   int i;
331 
332   GD->thread.mutexes = _PL_mutexes;
333   for(i=0, m=_PL_mutexes; i<n-1; i++, m++)
334     m->next = m+1;
335 }
336 
337 
338 #ifdef USE_CRITICAL_SECTIONS
339 
340 static void
W32initMutexes(void)341 W32initMutexes(void)
342 { static int done = FALSE;
343   counting_mutex *m;
344   int n = sizeof(_PL_mutexes)/sizeof(*m);
345   int i;
346 
347   if ( done )
348     return;
349   done = TRUE;
350 
351   for(i=0, m=_PL_mutexes; i<n; i++, m++)
352     simpleMutexInit(&m->mutex);
353 }
354 
355 static void
deleteMutexes()356 deleteMutexes()
357 { counting_mutex *m;
358   int n = sizeof(_PL_mutexes)/sizeof(*m);
359   int i;
360 
361   for(i=0, m=_PL_mutexes; i<n; i++, m++)
362     simpleMutexDelete(&m->mutex);
363 }
364 
365 
366 #ifdef O_SHARED_KERNEL
367 
368 BOOL WINAPI
DllMain(HINSTANCE hinstDll,DWORD fdwReason,LPVOID lpvReserved)369 DllMain(HINSTANCE hinstDll, DWORD fdwReason, LPVOID lpvReserved)
370 { BOOL result = TRUE;
371 
372   switch(fdwReason)
373   { case DLL_PROCESS_ATTACH:
374       GD->thread.instance = hinstDll;
375       W32initMutexes();
376       TLD_alloc(&PL_ldata);
377       break;
378     case DLL_PROCESS_DETACH:
379       deleteMutexes();
380       break;
381     case DLL_THREAD_ATTACH:
382     case DLL_THREAD_DETACH:
383       break;
384   }
385 
386   return result;
387 }
388 
389 #endif /*O_SHARED_KERNEL*/
390 
391 #endif /*USE_CRITICAL_SECTIONS*/
392 
393 static
394 PRED_IMPL("mutex_statistics", 0, mutex_statistics, 0)
395 { PRED_LD
396   counting_mutex *cm;
397   IOSTREAM *s = Scurout;
398 
399 #ifdef O_CONTENTION_STATISTICS
400   Sfprintf(s, "Name                                                       locked collisions\n"
401 	      "----------------------------------------------------------------------------\n");
402 #else
403   Sfprintf(s, "Name                               locked\n"
404 	      "-----------------------------------------\n");
405 #endif
406   PL_LOCK(L_MUTEX);
407   for(cm = GD->thread.mutexes; cm; cm = cm->next)
408   { int lc;
409 
410     if ( cm->count == 0 )
411       continue;
412 
413     Sfprintf(s, "%-56Us %8d", cm->name, cm->count); /* %Us: UTF-8 string */
414 #ifdef O_CONTENTION_STATISTICS
415     Sfprintf(s, " %8d", cm->collisions);
416 #endif
417     lc = (cm == &_PL_mutexes[L_MUTEX] ? 1 : 0);
418 
419     if ( cm->lock_count > lc )
420       Sfprintf(s, " LOCKS: %d\n", cm->lock_count - lc);
421     else
422       Sfprintf(s, "\n");
423   }
424   PL_UNLOCK(L_MUTEX);
425 
426   succeed;
427 }
428 
429 
430 #ifdef PTW32_STATIC_LIB
431 static void
win_thread_initialize(void)432 win_thread_initialize(void)
433 { static int done = FALSE;
434 
435   if ( done )
436     return;
437   done = TRUE;
438   ptw32_processInitialize();
439 }
440 #endif
441 
442 		 /*******************************
443 		 *	  LOCAL PROTOTYPES	*
444 		 *******************************/
445 
446 static PL_thread_info_t *alloc_thread(void);
447 static void	destroy_message_queue(message_queue *queue);
448 static void	destroy_thread_message_queue(message_queue *queue);
449 static void	init_message_queue(message_queue *queue, size_t max_size);
450 static size_t	sizeof_message_queue(message_queue *queue);
451 static size_t	sizeof_local_definitions(PL_local_data_t *ld);
452 static void	freeThreadSignals(PL_local_data_t *ld);
453 static thread_handle *create_thread_handle(PL_thread_info_t *info);
454 static void	free_thread_info(PL_thread_info_t *info);
455 static void	set_system_thread_id(PL_thread_info_t *info);
456 static thread_handle *symbol_thread_handle(atom_t a);
457 static void	destroy_interactor(thread_handle *th, int gc);
458 static PL_engine_t PL_current_engine(void);
459 static void	detach_engine(PL_engine_t e);
460 
461 static int	unify_queue(term_t t, message_queue *q);
462 static int	get_message_queue_unlocked__LD(term_t t, message_queue **queue ARG_LD);
463 static int	get_message_queue__LD(term_t t, message_queue **queue ARG_LD);
464 static void	release_message_queue(message_queue *queue);
465 static void	initMessageQueues(void);
466 static int	get_thread(term_t t, PL_thread_info_t **info, int warn);
467 static int	is_alive(int status);
468 static void	init_predicate_references(PL_local_data_t *ld);
469 static void	free_predicate_references(PL_local_data_t *ld);
470 static int	ldata_in_use(PL_local_data_t *ld);
471 #ifdef O_C_BACKTRACE
472 static void	print_trace(int depth);
473 #else
474 #define		print_trace(depth) (void)0
475 #endif
476 
477 static void timespec_diff(struct timespec *diff,
478 			  const struct timespec *a, const struct timespec *b);
479 static int  timespec_sign(const struct timespec *t);
480 
481 		 /*******************************
482 		 *	     LOCAL DATA		*
483 		 *******************************/
484 
485 #undef LD
486 #define LD LOCAL_LD
487 
488 
489 		 /*******************************
490 		 *	       ERRORS		*
491 		 *******************************/
492 
/* Translate the error code `e' returned by a thread-library call into
   a message, using strerror().
*/

static char *
ThError(int e)
{ char *message = strerror(e);

  return message;
}
497 
498 
499 		 /*******************************
500 		 *     RUNTIME ENABLE/DISABLE	*
501 		 *******************************/
502 
503 int
enableThreads(int enable)504 enableThreads(int enable)
505 { if ( enable )
506   { GD->thread.enabled = TRUE;		/* print system message? */
507   } else
508   { PL_LOCK(L_THREAD);
509     if ( GD->statistics.threads_created -
510 	 GD->statistics.threads_finished == 1 ) /* I am alone :-( */
511     { GD->thread.enabled = FALSE;
512     } else
513     { GET_LD
514       term_t key = PL_new_term_ref();
515 
516       PL_put_atom(key, ATOM_threads);
517 
518       PL_UNLOCK(L_THREAD);
519       return PL_error(NULL, 0, "Active threads",
520 		      ERR_PERMISSION,
521 		      ATOM_modify, ATOM_flag, key);
522     }
523     PL_UNLOCK(L_THREAD);
524   }
525 
526   succeed;
527 }
528 
529 
530 		 /*******************************
531 		 *	 THREAD ALLOCATION	*
532 		 *******************************/
533 
534 static int
initialise_thread(PL_thread_info_t * info)535 initialise_thread(PL_thread_info_t *info)
536 { assert(info->thread_data);
537 
538   TLD_set_LD(info->thread_data);
539 
540   if ( !info->stack_limit ) info->stack_limit = GD->options.stackLimit;
541   if ( !info->table_space ) info->table_space = GD->options.tableSpace;
542 
543   if ( !initPrologStacks(info->stack_limit) )
544   { info->status = PL_THREAD_NOMEM;
545     TLD_set_LD(NULL);
546     return FALSE;
547   }
548 
549   initPrologLocalData(info->thread_data);
550   info->thread_data->magic = LD_MAGIC;
551 
552   return TRUE;
553 }
554 
555 
556 static void
free_local_data(PL_local_data_t * ld)557 free_local_data(PL_local_data_t *ld)
558 { simpleMutexDelete(&ld->thread.scan_lock);
559   freeHeap(ld, sizeof(*ld));
560 }
561 
562 static PL_local_data_t *ld_free_list = NULL;
563 
564 static void
clean_ld_free_list(void)565 clean_ld_free_list(void)
566 { if ( ld_free_list )
567   { PL_local_data_t *ld, **prev = &ld_free_list;
568     for(ld = ld_free_list; ld; ld=*prev)
569     { if ( !ldata_in_use(ld) )
570       { *prev = ld->next_free;
571 	free_local_data(ld);
572       } else
573       { prev = &ld->next_free;
574       }
575     }
576   }
577 }
578 
579 static void
maybe_free_local_data(PL_local_data_t * ld)580 maybe_free_local_data(PL_local_data_t *ld)
581 { if ( !ldata_in_use(ld) )
582   { free_local_data(ld);
583   } else
584   { PL_LOCK(L_THREAD);
585     clean_ld_free_list();
586     ld->next_free = ld_free_list;
587     ld_free_list = ld;
588     PL_UNLOCK(L_THREAD);
589   }
590 }
591 
592 
593 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
594 free_prolog_thread()
595     Called from a cleanup-handler to release all resources associated
596     with a thread.
597 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
598 
599 static void
freePrologThread(PL_local_data_t * ld,int after_fork)600 freePrologThread(PL_local_data_t *ld, int after_fork)
601 { PL_thread_info_t *info;
602   int acknowledge;
603   double time;
604   PL_local_data_t *old_ld;
605 
606   { GET_LD
607     old_ld = LD;
608   }
609 
610   if ( !threads_ready )
611     return;				/* Post-mortem */
612 
613   TLD_set_LD(ld);
614   { GET_LD
615 
616     info = ld->thread.info;
617     DEBUG(MSG_THREAD, Sdprintf("Freeing prolog thread %d (status = %d)\n",
618 			       info->pl_tid, info->status));
619 
620     if ( !after_fork )
621     { int rc1, rc2;
622 
623       PL_LOCK(L_THREAD);
624       if ( info->status == PL_THREAD_RUNNING )
625 	info->status = PL_THREAD_EXITED;	/* foreign pthread_exit() */
626       acknowledge = (ld->exit_requested == EXIT_REQ_PROCESS);
627       PL_UNLOCK(L_THREAD);
628 
629       if ( ld->stacks.argument.base )		/* are stacks initialized? */
630       { ld->critical++;   /* startCritical  */
631 	info->in_exit_hooks = TRUE;
632 	if ( LD == ld )
633 	  rc1 = callEventHook(PLEV_THIS_THREAD_EXIT);
634 	else
635 	  rc1 = TRUE;
636 	rc2 = callEventHook(PLEV_THREAD_EXIT, info);
637 	if ( (!rc1 || !rc2) && exception_term )
638 	{ Sdprintf("Event hook \"thread_finished\" left an exception\n");
639 	  PL_write_term(Serror, exception_term, 1200,
640 			PL_WRT_QUOTED|PL_WRT_NEWLINE);
641 	  PL_clear_exception();
642 	}
643 	info->in_exit_hooks = FALSE;
644 	ld->critical--;   /* endCritical */
645       }
646     } else
647     { acknowledge = FALSE;
648       info->detached = TRUE;		/* cleanup */
649     }
650 
651   #ifdef O_PROFILE
652     if ( ld->profile.active )
653       activateProfiler(FALSE, ld);
654   #endif
655 
656     destroy_event_list(&ld->event.hook.onthreadexit);
657     cleanupLocalDefinitions(ld);
658 
659     DEBUG(MSG_THREAD, Sdprintf("Destroying data\n"));
660     ld->magic = 0;
661     if ( ld->stacks.global.base )		/* otherwise not initialised */
662     { simpleMutexLock(&ld->thread.scan_lock);
663       freeStacks(ld);
664       simpleMutexUnlock(&ld->thread.scan_lock);
665     }
666     freePrologLocalData(ld);
667 
668     /*PL_unregister_atom(ld->prompt.current);*/
669 
670     freeThreadSignals(ld);
671     time = info->is_engine ? 0.0 : ThreadCPUTime(ld, CPU_USER);
672 
673     if ( !after_fork )
674     { PL_LOCK(L_THREAD);
675       GD->statistics.threads_finished++;
676       assert(GD->statistics.threads_created - GD->statistics.threads_finished >= 1);
677       GD->statistics.thread_cputime += time;
678     }
679     destroy_thread_message_queue(&ld->thread.messages);
680     free_predicate_references(ld);
681     if ( ld->btrace_store )
682     { btrace_destroy(ld->btrace_store);
683       ld->btrace_store = NULL;
684     }
685   #ifdef O_LOCALE
686     if ( ld->locale.current )
687       releaseLocale(ld->locale.current);
688   #endif
689     info->thread_data = NULL;		/* avoid a loop */
690     info->has_tid = FALSE;		/* needed? */
691     if ( !after_fork )
692       PL_UNLOCK(L_THREAD);
693 
694     if ( info->detached || acknowledge )
695       free_thread_info(info);
696 
697     ld->thread.info = NULL;		/* help force a crash if ld used */
698     maybe_free_local_data(ld);
699 
700     if ( acknowledge )			/* == canceled */
701     { DEBUG(MSG_CLEANUP_THREAD,
702 	    Sdprintf("Acknowledge dead of %d\n", info->pl_tid));
703       pthread_detach(pthread_self());
704       sem_post(sem_canceled_ptr);
705     }
706   }
707 
708   TLD_set_LD(old_ld);
709 }
710 
711 
712 static void
free_prolog_thread(void * data)713 free_prolog_thread(void *data)
714 { PL_local_data_t *ld = data;
715 
716   freePrologThread(ld, FALSE);
717 }
718 
719 
720 #ifdef O_QUEUE_STATS
721 static void msg_statistics(void);
722 #endif
723 
724 #ifdef O_ATFORK					/* Not yet default */
725 
726 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
727 reinit_threads_after_fork() is called after a  fork   in  the child. Its
728 task is to  restore  the  consistency   of  the  thread  information. It
729 performs the following tasks:
730 
731     * Reinitialize all mutexes we know of
732       Just calling simpleMutexInit() seems a bit dubious, but as we have
733       no clue about their locking status, what are the alternatives?
734     * Make the thread the main (=1) thread if this is not already the
735       case.  This is needed if another thread has initiated the fork.
736     * Reclaim stacks and other resources associated with other threads
737       that existed before the fork.
738 
739 There are several issues with the current implementation:
740 
741     * If fork/1 is called while the calling thread holds a mutex (as in
742       with_mutex/2), the consistency of this mutex will be lost.
743     * I doubt we have all mutexes.  Certainly not of the packages.
744       We should check at least I/O, user-level mutexes and other mutexes
745       that are created locally.
746     * If another thread than the main thread forks, we still need to
747       properly reclaim the main-thread resources.
748 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
749 
750 static void
reinit_threads_after_fork(void)751 reinit_threads_after_fork(void)
752 { GET_LD
753   counting_mutex *m;
754   PL_thread_info_t *info;
755   int i;
756 
757   if ( will_exec ||
758        (GD->statistics.threads_created - GD->statistics.threads_finished) == 1)
759     return;					/* no point */
760 
761   info = LD->thread.info;
762 
763   for(m = GD->thread.mutexes; m; m = m->next)
764   { simpleMutexInit(&m->mutex);			/* Dubious */
765     m->count = 0;
766     m->lock_count = 0;
767 #ifdef O_CONTENTION_STATISTICS
768     m->collisions = 0;
769 #endif
770   }
771 
772   if ( info->pl_tid != 1 )
773   { DEBUG(MSG_THREAD, Sdprintf("Forked thread %d\n", info->pl_tid));
774     *GD->thread.threads[1] = *info;
775     info->status = PL_THREAD_UNUSED;
776     info = GD->thread.threads[1];
777     LD->thread.info = info;
778     info->pl_tid = 1;
779   }
780   set_system_thread_id(info);
781 
782   for(i=2; i<=GD->thread.highest_id; i++)
783   { if ( (info=GD->thread.threads[i]) )
784     { if ( info->status != PL_THREAD_UNUSED )
785       { freePrologThread(info->thread_data, TRUE);
786 	info->status = PL_THREAD_UNUSED;
787       }
788     }
789   }
790   GD->thread.highest_id = 1;
791 
792   GD->statistics.thread_cputime = 0.0;
793   GD->statistics.threads_created = 1;
794   GD->statistics.threads_finished = 0;
795 
796   assert(PL_thread_self() == 1);
797 }
798 
799 #else
800 
801 #undef pthread_atfork				/* may be a macro */
802 #define pthread_atfork(prep, parent, child) (void)0
803 
804 #endif /*O_ATFORK*/
805 
806 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
807 PL_cleanup_fork() must be called between  fork()   and  exec() to remove
808 traces of Prolog that are not  supposed   to  leak into the new process.
809 Note that we must be careful  here.   Notably,  the  code cannot lock or
810 unlock any mutex as the behaviour of mutexes is undefined over fork().
811 
812 Earlier versions used the file-table to  close file descriptors that are
813 in use by Prolog. This can't work as   the  table is guarded by a mutex.
814 Now we use the FD_CLOEXEC flag in Snew();
815 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
816 
/* Mark the process as about to exec and stop the interval timer.  The
   will_exec flag makes reinit_threads_after_fork() return without
   doing anything.
*/

void
PL_cleanup_fork(void)
{ will_exec = TRUE;
  stopItimer();
}
822 
823 
/* Initialise the thread subsystem and register the calling thread as
   Prolog thread 1, using the static PL_local_data as its local data.
   If the thread subsystem is already ready (threads_ready), the
   function returns after the initial allocation/mutex setup without
   changing anything else.
*/

void
initPrologThreads(void)
{ PL_thread_info_t *info;
  static int init_ldata_key = FALSE;	/* TLD key allocated? */

  initAlloc();

#if defined(USE_CRITICAL_SECTIONS) && !defined(O_SHARED_KERNEL)
  W32initMutexes();		/* see also DllMain() */
#endif

#ifdef PTW32_STATIC_LIB
  win_thread_initialize();
#endif

  PL_LOCK(L_THREAD);
  if ( threads_ready )			/* already initialised */
  { PL_UNLOCK(L_THREAD);
    return;
  }

#ifdef O_QUEUE_STATS
  atexit(msg_statistics);
#endif

  /* Allocate the thread-local-data key only once, and only in the
     configurations where DllMain() has not already done so and where
     no __thread variable is used instead. */
  if ( !init_ldata_key )
  { init_ldata_key = TRUE;
#if !(defined(USE_CRITICAL_SECTIONS) && defined(O_SHARED_KERNEL))
#ifndef HAVE___THREAD
    TLD_alloc(&PL_ldata);		/* see also alloc_thread() */
#endif
#endif
  }
  TLD_set_LD(&PL_local_data);

  PL_local_data.magic = LD_MAGIC;
  { GD->thread.thread_max = 4;		/* see resizeThreadMax() */
    GD->thread.highest_allocated = 1;
    /* zero-filled array of thread-info pointers; slot 1 is this thread */
    GD->thread.threads = allocHeapOrHalt(GD->thread.thread_max *
					 sizeof(*GD->thread.threads));
    memset(GD->thread.threads, 0,
	   GD->thread.thread_max * sizeof(*GD->thread.threads));
    info = GD->thread.threads[1] = allocHeapOrHalt(sizeof(*info));
    memset(info, 0, sizeof(*info));
    info->pl_tid = 1;
    info->debug = TRUE;
    GD->thread.highest_id = 1;
    info->thread_data = &PL_local_data;
    info->status = PL_THREAD_RUNNING;
    PL_local_data.thread.info = info;
    PL_local_data.thread.magic = PL_THREAD_MAGIC;
    set_system_thread_id(info);
    init_message_queue(&PL_local_data.thread.messages, 0);
    init_predicate_references(&PL_local_data);

    GD->statistics.thread_cputime = 0.0;
    GD->statistics.threads_created = 1;
    pthread_mutex_init(&GD->thread.index.mutex, NULL);
    pthread_cond_init(&GD->thread.index.cond, NULL);
    initMutexes();
    link_mutexes();
    threads_ready = TRUE;
  }

  /* Expands to (void)0 unless O_ATFORK is defined (see above) */
  pthread_atfork(NULL, NULL, reinit_threads_after_fork);
  initMessageQueues();

  PL_UNLOCK(L_THREAD);
}
893 
894 
895 void
cleanupThreads(void)896 cleanupThreads(void)
897 { int i;
898   /*TLD_free(PL_ldata);*/		/* this causes crashes */
899 
900   if ( queueTable )
901   { destroyHTable(queueTable);		/* removes shared queues */
902     queueTable = NULL;
903     simpleMutexDelete(&queueTable_mutex);
904   }
905   if ( GD->thread.mutexTable )
906   { destroyHTable(GD->thread.mutexTable);
907     GD->thread.mutexTable = NULL;
908   }
909   if ( threadTable )
910   { destroyHTable(threadTable);
911     threadTable = NULL;
912   }
913   for(i=1; i<GD->thread.thread_max; i++)
914   { PL_thread_info_t *info = GD->thread.threads[i];
915 
916     if ( info )
917       freeHeap(info, sizeof(*info));
918   }
919   freeHeap(GD->thread.threads,
920 	   GD->thread.thread_max * sizeof(*GD->thread.threads));
921   GD->thread.threads = NULL;
922   threads_ready = FALSE;
923 }
924 
925 
926 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
927 A first step towards clean destruction of the system.  Ideally, we would
928 like the following to happen:
929 
930     * Close-down all threads except for the main one
931 	+ Have all thread exit hooks called
932     * Run the at_halt/1 hooks in the main thread
933     * Exit from the main thread.
934 
935 There are a lot of problems however.
936 
937     * Somehow Halt() should always be called from the main thread
938       to have the process working properly.
939 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
940 
/* Try to terminate all Prolog threads except the calling one.

   Finished threads are joined.  Running threads that are not engines
   get exit_requested set to EXIT_REQ_PROCESS and are asked to stop,
   first through their cancel hook (if any) and otherwise by raising
   SIG_PLABORT in them.  The function then polls sem_canceled_ptr, which
   freePrologThread() posts for each acknowledging thread, for at most
   10 rounds with a Pause(0.1) in between.

   Returns TRUE if all threads that were asked to stop have acknowledged
   and FALSE otherwise.  In the latter case a `threads_not_died' message
   listing the threads that are still running is printed.  In both cases
   threads_ready is reset to FALSE.
*/

int
exitPrologThreads(void)
{ int rc;
  int i;
  int me = PL_thread_self();
  int canceled = 0;			/* # threads we wait for */

  DEBUG(MSG_THREAD, Sdprintf("exitPrologThreads(): me = %d\n", me));

  sem_init(sem_canceled_ptr, USYNC_THREAD, 0);

  for(i=1; i<= GD->thread.highest_id; i++)
  { PL_thread_info_t *info = GD->thread.threads[i];

    if ( info && info->thread_data && i != me )
    { switch(info->status)
      { case PL_THREAD_FAILED:
	case PL_THREAD_EXITED:
	case PL_THREAD_EXCEPTION:
	{ void *r;
	  int rc;			/* shadows the outer rc */

	  if ( (rc=pthread_join(info->tid, &r)) )
	    Sdprintf("Failed to join thread %d: %s\n", i, ThError(rc));

	  break;
	}
	case PL_THREAD_RUNNING:
	{ if ( !info->is_engine )
	  { info->thread_data->exit_requested = EXIT_REQ_PROCESS;

	    if ( info->cancel )
	    { /* CANCEL_FAILED leaves the switch and falls back to
		 PL_thread_raise() below; the other two results
		 `continue' the enclosing for-loop. */
	      switch( (*info->cancel)(i) )
	      { case PL_THREAD_CANCEL_FAILED:
		  break;
	        case PL_THREAD_CANCEL_MUST_JOIN:
		  canceled++;
		  /*FALLTHROUGH*/
	        case PL_THREAD_CANCEL_JOINED:
		  continue;
	      }
	    }

	    if ( PL_thread_raise(i, SIG_PLABORT) )
	      canceled++;
	  }

	  break;
	}
	default:
	  break;
      }
    }
  }

  if ( canceled > 0 )
  { int maxwait = 10;

    DEBUG(MSG_CLEANUP_THREAD, Sdprintf("Waiting for %d threads ", canceled));
    /* Each round first consumes all acknowledgements posted so far and
       only pauses if threads are still outstanding. */
    for(maxwait = 10; maxwait > 0 && canceled > 0; maxwait--)
    { while ( sem_trywait(sem_canceled_ptr) == 0 )
      { DEBUG(MSG_CLEANUP_THREAD, Sdprintf("."));
	canceled--;
      }
      if ( canceled > 0 )
      { DEBUG(MSG_CLEANUP_THREAD, Sdprintf("W"));
	Pause(0.1);
      }
    }
    DEBUG(MSG_CLEANUP_THREAD, Sdprintf("\nLeft: %d threads\n", canceled));
  }

  if ( canceled )			/* some threads did not acknowledge */
  { GET_LD
    fid_t fid;

    if ( (fid = PL_open_foreign_frame()) )
    { term_t head    = PL_new_term_ref();
      term_t running = PL_new_term_ref();
      term_t tail    = PL_copy_term_ref(running);

      /* Build the list `running' of threads that still have status
	 PL_THREAD_RUNNING. */
      rc = TRUE;
      for(i = 1; i <= GD->thread.highest_id; i++)
      { PL_thread_info_t *info = GD->thread.threads[i];

	if ( info && info->thread_data && i != me )
	{ if ( info->status == PL_THREAD_RUNNING )
	  { if ( !PL_unify_list(tail, head, tail) ||
		 !unify_thread_id(head, info) )
	    { rc = FALSE;
	      break;
	    }
	  }
	}
      }

      if ( rc )
      { rc = ( PL_unify_nil(tail) &&
	       printMessage(ATOM_informational,
			    PL_FUNCTOR_CHARS, "threads_not_died", 1,
			      PL_TERM, running)
	     );
      }
    } else
    { rc = FALSE;
    }

    if ( !rc )				/* could not print the message */
      Sdprintf("%d threads wouldn't die\n", canceled);
    rc = FALSE;
  } else
  { DEBUG(MSG_THREAD, Sdprintf("done\n"));
#ifndef WIN64			/* FIXME: Hangs if nothing is printed */
    sem_destroy(sem_canceled_ptr);
#endif
    rc = TRUE;
  }

  threads_ready = FALSE;
  return rc;
}
1062 
1063 
1064 		 /*******************************
1065 		 *	    ALIAS NAME		*
1066 		 *******************************/
1067 
1068 int
aliasThread(int tid,atom_t type,atom_t name)1069 aliasThread(int tid, atom_t type, atom_t name)
1070 { GET_LD
1071   PL_thread_info_t *info;
1072   thread_handle *th;
1073   int rc = TRUE;
1074 
1075   PL_LOCK(L_THREAD);
1076   if ( !threadTable )
1077     threadTable = newHTable(16);
1078 
1079   if ( (threadTable && lookupHTable(threadTable, (void *)name)) ||
1080        (queueTable  && lookupHTable(queueTable,  (void *)name)) )
1081   { term_t obj = PL_new_term_ref();
1082 
1083     PL_UNLOCK(L_THREAD);
1084     PL_put_atom(obj, name);
1085     return PL_error(NULL, 0, "Alias name already taken",
1086 		    ERR_PERMISSION, ATOM_create, type, obj);
1087   }
1088 
1089   info = GD->thread.threads[tid];
1090   if ( (th = create_thread_handle(info)) )
1091   { th->alias = name;
1092     PL_register_atom(name);
1093     PL_register_atom(info->symbol);
1094     addNewHTable(threadTable, (void *)name, (void *)info->symbol);
1095   } else
1096   { rc = PL_no_memory();
1097   }
1098   PL_UNLOCK(L_THREAD);
1099 
1100   return rc;
1101 }
1102 
1103 
1104 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
1105 PL_get_thread_alias() gets the alias name of   a  thread. It is intended
1106 for crash-analysis. Normal applications should use PL_unify_thread_id().
1107 
1108 The implementation is wrong for two reasons. It should register the atom
1109 and this implementation should lock L_THREAD  because otherwise the atom
1110 may be gone even before it is   locked. However, locking is not unlikely
1111 to deadlock during crash analysis ...
1112 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
1113 
1114 int
PL_get_thread_alias(int tid,atom_t * alias)1115 PL_get_thread_alias(int tid, atom_t *alias)
1116 { PL_thread_info_t *info;
1117   thread_handle *th;
1118 
1119   if ( tid == 0 )
1120     tid = PL_thread_self();
1121   if ( tid < 1 || tid > GD->thread.highest_id )
1122     return FALSE;
1123 
1124   info = GD->thread.threads[tid];
1125   if ( info->symbol &&
1126        (th=symbol_thread_handle(info->symbol)) &&
1127        th->alias )
1128   { *alias = th->alias;
1129 
1130     return TRUE;
1131   }
1132 
1133   return FALSE;
1134 }
1135 
1136 
1137 		 /*******************************
1138 		 *	     GC ENGINES		*
1139 		 *******************************/
1140 
1141 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
gc_thread() is (indirectly) called from atom-GC. If the engine is not yet
fully reclaimed, there are two situations:
1144 
1145   - The engine is still running.  Detach it, such that it will be
1146     reclaimed silently when done.
1147   - The engine has completed.  Join it.
1148 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
1149 
1150 static thread_handle *gced_threads = NULL;
1151 static int       thread_gc_running = FALSE;
1152 
1153 static void
discard_thread(thread_handle * h)1154 discard_thread(thread_handle *h)
1155 { int alive = FALSE;
1156   PL_thread_info_t *info;
1157 
1158   if ( true(h, TH_IS_INTERACTOR) )
1159   { destroy_interactor(h, TRUE);		/* cannot be running */
1160     return;
1161   }
1162 
1163   PL_LOCK(L_THREAD);
1164   if ( (info=h->info) &&
1165        (alive=is_alive(info->status)) &&
1166        !info->detached )
1167   { if ( info->has_tid )
1168     { if ( pthread_detach(info->tid) == 0 )
1169 	info->detached = TRUE;
1170     }
1171   }
1172   PL_UNLOCK(L_THREAD);
1173 
1174   if ( !alive )
1175   { double delay = 0.0001;
1176 
1177     if ( !info->detached )
1178     { void *r;
1179 
1180       while( pthread_join(info->tid, &r) == EINTR )
1181 	;
1182     }
1183 
1184     while ( info->thread_data )
1185     { Pause(delay);
1186       if ( delay < 0.01 )
1187 	delay *= 2;
1188     }
1189     free_thread_info(info);
1190   }
1191 }
1192 
1193 
1194 static void *
thread_gc_loop(void * closure)1195 thread_gc_loop(void *closure)
1196 {
1197 #ifdef HAVE_SIGPROCMASK
1198   sigset_t set;
1199   allSignalMask(&set);
1200   pthread_sigmask(SIG_BLOCK, &set, NULL);
1201 #endif
1202 
1203   for(;;)
1204   { thread_handle *h;
1205 
1206     do
1207     { h = gced_threads;
1208     } while ( h && !COMPARE_AND_SWAP_PTR(&gced_threads, h, h->next_free) );
1209 
1210     if ( h )
1211     { if ( GD->cleaning == CLN_NORMAL )
1212 	discard_thread(h);
1213       PL_free(h);
1214     } else
1215     { break;
1216     }
1217   }
1218 
1219   thread_gc_running = FALSE;
1220   return NULL;
1221 }
1222 
1223 
1224 static void
start_thread_gc_thread(void)1225 start_thread_gc_thread(void)
1226 { if ( !thread_gc_running )
1227   { pthread_attr_t attr;
1228     int rc;
1229     pthread_t thr;
1230 
1231     pthread_attr_init(&attr);
1232     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
1233     thread_gc_running = TRUE;
1234     rc = pthread_create(&thr, &attr, thread_gc_loop, NULL);
1235     pthread_attr_destroy(&attr);
1236     if ( rc != 0 )
1237     { Sdprintf("Failed to start thread gc thread\n");
1238       thread_gc_running = FALSE;
1239     }
1240   }
1241 }
1242 
1243 
1244 static void
gc_thread(thread_handle * ref)1245 gc_thread(thread_handle *ref)
1246 { thread_handle *h;
1247 
1248   do
1249   { h = gced_threads;
1250     ref->next_free = h;
1251   } while( !COMPARE_AND_SWAP_PTR(&gced_threads, h, ref) );
1252 
1253   start_thread_gc_thread();
1254 }
1255 
1256 
1257 		 /*******************************
1258 		 *	   THREAD SYMBOL	*
1259 		 *******************************/
1260 
1261 static int
write_thread_handle(IOSTREAM * s,atom_t eref,int flags)1262 write_thread_handle(IOSTREAM *s, atom_t eref, int flags)
1263 { thread_handle **refp = PL_blob_data(eref, NULL, NULL);
1264   thread_handle *ref = *refp;
1265   (void)flags;
1266 
1267   Sfprintf(s, "<%s>(%d,%p)",
1268 	   true(ref, TH_IS_INTERACTOR) ? "engine" : "thread",
1269 	   ref->engine_id, ref);
1270   return TRUE;
1271 }
1272 
1273 
1274 static int
release_thread_handle(atom_t aref)1275 release_thread_handle(atom_t aref)
1276 { thread_handle **refp = PL_blob_data(aref, NULL, NULL);
1277   thread_handle *ref   = *refp;
1278   PL_thread_info_t *info;
1279 
1280   if ( (info=ref->info) )
1281   { /* assert(info->detached == FALSE || info->is_engine); TBD: Sort out */
1282     info->symbol = 0;				/* TBD: Dubious */
1283     gc_thread(ref);
1284   } else
1285     PL_free(ref);
1286 
1287   return TRUE;
1288 }
1289 
1290 
1291 static int
save_thread(atom_t aref,IOSTREAM * fd)1292 save_thread(atom_t aref, IOSTREAM *fd)
1293 { thread_handle **refp = PL_blob_data(aref, NULL, NULL);
1294   thread_handle *ref   = *refp;
1295   (void)fd;
1296 
1297   return PL_warning("Cannot save reference to <%s>(%d,%p)",
1298 		    true(ref, TH_IS_INTERACTOR) ? "engine" : "thread",
1299 		    ref->engine_id, ref);
1300 }
1301 
1302 
1303 static atom_t
load_thread(IOSTREAM * fd)1304 load_thread(IOSTREAM *fd)
1305 { (void)fd;
1306 
1307   return PL_new_atom("<saved-thread-handle>");
1308 }
1309 
1310 
1311 static PL_blob_t thread_blob =
1312 { PL_BLOB_MAGIC,
1313   PL_BLOB_UNIQUE,
1314   "thread",
1315   release_thread_handle,
1316   NULL,
1317   write_thread_handle,
1318   NULL,
1319   save_thread,
1320   load_thread
1321 };
1322 
1323 
1324 		 /*******************************
1325 		 *	 PROLOG BINDING		*
1326 		 *******************************/
1327 
1328 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
1329 Resizing max-threads. Note that we do *not* deallocate the old structure
1330 to ensure we can  access  GD->thread.threads[i]   at  any  time  without
1331 locking.
1332 
1333 TBD: remember the old ones, such that PL_cleanup() can remove them.
1334 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
1335 
1336 static int
resizeThreadMax(void)1337 resizeThreadMax(void)
1338 { int newmax = GD->thread.thread_max*2;
1339   PL_thread_info_t **newinfo, **oldinfo;
1340   size_t dsize = GD->thread.thread_max * sizeof(*GD->thread.threads);
1341 
1342   oldinfo = GD->thread.threads;
1343   newinfo = allocHeapOrHalt(newmax * sizeof(*GD->thread.threads));
1344   memset(addPointer(newinfo,dsize), 0, dsize);
1345   memcpy(newinfo, oldinfo, dsize);
1346   GD->thread.threads = newinfo;
1347   GD->thread.thread_max = newmax;
1348   GC_LINGER(oldinfo);
1349 
1350   return TRUE;
1351 }
1352 
1353 
1354 /* MT: thread-safe */
1355 
1356 static PL_thread_info_t *
alloc_thread(void)1357 alloc_thread(void)
1358 { PL_thread_info_t *info;
1359   PL_local_data_t *ld;
1360 
1361   ld = allocHeapOrHalt(sizeof(PL_local_data_t));
1362   memset(ld, 0, sizeof(PL_local_data_t));
1363 
1364   do
1365   { info = GD->thread.free;
1366   } while ( info && !COMPARE_AND_SWAP_PTR(&GD->thread.free, info, info->next_free) );
1367 
1368   if ( info )
1369   { int i = info->pl_tid;
1370     assert(info->status == PL_THREAD_UNUSED);
1371     memset(info, 0, sizeof(*info));
1372     info->pl_tid = i;
1373     PL_LOCK(L_THREAD);
1374   } else
1375   { int i;
1376 
1377     info = allocHeapOrHalt(sizeof(*info));
1378     memset(info, 0, sizeof(*info));
1379 
1380     PL_LOCK(L_THREAD);
1381     i = info->pl_tid = ++GD->thread.highest_allocated;
1382     if ( i == GD->thread.thread_max )
1383       resizeThreadMax();
1384     if ( i > GD->thread.peak_id )
1385       GD->thread.peak_id = i;
1386 
1387     assert(GD->thread.threads[i] == NULL);
1388     GD->thread.threads[i] = info;
1389   }
1390 
1391   ld->thread.info = info;
1392   ld->thread.magic = PL_THREAD_MAGIC;
1393   info->thread_data = ld;
1394   info->status = PL_THREAD_RESERVED;
1395   info->debug = TRUE;
1396 
1397   if ( info->pl_tid > GD->thread.highest_id )
1398     GD->thread.highest_id = info->pl_tid;
1399   PL_UNLOCK(L_THREAD);
1400 
1401   ATOMIC_INC(&GD->statistics.threads_created);
1402 
1403   return info;
1404 }
1405 
1406 
1407 int
PL_thread_self(void)1408 PL_thread_self(void)
1409 { GET_LD
1410   PL_local_data_t *ld = LD;
1411   PL_thread_info_t *info;
1412 
1413   if ( ld && (info=ld->thread.info) )
1414     return info->pl_tid;
1415 
1416   return -1;				/* thread has no Prolog thread */
1417 }
1418 
1419 
1420 int
PL_unify_thread_id(term_t t,int i)1421 PL_unify_thread_id(term_t t, int i)
1422 { if ( i < 1 ||
1423        i > GD->thread.highest_id ||
1424        GD->thread.threads[i]->status == PL_THREAD_UNUSED ||
1425        GD->thread.threads[i]->status == PL_THREAD_RESERVED )
1426     return -1;				/* error */
1427 
1428   return unify_thread_id(t, GD->thread.threads[i]);
1429 }
1430 
1431 
1432 int
PL_get_thread_id_ex(term_t t,int * idp)1433 PL_get_thread_id_ex(term_t t, int *idp)
1434 { PL_thread_info_t *info;
1435 
1436   if ( !get_thread(t, &info, TRUE) )
1437     return FALSE;
1438 
1439   *idp = info->pl_tid;
1440 
1441   return TRUE;
1442 }
1443 
1444 
1445 
1446 #ifdef __WINDOWS__
1447 
1448 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
1449 PL_w32thread_raise(DWORD id, int sig)
1450     Sets the signalled mask for a specific Win32 thread. This is a
1451     partial work-around for the lack of proper asynchronous signal
1452     handling in the Win32 platform.
1453 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
1454 
1455 int
PL_w32thread_raise(DWORD id,int sig)1456 PL_w32thread_raise(DWORD id, int sig)
1457 { int i;
1458 
1459   if ( sig < 0 || sig > MAXSIGNAL )
1460     return FALSE;			/* illegal signal */
1461 
1462   PL_LOCK(L_THREAD);
1463   for(i = 1; i <= GD->thread.highest_id; i++)
1464   { PL_thread_info_t *info = GD->thread.threads[i];
1465 
1466     if ( info && info->w32id == id && info->thread_data )
1467     { raiseSignal(info->thread_data, sig);
1468       if ( info->w32id )
1469 	PostThreadMessage(info->w32id, WM_SIGNALLED, 0, 0L);
1470       PL_UNLOCK(L_THREAD);
1471       DEBUG(MSG_THREAD, Sdprintf("Signalled %d to thread %d\n", sig, i));
1472       return TRUE;
1473     }
1474   }
1475   PL_UNLOCK(L_THREAD);
1476 
1477   return FALSE;				/* can't find thread */
1478 }
1479 
1480 #endif /*__WINDOWS__*/
1481 
1482 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
1483 Make a thread stop processing blocking operations such that it can check
1484 whether it is signalled. Returns  TRUE   when  successful,  FALSE if the
1485 thread no longer exists and -1 if alerting is disabled.
1486 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
1487 
1488 static int
alertThread(PL_thread_info_t * info)1489 alertThread(PL_thread_info_t *info)
1490 { PL_local_data_t *ld = info->thread_data;
1491 
1492   if ( ld->thread.alert.type )
1493   { int done = FALSE;
1494 
1495     PL_LOCK(L_ALERT);
1496     switch(ld->thread.alert.type)
1497     { case ALERT_QUEUE_RD:
1498         cv_broadcast(&ld->thread.alert.obj.queue->cond_var);
1499 	done = TRUE;
1500 	break;
1501       case ALERT_QUEUE_WR:
1502         cv_broadcast(&ld->thread.alert.obj.queue->drain_var);
1503 	done = TRUE;
1504 	break;
1505     }
1506     PL_UNLOCK(L_ALERT);
1507     if ( done )
1508       return TRUE;
1509   }
1510 
1511 #ifdef __WINDOWS__
1512   if ( info->w32id )
1513   { PostThreadMessage(info->w32id, WM_SIGNALLED, 0, 0L);
1514     return TRUE;			/* NOTE: PostThreadMessage() can */
1515 					/* fail if thread is being created */
1516   }
1517 #else
1518   if ( info->has_tid && GD->signals.sig_alert )
1519     return pthread_kill(info->tid, GD->signals.sig_alert) == 0;
1520 #endif
1521   return -1;
1522 }
1523 
1524 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
1525 PL_thread_raise() raises a  synchronous  signal   in  (usually)  another
1526 thread.
1527 
1528 (*) HACK. We should not lock L_THREAD in this function but we do need to
1529 prevent ld from dropping while we process   it.  Possibly we should move
1530 the signal mask to the info structure? This patch is badly needed as the
1531 system now crashes on any  alrm   from  library(time). We need something
1532 better though.
1533 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
1534 
1535 int
PL_thread_raise(int tid,int sig)1536 PL_thread_raise(int tid, int sig)
1537 { if ( tid >= 1 && tid <= GD->thread.highest_id )
1538   { PL_thread_info_t *info = GD->thread.threads[tid];
1539 
1540     if ( info &&
1541 	 info->status != PL_THREAD_UNUSED &&
1542 	 info->status != PL_THREAD_RESERVED )
1543     { GET_LD
1544       PL_local_data_t *ld;
1545       int rc;
1546 
1547       if ( LD )					/* See (*) */
1548 	ld = acquire_ldata(info);
1549       else
1550 	ld = info->thread_data;
1551 
1552       rc = ( ld &&
1553 	     ld->magic == LD_MAGIC &&
1554 	     raiseSignal(ld, sig) &&
1555 	     (alertThread(info) != FALSE) );
1556 
1557       if ( LD )
1558 	release_ldata(ld);
1559 
1560       return rc;
1561     } else
1562     { return FALSE;
1563     }
1564   }
1565 
1566   return FALSE;
1567 }
1568 
1569 
1570 int
thread_wait_signal(ARG1_LD)1571 thread_wait_signal(ARG1_LD)
1572 { int i;
1573 
1574   while( !is_signalled(PASS_LD1) )
1575   {
1576 #ifdef __WINDOWS__
1577     MSG msg;
1578     if ( !GetMessage(&msg, (HWND)-1, WM_SIGNALLED, WM_SIGNALLED) )
1579       return -1;
1580 #else
1581     sigset_t set;
1582     int sig;
1583 
1584     sigemptyset(&set);
1585     sigaddset(&set, GD->signals.sig_alert);
1586     sigwait(&set, &sig);
1587 #endif
1588   }
1589 
1590   for(i=0; i<2; i++)
1591   { while( LD->signal.pending[i] )
1592     { int sig = 1+32*i;
1593       int mask = 1;
1594 
1595       for( ; mask ; mask <<= 1, sig++ )
1596       { if ( LD->signal.pending[i] & mask )
1597 	{ __atomic_and_fetch(&LD->signal.pending[i], ~mask, __ATOMIC_SEQ_CST);
1598 
1599 	  if ( sig == SIG_THREAD_SIGNAL )
1600 	  { dispatch_signal(sig, TRUE);
1601 	    if ( exception_term )
1602 	      return -1;
1603 	  } else
1604 	  { return sig;
1605 	  }
1606 	}
1607       }
1608     }
1609   }
1610 
1611   return -1;					/* cannot happen */
1612 }
1613 
1614 static
1615 PRED_IMPL("$thread_sigwait", 1, thread_sigwait, 0)
1616 { PRED_LD
1617   int sig;
1618 
1619   if ( (sig = thread_wait_signal(PASS_LD1)) >= 0 )
1620     return PL_unify_atom_chars(A1, signal_name(sig));
1621 
1622   return FALSE;
1623 }
1624 
1625 
1626 
1627 const char *
threadName(int id)1628 threadName(int id)
1629 { PL_thread_info_t *info;
1630   thread_handle *th;
1631   char tmp[16];
1632 
1633   if ( id == 0 )
1634     id = PL_thread_self();
1635   if ( id < 0 )
1636     return "[Not a prolog thread]";
1637 
1638   info = GD->thread.threads[id];
1639   if ( info->symbol &&
1640        (th=symbol_thread_handle(info->symbol)) &&
1641        th->alias )
1642     return PL_atom_chars(th->alias);
1643 
1644   sprintf(tmp, "%d", id);
1645   return buffer_string(tmp, BUF_STACK);
1646 }
1647 
1648 
1649 intptr_t
system_thread_id(PL_thread_info_t * info)1650 system_thread_id(PL_thread_info_t *info)
1651 { if ( !info )
1652   { GET_LD
1653     if ( LD )
1654       info = LD->thread.info;
1655     else
1656       return -1;
1657   }
1658 #ifdef PID_IDENTIFIES_THREAD
1659   return info->pid;
1660 #else
1661 #ifdef __WINDOWS__
1662   return info->w32id;
1663 #else
1664   return (intptr_t)info->tid;
1665 #endif
1666 #endif
1667 }
1668 
1669 static void
set_system_thread_id(PL_thread_info_t * info)1670 set_system_thread_id(PL_thread_info_t *info)
1671 {
1672   info->tid = pthread_self();
1673   info->has_tid = TRUE;
1674 #if defined(HAVE_GETTID_SYSCALL)
1675   info->pid = syscall(__NR_gettid);
1676 #elif defined(HAVE_GETTID_MACRO)
1677   info->pid = gettid();
1678 #elif defined(__CYGWIN__)
1679   info->pid = getpid();
1680 #elif defined(__WINDOWS__)
1681   info->w32id = GetCurrentThreadId();
1682 #endif
1683 }
1684 
1685 
1686 static int
set_os_thread_name_from_charp(const char * s)1687 set_os_thread_name_from_charp(const char *s)
1688 {
1689 #ifdef HAVE_PTHREAD_SETNAME_NP
1690   char name[16];
1691 
1692   strncpy(name, s, 15);
1693   name[15] = EOS;
1694 
1695 #ifdef HAVE_PTHREAD_SETNAME_NP_WITH_TID
1696   if ( pthread_setname_np(pthread_self(), name) == 0 )
1697     return TRUE;
1698 #elif HAVE_PTHREAD_SETNAME_NP_WITH_TID_AND_ARG
1699   if ( pthread_setname_np(pthread_self(), "%s", (void *)name) == 0 )
1700     return TRUE;
1701 #else
1702   if ( pthread_setname_np(name) == 0 )
1703     return TRUE;
1704 #endif
1705 #endif
1706   return FALSE;
1707 }
1708 
1709 
1710 static int
set_os_thread_name(atom_t alias)1711 set_os_thread_name(atom_t alias)
1712 {
1713 #ifdef HAVE_PTHREAD_SETNAME_NP
1714   GET_LD
1715   term_t t = PL_new_term_ref();
1716   PL_put_atom(t, alias);
1717   char *s;
1718 
1719   if ( PL_get_chars(t, &s, CVT_ATOM|REP_MB|BUF_DISCARDABLE) )
1720     return set_os_thread_name_from_charp(s);
1721 #endif
1722   return FALSE;
1723 }
1724 
1725 
1726 static const opt_spec make_thread_options[] =
1727 { { ATOM_alias,		 OPT_ATOM },
1728   { ATOM_debug,		 OPT_BOOL },
1729   { ATOM_detached,	 OPT_BOOL },
1730   { ATOM_stack_limit,	 OPT_SIZE },
1731   { ATOM_c_stack,	 OPT_SIZE },
1732   { ATOM_at_exit,	 OPT_TERM },
1733   { ATOM_inherit_from,	 OPT_TERM },
1734   { ATOM_affinity,	 OPT_TERM },
1735   { ATOM_queue_max_size, OPT_SIZE },
1736   { NULL_ATOM,		 0 }
1737 };
1738 
1739 
1740 static void
set_thread_completion(PL_thread_info_t * info,int rc,term_t ex)1741 set_thread_completion(PL_thread_info_t *info, int rc, term_t ex)
1742 { PL_LOCK(L_THREAD);
1743   if ( rc )
1744   { info->status = PL_THREAD_SUCCEEDED;
1745   } else
1746   { if ( ex )
1747     { if ( info->detached )
1748 	info->return_value = 0;
1749       else
1750 	info->return_value = PL_record(ex);
1751       info->status = PL_THREAD_EXCEPTION;
1752     } else
1753     { info->status = PL_THREAD_FAILED;
1754     }
1755   }
1756   PL_UNLOCK(L_THREAD);
1757 }
1758 
1759 
1760 static void *
start_thread(void * closure)1761 start_thread(void *closure)
1762 { PL_thread_info_t *info = closure;
1763   thread_handle *th;
1764   term_t ex, goal;
1765   int rval;
1766 
1767   assert(info->goal);
1768   blockSignal(SIGINT);			/* only the main thread processes */
1769 					/* Control-C */
1770   set_system_thread_id(info);		/* early to get exit code ok */
1771 
1772   if ( !initialise_thread(info) )
1773     return (void *)FALSE;
1774 
1775   { GET_LD
1776 
1777     pthread_cleanup_push(free_prolog_thread, info->thread_data);
1778 
1779     PL_LOCK(L_THREAD);
1780     info->status = PL_THREAD_RUNNING;
1781     PL_UNLOCK(L_THREAD);
1782 
1783     if ( info->symbol &&
1784 	 (th=symbol_thread_handle(info->symbol)) &&
1785 	 th->alias )
1786       set_os_thread_name(th->alias);
1787 
1788     goal = PL_new_term_ref();
1789     PL_put_atom(goal, ATOM_dthread_init);
1790 
1791     rval = callProlog(MODULE_system, goal, PL_Q_CATCH_EXCEPTION, &ex);
1792 
1793     if ( rval )
1794     { if ( !PL_recorded(info->goal, goal) )
1795       { rval = raiseStackOverflow(GLOBAL_OVERFLOW);
1796 	ex = exception_term;
1797       } else
1798       { rval  = callProlog(info->module, goal, PL_Q_CATCH_EXCEPTION, &ex);
1799       }
1800     }
1801 
1802     if ( !rval && info->detached )
1803     { if ( ex )
1804       { int print = TRUE;
1805 
1806 	if ( LD->exit_requested )
1807 	{ if ( classify_exception(ex) == EXCEPT_ABORT )
1808 	    print = FALSE;
1809 	}
1810 
1811 	if ( print )
1812 	{ if ( !printMessage(ATOM_warning,
1813 			     PL_FUNCTOR_CHARS, "abnormal_thread_completion", 2,
1814 			       PL_TERM, goal,
1815 			       PL_FUNCTOR, FUNCTOR_exception1,
1816 			       PL_TERM, ex) )
1817 	    PL_clear_exception();	/* The thread is dead anyway */
1818 	}
1819       } else
1820       { if ( !printMessage(ATOM_warning,
1821 			   PL_FUNCTOR_CHARS, "abnormal_thread_completion", 2,
1822 			     PL_TERM, goal,
1823 			     PL_ATOM, ATOM_fail) )
1824 	  PL_clear_exception();		/* The thread is dead anyway */
1825       }
1826     }
1827 
1828     set_thread_completion(info, rval, ex);
1829     pthread_cleanup_pop(1);
1830   }
1831 
1832   return (void *)TRUE;
1833 }
1834 
1835 
1836 static void
copy_local_data(PL_local_data_t * ldnew,PL_local_data_t * ldold,size_t max_queue_size)1837 copy_local_data(PL_local_data_t *ldnew, PL_local_data_t *ldold,
1838 		size_t max_queue_size)
1839 { GET_LD
1840 
1841   if ( !LD )
1842     TLD_set_LD(ldnew);
1843 
1844   PL_register_atom(ldold->prompt.current);
1845   ldnew->prompt			  = ldold->prompt;
1846   if ( ldold->prompt.first )
1847   { ldnew->prompt.first		  = ldold->prompt.first;
1848     PL_register_atom(ldnew->prompt.first);
1849   }
1850   ldnew->modules		  = ldold->modules;
1851   ldnew->IO			  = ldold->IO;
1852   ldnew->IO.input_stack		  = NULL;
1853   ldnew->IO.output_stack	  = NULL;
1854   ldnew->encoding		  = ldold->encoding;
1855 #ifdef O_LOCALE
1856   ldnew->locale.current		  = acquireLocale(ldold->locale.current);
1857 #endif
1858   ldnew->_debugstatus		  = ldold->_debugstatus;
1859   ldnew->_debugstatus.retryFrame  = 0;
1860   ldnew->_debugstatus.suspendTrace= 0;
1861   if ( ldold->_debugstatus.skiplevel != SKIP_VERY_DEEP )
1862   { ldnew->_debugstatus.debugging = DBG_OFF;
1863     ldnew->_debugstatus.tracing = FALSE;
1864     ldnew->_debugstatus.skiplevel = SKIP_VERY_DEEP;
1865   }
1866 
1867   alloc_pool *pool = ldold->tabling.node_pool;
1868   if ( pool )
1869     ldnew->tabling.node_pool = new_alloc_pool(pool->name, pool->limit);
1870   ldnew->fli.string_buffers.tripwire
1871 				  = ldold->fli.string_buffers.tripwire;
1872   ldnew->statistics.start_time    = WallTime();
1873   ldnew->prolog_flag.mask	  = ldold->prolog_flag.mask;
1874   ldnew->prolog_flag.occurs_check = ldold->prolog_flag.occurs_check;
1875   ldnew->prolog_flag.access_level = ldold->prolog_flag.access_level;
1876 #ifdef O_GMP
1877   ldnew->arith.rat                = ldold->arith.rat;
1878 #endif
1879   ldnew->arith.f                  = ldold->arith.f;
1880   if ( ldold->prolog_flag.table )
1881   { PL_LOCK(L_PLFLAG);
1882     ldnew->prolog_flag.table	  = copyHTable(ldold->prolog_flag.table);
1883     PL_UNLOCK(L_PLFLAG);
1884   }
1885   ldnew->tabling.restraint        = ldold->tabling.restraint;
1886   if ( !ldnew->thread.info->debug )
1887   { ldnew->_debugstatus.tracing   = FALSE;
1888     ldnew->_debugstatus.debugging = DBG_OFF;
1889     set(&ldnew->prolog_flag.mask, PLFLAG_LASTCALL);
1890   }
1891   init_message_queue(&ldnew->thread.messages, max_queue_size);
1892   init_predicate_references(ldnew);
1893 }
1894 
1895 
1896 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
1897 The pthread stacksize must be  a  multiple   of  the  page  size on some
1898 systems. We do the rounding here. If we do not know the page size we use
1899 8192, which should typically be a multiple of the page size.
1900 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
1901 
static size_t
round_pages(size_t n)
{ size_t psize = 8192;			/* default if page size is unknown */

#if defined(HAVE_SYSCONF) && defined(_SC_PAGESIZE)
  size_t reported = sysconf(_SC_PAGESIZE);

  if ( reported != (size_t)-1 )
    psize = reported;
#endif

  return ROUND(n, psize);
}
1915 
1916 
1917 #if defined(HAVE_PTHREAD_ATTR_SETAFFINITY_NP) || defined(HAVE_SCHED_SETAFFINITY)
1918 
1919 static int
get_cpuset(term_t affinity,cpu_set_t * set)1920 get_cpuset(term_t affinity, cpu_set_t *set)
1921 { GET_LD
1922   term_t head, tail;
1923   int n=0;
1924   int cpu_count = CpuCount();
1925 
1926   if ( !(tail = PL_copy_term_ref(affinity)) ||
1927        !(head = PL_new_term_ref()) )
1928     return FALSE;
1929 
1930   CPU_ZERO(set);
1931   while(PL_get_list_ex(tail, head, tail))
1932   { int i;
1933 
1934     if ( !PL_get_integer_ex(head, &i) )
1935       return FALSE;
1936     if ( i < 0 )
1937       return PL_domain_error("not_less_than_zero", head);
1938     if ( i >= cpu_count )
1939       return PL_existence_error("cpu", head);
1940 
1941     CPU_SET(i, set);
1942 
1943     if ( n++ == 100 && !PL_is_acyclic(tail) )
1944       return PL_type_error("list", tail);
1945   }
1946   if ( !PL_get_nil_ex(tail) )
1947     return FALSE;
1948 
1949   if ( n == 0 )
1950     return PL_domain_error("cpu_affinity", affinity);
1951 
1952   return TRUE;
1953 }
1954 
1955 #endif /*defined(HAVE_PTHREAD_ATTR_SETAFFINITY_NP) || defined(HAVE_SCHED_SETAFFINITY)*/
1956 
1957 static int
set_affinity(term_t affinity,pthread_attr_t * attr)1958 set_affinity(term_t affinity, pthread_attr_t *attr)
1959 {
1960 #ifdef HAVE_PTHREAD_ATTR_SETAFFINITY_NP
1961   cpu_set_t cpuset;
1962 
1963   if ( !get_cpuset(affinity, &cpuset) )
1964     return EINVAL;
1965 
1966   return pthread_attr_setaffinity_np(attr, sizeof(cpuset), &cpuset);
1967 #endif
1968 
1969   return 0;
1970 }
1971 
1972 
1973 word
pl_thread_create(term_t goal,term_t id,term_t options)1974 pl_thread_create(term_t goal, term_t id, term_t options)
1975 { GET_LD
1976   PL_thread_info_t *info;
1977   thread_handle *th;
1978   PL_local_data_t *ldnew, *ldold = LD;
1979   atom_t alias = NULL_ATOM, idname;
1980   pthread_attr_t attr;
1981   size_t stack = 0;
1982   size_t c_stack = 0;
1983   term_t inherit_from = 0;
1984   term_t at_exit = 0;
1985   term_t affinity = 0;
1986   size_t queue_max_size = 0;
1987   int rc = 0;
1988   const char *func;
1989   int debug = -1;
1990   int detached = FALSE;
1991 
1992   if ( !PL_is_callable(goal) )
1993     return PL_error(NULL, 0, NULL, ERR_TYPE, ATOM_callable, goal);
1994 
1995   if ( !GD->thread.enabled || GD->cleaning != CLN_NORMAL )
1996     return PL_error(NULL, 0, "threading disabled",
1997 		      ERR_PERMISSION,
1998 		      ATOM_create, ATOM_thread, goal);
1999 
2000   if ( !(info = alloc_thread()) )
2001     return PL_error(NULL, 0, NULL, ERR_RESOURCE, ATOM_threads);
2002 
2003   ldnew = info->thread_data;
2004 
2005   if ( !scan_options(options, 0, /*OPT_ALL,*/
2006 		     ATOM_thread_option, make_thread_options,
2007 		     &alias,
2008 		     &debug,
2009 		     &detached,
2010 		     &stack,		/* stack */
2011 		     &c_stack,		/* c_stack */
2012 		     &at_exit,
2013 		     &inherit_from,
2014 		     &affinity,
2015 		     &queue_max_size) )
2016   { free_thread_info(info);
2017     fail;
2018   }
2019   info->detached = detached;
2020   if ( at_exit && !PL_is_callable(at_exit) )
2021   { free_thread_info(info);
2022     return PL_error(NULL, 0, NULL, ERR_TYPE, ATOM_callable, at_exit);
2023   }
2024   if ( inherit_from )
2025   { PL_thread_info_t *oinfo;
2026 
2027     if ( get_thread(inherit_from, &oinfo, TRUE) )
2028     { ldold = oinfo->thread_data;
2029     } else
2030     { free_thread_info(info);
2031       return FALSE;
2032     }
2033   }
2034   if ( debug >= 0 )
2035     info->debug = debug;
2036   else
2037     info->debug = ldold->thread.info->debug;
2038 
2039   if ( !PL_is_variable(id) &&
2040        !(PL_get_atom(id, &idname) && idname == alias) )
2041   { free_thread_info(info);
2042     return PL_error("thread_create", 3, NULL, ERR_UNINSTANTIATION, 2, id);
2043   }
2044 
2045   if ( stack )
2046     info->stack_limit = stack;
2047   else
2048     info->stack_limit = ldold->stacks.limit;
2049 
2050   th = create_thread_handle(info);
2051   if ( alias )
2052   { if ( !aliasThread(info->pl_tid, ATOM_thread, alias) )
2053     { free_thread_info(info);
2054       fail;
2055     }
2056   }
2057   if ( !unify_thread_id(id, info) )
2058   { free_thread_info(info);
2059 
2060     if ( !PL_exception(0) )
2061       return PL_uninstantiation_error(id);
2062 
2063     fail;
2064   }
2065   if ( !info->detached )
2066     PL_unregister_atom(th->symbol);
2067 
2068   info->goal = PL_record(goal);
2069   info->module = PL_context();
2070   copy_local_data(ldnew, ldold, queue_max_size);
2071   if ( at_exit )
2072     register_event_hook(&ldnew->event.hook.onthreadexit, FALSE, at_exit, 0);
2073 
2074   pthread_attr_init(&attr);
2075   if ( info->detached )
2076   { func = "pthread_attr_setdetachstate";
2077     rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
2078   }
2079   if ( rc == 0 && affinity )
2080     rc = set_affinity(affinity, &attr);
2081   if ( rc == 0 )
2082   {
2083 #ifdef USE_COPY_STACK_SIZE
2084     struct rlimit rlim;
2085     if ( !stack && getrlimit(RLIMIT_STACK, &rlim) == 0 )
2086     { if ( rlim.rlim_cur != RLIM_INFINITY )
2087 	stack = rlim.rlim_cur;
2088 					/* What is an infinite stack!? */
2089     }
2090 #endif
2091     if ( c_stack )
2092     { stack = round_pages(c_stack);
2093       func = "pthread_attr_setstacksize";
2094       rc = pthread_attr_setstacksize(&attr, c_stack);
2095       info->c_stack_size = c_stack;
2096     } else
2097     { pthread_attr_getstacksize(&attr, &info->c_stack_size);
2098     }
2099   }
2100   if ( rc == 0 )
2101   { PL_LOCK(L_THREAD);
2102     info->status = PL_THREAD_CREATED;
2103     assert(info->goal);
2104     func = "pthread_create";
2105     rc = pthread_create(&info->tid, &attr, start_thread, info);
2106     PL_UNLOCK(L_THREAD);
2107   }
2108   pthread_attr_destroy(&attr);
2109 
2110   if ( rc != 0 )
2111   { free_thread_info(info);
2112     if ( !PL_exception(0) )
2113       PL_error(NULL, 0, ThError(rc),
2114 	       ERR_SYSCALL, func);
2115     return FALSE;
2116   }
2117 
2118   return TRUE;
2119 }
2120 
2121 
2122 static thread_handle *
symbol_thread_handle(atom_t a)2123 symbol_thread_handle(atom_t a)
2124 { void *data;
2125   size_t len;
2126   PL_blob_t *type;
2127 
2128   if ( a && (data=PL_blob_data(a, &len, &type)) && type == &thread_blob )
2129   { thread_handle **erd = data;
2130 
2131     return *erd;
2132   }
2133 
2134   return NULL;
2135 }
2136 
2137 
2138 static int
get_thread(term_t t,PL_thread_info_t ** info,int warn)2139 get_thread(term_t t, PL_thread_info_t **info, int warn)
2140 { GET_LD
2141   int i = -1;
2142   atom_t a;
2143 
2144   if ( PL_get_atom(t, &a) )
2145   { thread_handle *er;
2146     atom_t symbol = a;
2147 
2148   from_symbol:
2149     if ( (er=symbol_thread_handle(symbol)) )
2150     { if ( er->info && !THREAD_STATUS_INVALID(er->info->status) )
2151       { i = er->engine_id;
2152       } else
2153       { no_thread:
2154 	if ( warn )
2155 	  PL_existence_error("thread", t);
2156 	return FALSE;
2157       }
2158     } else if ( isTextAtom(symbol) )	/* alias name? */
2159     { word w;
2160 
2161       if ( (w = (word)lookupHTable(threadTable, (void *)symbol)) )
2162       { symbol = w;
2163 	goto from_symbol;
2164       } else
2165 	goto no_thread;
2166     } else
2167     { if ( warn )
2168 	PL_type_error("thread", t);
2169       return FALSE;
2170     }
2171   } else if ( !PL_get_integer(t, &i) )
2172   { if ( warn )
2173       PL_type_error("thread", t);
2174     return FALSE;
2175   }
2176 
2177   if ( i < 1 ||
2178        i > GD->thread.highest_id ||
2179        THREAD_STATUS_INVALID(GD->thread.threads[i]->status) )
2180   { goto no_thread;
2181   }
2182 
2183   *info = GD->thread.threads[i];
2184 
2185   return TRUE;
2186 }
2187 
2188 
2189 static thread_handle *
create_thread_handle(PL_thread_info_t * info)2190 create_thread_handle(PL_thread_info_t *info)
2191 { atom_t symbol;
2192 
2193   if ( (symbol=info->symbol) )
2194   { return symbol_thread_handle(symbol);
2195   } else
2196   { thread_handle *ref;
2197 
2198     if ( info->is_engine )
2199       ref = PL_malloc(sizeof(*ref)+sizeof(simpleMutex));
2200     else
2201       ref = PL_malloc(sizeof(*ref));
2202 
2203     if ( ref )
2204     { int new;
2205 
2206       memset(ref, 0, sizeof(*ref));
2207       if ( info->is_engine )
2208       { ref->interactor.mutex = (simpleMutex*)(ref+1);
2209 	simpleMutexInit(ref->interactor.mutex);
2210       }
2211 
2212       ref->info      = info;
2213       ref->engine_id = info->pl_tid;
2214       ref->symbol    = lookupBlob((char*)&ref, sizeof(ref), &thread_blob, &new);
2215       assert(new);
2216       info->symbol   = ref->symbol;
2217     }
2218 
2219     return ref;
2220   }
2221 }
2222 
2223 
2224 int
unify_thread_id(term_t id,PL_thread_info_t * info)2225 unify_thread_id(term_t id, PL_thread_info_t *info)
2226 { GET_LD
2227   thread_handle *th;
2228 
2229   if ( (th = create_thread_handle(info)) )
2230   { atom_t name = th->alias ? th->alias : th->symbol;
2231 
2232     return PL_unify_atom(id, name);
2233   } else					/* during destruction? */
2234     return PL_unify_integer(id, info->pl_tid);
2235 }
2236 
2237 
/* unify_thread_status(): if lock = TRUE, this is used from
   thread_property/2 and we must be careful that the thread may vanish
   during the process if it is a detached thread.  Note that we only
   avoid crashes.  The fact that the value may not be true at the
   moment it is requested is simply a limitation of status polling.
*/
2244 
/* Unify `status` with the atom `suspended`.  `info` is not inspected. */

static int
unify_engine_status(term_t status, PL_thread_info_t *info ARG_LD)
{ return PL_unify_atom(status, ATOM_suspended);
}
2249 
2250 
2251 static int
unify_thread_status(term_t status,PL_thread_info_t * info,thread_status stat,int lock)2252 unify_thread_status(term_t status, PL_thread_info_t *info,
2253 		    thread_status stat, int lock)
2254 { GET_LD
2255 
2256   switch(stat)
2257   { case PL_THREAD_CREATED:
2258     case PL_THREAD_RUNNING:
2259     { int rc = FALSE;
2260       if ( info->is_engine )
2261       { if ( lock ) PL_LOCK(L_THREAD);
2262 	if ( !info->has_tid )
2263 	  rc = unify_engine_status(status, info PASS_LD);
2264 	if ( lock ) PL_UNLOCK(L_THREAD);
2265       }
2266       return rc || PL_unify_atom(status, ATOM_running);
2267     }
2268     case PL_THREAD_EXITED:
2269     { term_t tmp = PL_new_term_ref();
2270       int rc = TRUE;
2271 
2272       if ( info->return_value )
2273       { if ( lock ) PL_LOCK(L_THREAD);
2274 	if ( info->return_value )
2275 	  rc = PL_recorded(info->return_value, tmp);
2276 	if ( lock ) PL_UNLOCK(L_THREAD);
2277       }
2278 
2279       if ( !rc )
2280 	return raiseStackOverflow(GLOBAL_OVERFLOW);
2281       else
2282 	return PL_unify_term(status,
2283 			     PL_FUNCTOR, FUNCTOR_exited1,
2284 			       PL_TERM, tmp);
2285     }
2286     case PL_THREAD_SUCCEEDED:
2287       return PL_unify_atom(status, ATOM_true);
2288     case PL_THREAD_FAILED:
2289       return PL_unify_atom(status, ATOM_false);
2290     case PL_THREAD_EXCEPTION:
2291     { term_t tmp = PL_new_term_ref();
2292       int rc = TRUE;
2293 
2294       if ( lock ) PL_LOCK(L_THREAD);
2295       if ( info->return_value )
2296 	rc = PL_recorded(info->return_value, tmp);
2297       if ( lock ) PL_UNLOCK(L_THREAD);
2298       if ( !rc )
2299 	return raiseStackOverflow(GLOBAL_OVERFLOW);
2300       else
2301 	return PL_unify_term(status,
2302 			     PL_FUNCTOR, FUNCTOR_exception1,
2303 			       PL_TERM, tmp);
2304     }
2305     case PL_THREAD_NOMEM:
2306     { return PL_unify_term(status,
2307 			   PL_FUNCTOR, FUNCTOR_exception1,
2308 			     PL_FUNCTOR, FUNCTOR_error2,
2309 			       PL_FUNCTOR, FUNCTOR_resource_error1,
2310 			         PL_ATOM, ATOM_memory,
2311 			       PL_VARIABLE);
2312     }
2313     default:
2314       DEBUG(MSG_THREAD, Sdprintf("info->status = %d\n", info->status));
2315       fail;				/* can happen in current_thread/2 */
2316   }
2317 }
2318 
2319 
/* Unify `self` with the identifier of the calling thread, as produced
   by unify_thread_id() for LD->thread.info.
*/

word
pl_thread_self(term_t self)
{ GET_LD

  return unify_thread_id(self, LD->thread.info);
}
2326 
2327 static void
unalias_thread(thread_handle * th)2328 unalias_thread(thread_handle *th)
2329 { atom_t name;
2330 
2331   if ( (name=th->alias) )
2332   { atom_t symbol;
2333 
2334     if ( (symbol=(word)deleteHTable(threadTable, (void *)th->alias)) )
2335     { th->alias = NULL_ATOM;
2336       PL_unregister_atom(name);
2337       PL_unregister_atom(symbol);
2338     }
2339   }
2340 }
2341 
2342 
/* Release the thread info slot `info` and put it on the free list.

   The slot must not already be PL_THREAD_UNUSED (asserted).  Steps:

     1. Mark the slot PL_THREAD_UNUSED and, if it still has thread
	data, release that through free_prolog_thread().
     2. Under L_THREAD: disconnect the handle (th->info = NULL), drop
	the alias of non-engine threads, clear the return value and
	goal record pointers and lower GD->thread.highest_id if this
	was the highest slot.
     3. Push the slot on GD->thread.free using a compare-and-swap loop.
     4. Erase the saved records after the lock has been released.
*/

static void
free_thread_info(PL_thread_info_t *info)
{ record_t rec_rv, rec_g;
  PL_thread_info_t *freelist;

  assert(info->status != PL_THREAD_UNUSED);
  info->status = PL_THREAD_UNUSED;

  if ( info->thread_data )
  { info->detached = FALSE;	/* avoid recursion and we are dead anyway */
    free_prolog_thread(info->thread_data);
  }

  PL_LOCK(L_THREAD);
  if ( info->symbol )
  { thread_handle *th;

    if ( (th=symbol_thread_handle(info->symbol)) )
    { th->info = NULL;			/* handle no longer refers to us */
      if ( th->alias && !info->is_engine )
	unalias_thread(th);
    }

    /* Note: info->detached was cleared above if thread data was present */
    if ( info->detached && !info->is_engine )
      PL_unregister_atom(info->symbol);
  }

  /* Detach the records under the lock; they are erased below, after
     the lock has been released. */
  if ( (rec_rv=info->return_value) )	/* sync with unify_thread_status() */
    info->return_value = NULL;
  if ( (rec_g=info->goal) )
    info->goal = NULL;

  if ( info->pl_tid == GD->thread.highest_id )
  { int i;

    /* Search downwards for a slot that is in use; the loop stops at 1 */
    for(i=info->pl_tid-1; i>1; i--)
    { PL_thread_info_t *ih = GD->thread.threads[i];
      if ( ih && ih->status != PL_THREAD_UNUSED )
	break;
    }

    GD->thread.highest_id = i;
  }
  PL_UNLOCK(L_THREAD);

  /* Push on the free list without holding L_THREAD */
  do
  { freelist = GD->thread.free;
    info->next_free = freelist;
  } while( !COMPARE_AND_SWAP_PTR(&GD->thread.free, freelist, info) );

  if ( rec_rv ) PL_erase(rec_rv);
  if ( rec_g )  PL_erase(rec_g);
}
2396 
2397 
/* Join `thread`, storing its exit value in *retval.

   If pthread_timedjoin_np() is available, the join is attempted with
   a deadline 250 ms ahead; on each timeout pending Prolog signals are
   handled and EINTR is returned if PL_handle_signals() reports a
   negative value.  Otherwise the result of the join call is returned.
   Without pthread_timedjoin_np() this is a plain pthread_join().
*/

static int
pthread_join_interruptible(pthread_t thread, void **retval)
{
#ifdef HAVE_PTHREAD_TIMEDJOIN_NP
  int rc;

  do
  { struct timespec deadline;

    get_current_timespec(&deadline);
    deadline.tv_nsec += 250000000;
    carry_timespec_nanos(&deadline);

    rc = pthread_timedjoin_np(thread, retval, &deadline);
    if ( rc == ETIMEDOUT && PL_handle_signals() < 0 )
      return EINTR;
  } while ( rc == ETIMEDOUT );

  return rc;
#else
  return pthread_join(thread, retval);
#endif
}
2420 
2421 
2422 
/* thread_join(+Thread, -Status)

   Wait for Thread to terminate and unify Status with its exit status.
   Raises a permission error when joining the calling thread or a
   detached thread.  After a successful join the thread info slot is
   claimed by switching its status to PL_THREAD_JOINED and then freed;
   if the slot cannot be claimed an existence error ("already joined")
   is raised.
*/

static
PRED_IMPL("thread_join", 2, thread_join, 0)
{ PRED_LD
  PL_thread_info_t *info;
  void *r;
  word rval;
  int rc;
  thread_status status;

  term_t thread = A1;
  term_t retcode = A2;

  if ( !get_thread(thread, &info, TRUE) )
    return FALSE;

  if ( info == LD->thread.info || info->detached )
  { return PL_error("thread_join", 2,
		    info->detached ? "Cannot join detached thread"
				   : "Cannot join self",
		    ERR_PERMISSION, ATOM_join, ATOM_thread, thread);
  }

  rc = pthread_join_interruptible(info->tid, &r);

  switch(rc)
  { case 0:
      break;
    case EINTR:				/* signal handling asked to stop */
      return FALSE;
    case ESRCH:
      /* NOTE(review): info->tid is passed to pthread_join_interruptible()
	 as a pthread_t but printed with %d here -- confirm this is safe
	 on all supported platforms. */
      Sdprintf("Join %s: ESRCH from %d\n",
	       threadName(info->pl_tid), info->tid);
      return PL_error("thread_join", 2, NULL,
		      ERR_EXISTENCE, ATOM_thread, thread);
    default:
      return PL_error("thread_join", 2, ThError(rc),
		      ERR_SYSCALL, "pthread_join");
  }

  /* Only the caller that wins the status swap reports the result and
     frees the slot. */
  status = info->status;
  if ( !THREAD_STATUS_INVALID(status) &&
       COMPARE_AND_SWAP_INT((int*)&info->status, (int)status, (int)PL_THREAD_JOINED) )
  { rval = unify_thread_status(retcode, info, status, FALSE);

    free_thread_info(info);
  } else
  { rval = PL_error(NULL, 0, "already joined",
		    ERR_EXISTENCE, ATOM_thread, thread);
  }

  return rval;
}
2475 
2476 
/* Terminate the calling thread.  Under L_THREAD the status is set to
   PL_THREAD_EXITED and `retcode` is recorded as the return value;
   then pthread_exit() is called.  The statements after pthread_exit()
   are not expected to be reached (see the assert).
*/

word
pl_thread_exit(term_t retcode)
{ GET_LD
  PL_thread_info_t *info = LD->thread.info;

  PL_LOCK(L_THREAD);
  info->status = PL_THREAD_EXITED;
  info->return_value = PL_record(retcode);
  PL_UNLOCK(L_THREAD);

  DEBUG(MSG_THREAD, Sdprintf("thread_exit(%d)\n", info->pl_tid));

  pthread_exit(NULL);
  assert(0);
  fail;
}
2493 
2494 
2495 static
2496 PRED_IMPL("thread_detach", 1, thread_detach, 0)
2497 { PL_thread_info_t *info;
2498   PL_thread_info_t *release = NULL;
2499 
2500   PL_LOCK(L_THREAD);
2501   if ( !get_thread(A1, &info, TRUE) )
2502   { PL_UNLOCK(L_THREAD);
2503     fail;
2504   }
2505 
2506   if ( !info->detached )
2507   { int rc;
2508 
2509     if ( (rc=pthread_detach(info->tid)) )
2510     { assert(rc == ESRCH);
2511 
2512       release = info;
2513     } else
2514     { PL_register_atom(info->symbol);
2515       info->detached = TRUE;
2516     }
2517   }
2518 
2519   PL_UNLOCK(L_THREAD);
2520 
2521   if ( release )
2522     free_thread_info(release);
2523 
2524   succeed;
2525 }
2526 
2527 
2528 static
2529 PRED_IMPL("thread_alias", 1, thread_alias, 0)
2530 { PRED_LD
2531   PL_thread_info_t *info = LD->thread.info;
2532   thread_handle *th;
2533   atom_t alias;
2534 
2535   if ( (th = create_thread_handle(info)) &&
2536        th->alias )
2537   { term_t ex = PL_new_term_ref();
2538 
2539     return ( unify_thread_id(ex, info) &&
2540 	     PL_permission_error("re-alias", "thread", ex) );
2541   }
2542 
2543   return ( PL_get_atom_ex(A1, &alias) &&
2544 	   aliasThread(PL_thread_self(), ATOM_thread, alias) );
2545 }
2546 
2547 
2548 static size_t
sizeof_thread(PL_thread_info_t * info)2549 sizeof_thread(PL_thread_info_t *info)
2550 { size_t size = sizeof(*info);
2551   struct PL_local_data *ld = info->thread_data;
2552 
2553   if ( info->status != PL_THREAD_RUNNING )
2554     return 0;
2555 
2556   if ( ld )
2557   { size += sizeof(*ld);
2558     size += sizeStackP(&ld->stacks.global)   + ld->stacks.global.spare;
2559     size += sizeStackP(&ld->stacks.local)    + ld->stacks.local.spare;
2560     size += sizeStackP(&ld->stacks.trail)    + ld->stacks.trail.spare;
2561     size += sizeStackP(&ld->stacks.argument);
2562 
2563     size += sizeof_message_queue(&ld->thread.messages);
2564     size += sizeof_local_definitions(ld);
2565 
2566     if ( ld->tabling.node_pool )
2567       size += ld->tabling.node_pool->size;
2568   }
2569 
2570   return size;
2571 }
2572 
2573 
2574 		 /*******************************
2575 		 *	  THREAD PROPERTY	*
2576 		 *******************************/
2577 
2578 static atom_t
symbol_alias(atom_t symbol)2579 symbol_alias(atom_t symbol)
2580 { thread_handle *th;
2581 
2582   if ( (th=symbol_thread_handle(symbol)) )
2583     return th->alias;
2584 
2585   return NULL_ATOM;
2586 }
2587 
/* Property id(Id): unify `prop` with the integer thread id. */

static int
thread_id_propery(PL_thread_info_t *info, term_t prop ARG_LD)
{ return PL_unify_integer(prop, info->pl_tid);
}
2592 
2593 static int
thread_alias_propery(PL_thread_info_t * info,term_t prop ARG_LD)2594 thread_alias_propery(PL_thread_info_t *info, term_t prop ARG_LD)
2595 { atom_t symbol, alias;
2596 
2597   if ( (symbol=info->symbol) &&
2598        (alias=symbol_alias(symbol)) )
2599     return PL_unify_atom(prop, alias);
2600 
2601   fail;
2602 }
2603 
/* Property status(Status): delegate to unify_thread_status() using
   the current info->status, with locking enabled (lock = TRUE).
*/

static int
thread_status_propery(PL_thread_info_t *info, term_t prop ARG_LD)
{ IGNORE_LD

  return unify_thread_status(prop, info, info->status, TRUE);
}
2610 
/* Property detached(Bool): unify `prop` with info->detached as a boolean. */

static int
thread_detached_propery(PL_thread_info_t *info, term_t prop ARG_LD)
{ IGNORE_LD

  return PL_unify_bool_ex(prop, info->detached);
}
2617 
/* Property debug(Bool): unify `prop` with info->debug as a boolean. */

static int
thread_debug_propery(PL_thread_info_t *info, term_t prop ARG_LD)
{ IGNORE_LD

  return PL_unify_bool_ex(prop, info->debug);
}
2624 
/* Property engine(Bool): unify `prop` with info->is_engine as a boolean. */

static int
thread_engine_propery(PL_thread_info_t *info, term_t prop ARG_LD)
{ IGNORE_LD

  return PL_unify_bool_ex(prop, info->is_engine);
}
2631 
2632 static int
thread_thread_propery(PL_thread_info_t * info,term_t prop ARG_LD)2633 thread_thread_propery(PL_thread_info_t *info, term_t prop ARG_LD)
2634 { if ( info->is_engine )
2635   { thread_handle *th = symbol_thread_handle(info->symbol);
2636 
2637     if ( th->interactor.thread )
2638     { atom_t alias = symbol_alias(th->interactor.thread);
2639 
2640       return PL_unify_atom(prop, alias ? alias : th->interactor.thread);
2641     }
2642   }
2643 
2644   return FALSE;
2645 }
2646 
2647 static int
thread_tid_propery(PL_thread_info_t * info,term_t prop ARG_LD)2648 thread_tid_propery(PL_thread_info_t *info, term_t prop ARG_LD)
2649 { IGNORE_LD
2650 
2651   if ( info->has_tid )
2652   { intptr_t tid = system_thread_id(info);
2653 
2654     if ( tid != -1 )
2655       return PL_unify_integer(prop, system_thread_id(info));
2656   }
2657 
2658   return FALSE;
2659 }
2660 
2661 static int
thread_size_propery(PL_thread_info_t * info,term_t prop ARG_LD)2662 thread_size_propery(PL_thread_info_t *info, term_t prop ARG_LD)
2663 { size_t size;
2664 
2665   PL_LOCK(L_THREAD);
2666   size = sizeof_thread(info);
2667   PL_UNLOCK(L_THREAD);
2668 
2669   if ( size )
2670     return PL_unify_int64(prop, size);
2671 
2672   return FALSE;
2673 }
2674 
/* Table of thread properties: maps each property functor to the
   function that unifies its argument.  The entry with functor 0
   terminates the table; advance_state() walks the entries in this
   order when enumerating properties.
*/

static const tprop tprop_list [] =
{ { FUNCTOR_id1,	       thread_id_propery },
  { FUNCTOR_alias1,	       thread_alias_propery },
  { FUNCTOR_status1,	       thread_status_propery },
  { FUNCTOR_detached1,	       thread_detached_propery },
  { FUNCTOR_debug1,	       thread_debug_propery },
  { FUNCTOR_engine1,	       thread_engine_propery },
  { FUNCTOR_thread1,	       thread_thread_propery },
  { FUNCTOR_system_thread_id1, thread_tid_propery },
  { FUNCTOR_size1,	       thread_size_propery },
  { 0,			       NULL }
};
2687 
2688 
/* Enumeration state for the non-deterministic thread_property/2 */

typedef struct
{ int		tid;			/* index of current thread slot */
  const tprop  *p;			/* current entry in tprop_list */
  int		enum_threads;		/* TRUE: iterate over threads */
  int		enum_properties;	/* TRUE: iterate over properties */
} tprop_enum;
2695 
2696 
2697 static int
advance_state(tprop_enum * state)2698 advance_state(tprop_enum *state)
2699 { if ( state->enum_properties )
2700   { state->p++;
2701     if ( state->p->functor )
2702       succeed;
2703 
2704     state->p = tprop_list;
2705   }
2706   if ( state->enum_threads )
2707   { do
2708     { state->tid++;
2709       if ( state->tid > GD->thread.highest_id )
2710 	fail;
2711     } while ( GD->thread.threads[state->tid]->status == PL_THREAD_UNUSED ||
2712               GD->thread.threads[state->tid]->status == PL_THREAD_RESERVED );
2713 
2714     succeed;
2715   }
2716 
2717   fail;
2718 }
2719 
2720 
/* thread_property(?Thread, ?Property)

   Non-deterministic foreign predicate.  The enumeration state lives in
   a tprop_enum: on the first call it is kept in the stack buffer
   `statebuf` and only copied to allocated foreign state when a choice
   point must be left behind.  If Thread is unbound all threads are
   enumerated starting at slot 1; otherwise only the given thread is
   examined.  The property to test or enumerate is selected through
   get_prop_def().
*/

static
PRED_IMPL("thread_property", 2, thread_property, PL_FA_NONDETERMINISTIC)
{ PRED_LD
  term_t thread = A1;
  term_t property = A2;
  tprop_enum statebuf;
  tprop_enum *state;

  switch( CTX_CNTRL )
  { case FRG_FIRST_CALL:
    { PL_thread_info_t *info;

      memset(&statebuf, 0, sizeof(statebuf));
      state = &statebuf;

      /* NOTE(review): the get_prop_def() result appears to mean
	 1: a specific property was given, 0: enumerate all properties,
	 -1: error -- confirm against get_prop_def(). */
      if ( PL_is_variable(thread) )
      { switch( get_prop_def(property, ATOM_thread_property,
			     tprop_list, &statebuf.p) )
	{ case 1:
	    state->tid = 1;
	    state->enum_threads = TRUE;
	    goto enumerate;
	  case 0:
	    state->p = tprop_list;
	    state->tid = 1;
	    state->enum_threads = TRUE;
	    state->enum_properties = TRUE;
	    goto enumerate;
	  case -1:
	    fail;
	}
      } else if ( get_thread(thread, &info, TRUE) )
      { state->tid = info->pl_tid;

	switch( get_prop_def(property, ATOM_thread_property,
			     tprop_list, &statebuf.p) )
	{ case 1:
	    goto enumerate;
	  case 0:
	    state->p = tprop_list;
	    state->enum_properties = TRUE;
	    goto enumerate;
	  case -1:
	    fail;
	}
      } else
      { fail;
      }
    }
    case FRG_REDO:
      state = CTX_PTR;
      break;
    case FRG_CUTTED:			/* choice point pruned: drop state */
      state = CTX_PTR;
      freeForeignState(state, sizeof(*state));
      succeed;
    default:
      assert(0);
      fail;
  }

enumerate:
  { term_t arg = PL_new_term_ref();

    /* A specific property was given: test against its first argument */
    if ( !state->enum_properties )
      _PL_get_arg(1, property, arg);

    for(;;)
    { PL_thread_info_t *info = GD->thread.threads[state->tid];

      if ( info && (*state->p->function)(info, arg PASS_LD) )
      { if ( state->enum_properties )
	{ if ( !PL_unify_term(property,
			      PL_FUNCTOR, state->p->functor,
			        PL_TERM, arg) )
	    goto error;
	}
	if ( state->enum_threads )
	{ if ( !unify_thread_id(thread, info) )
	    goto error;
	}

	/* More candidates: leave a choice point, moving the state off
	   the stack first if needed */
	if ( advance_state(state) )
	{ if ( state == &statebuf )
	  { tprop_enum *copy = allocForeignState(sizeof(*copy));

	    *copy = *state;
	    state = copy;
	  }

	  ForeignRedoPtr(state);
	}

	/* Last solution: succeed deterministically */
	if ( state != &statebuf )
	  freeForeignState(state, sizeof(*state));
	succeed;
      }

      if ( !advance_state(state) )
      { error:
	if ( state != &statebuf )
	  freeForeignState(state, sizeof(*state));
	fail;
      }
    }
  }
}
2828 
2829 
/* is_thread(@Term): succeed if get_thread() resolves Term to a thread.
   Errors are suppressed (warn = FALSE), so the predicate fails
   silently otherwise.
*/

static
PRED_IMPL("is_thread", 1, is_thread, 0)
{ PL_thread_info_t *info;

  return get_thread(A1, &info, FALSE);
}
2836 
2837 
2838 static
2839 PRED_IMPL("thread_setconcurrency", 2, thread_setconcurrency, 0)
2840 { PRED_LD
2841 
2842 #ifdef HAVE_PTHREAD_SETCONCURRENCY
2843   int val = pthread_getconcurrency();
2844   int rc;
2845 
2846   if ( PL_unify_integer(A1, val) )
2847   { if ( PL_compare(A1, A2) != 0  )
2848     { if ( PL_get_integer_ex(A2, &val) )
2849       { if ( (rc=pthread_setconcurrency(val)) != 0 )
2850 	  return PL_error(NULL, 0, ThError(rc),
2851 			  ERR_SYSCALL, "pthread_setconcurrency");
2852       }
2853     }
2854   }
2855 
2856   succeed;
2857 #else
2858   return PL_unify_integer(A1, 0);
2859 #endif
2860 }
2861 
#if defined(HAVE_SCHED_SETAFFINITY) && defined(PID_IDENTIFIES_THREAD)
#define HAVE_PRED_THREAD_AFFINITY 1
/* thread_affinity(+Thread, ?Old, ?New)

   Unify Old with the list of CPU indexes in the current affinity mask
   of Thread (from sched_getaffinity()).  If New does not compare equal
   to Old, New is converted with get_cpuset() and installed through
   sched_setaffinity().  The whole operation runs under L_THREAD.

   `rc` is reused for both system call results (0 on success) and the
   Prolog return value; the `goto error` paths marked below rely on rc
   being 0 (FALSE) at that point.
*/
static
PRED_IMPL("thread_affinity", 3, thread_affinity, 0)
{ PRED_LD
  PL_thread_info_t *info;
  int rc;

  PL_LOCK(L_THREAD);
  if ( (rc=get_thread(A1, &info, TRUE)) )
  { cpu_set_t cpuset;

    if ( (rc=sched_getaffinity(info->pid, sizeof(cpuset), &cpuset)) == 0 )
    { int count = CPU_COUNT(&cpuset);
      int i, n;
      term_t tail = PL_copy_term_ref(A2);
      term_t head = PL_new_term_ref();

      /* i walks CPU indexes; n counts the set bits seen so far */
      for(i=0, n=0; n<count; i++)
      { if ( CPU_ISSET(i, &cpuset) )
	{ n++;
	  if ( !PL_unify_list_ex(tail, head, tail) ||
	       !PL_unify_integer(head, i) )
	    goto error;			/* rc == 0 (FALSE) */
	}
      }
      if ( !PL_unify_nil_ex(tail) )
	goto error;			/* rc == 0 (FALSE) */
    } else
    { rc = PL_error(NULL, 0, ThError(rc),
		    ERR_SYSCALL, "sched_getaffinity");
      goto error;
    }

    if ( PL_compare(A2, A3) != 0 )
    { if ( (rc=get_cpuset(A3, &cpuset)) )
      { if ( (rc=sched_setaffinity(info->pid, sizeof(cpuset), &cpuset)) == 0 )
	{ rc = TRUE;
	} else
	{ rc = PL_error(NULL, 0, ThError(rc),
			ERR_SYSCALL, "sched_setaffinity");
	}
      }
    } else
    { rc = TRUE;
    }
  }
error:					/* common exit: release the lock */
  PL_UNLOCK(L_THREAD);

  return rc;
}
#endif /*HAVE_SCHED_SETAFFINITY*/
2915 
2916 
2917 		 /*******************************
2918 		 *	     CLEANUP		*
2919 		 *******************************/
2920 
2921 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
2922 Request a function to run when the Prolog thread is about to detach, but
2923 still capable of running Prolog queries.
2924 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
2925 
2926 int
PL_thread_at_exit(void (* function)(void *),void * closure,int global)2927 PL_thread_at_exit(void (*function)(void *), void *closure, int global)
2928 { GET_LD
2929   event_list **list = global ? &GD->event.hook.onthreadexit
2930 			     : &LD->event.hook.onthreadexit;
2931   int (*func)() = (void *)function;
2932 
2933   return register_event_function(list, FALSE, func, closure, 0);
2934 }
2935 
2936 		 /*******************************
2937 		 *	   THREAD SIGNALS	*
2938 		 *******************************/
2939 
/* One queued thread signal.  Signals form a singly linked list that is
   appended to by pl_thread_signal() and consumed by
   executeThreadSignals().
*/

typedef struct _thread_sig
{ struct _thread_sig *next;		/* Next in queue */
  Module   module;			/* Module for running goal */
  record_t goal;			/* Goal to run */
} thread_sig;
2945 
2946 
2947 static int
is_alive(int status)2948 is_alive(int status)
2949 { switch(status)
2950   { case PL_THREAD_CREATED:
2951     case PL_THREAD_RUNNING:
2952       succeed;
2953     default:
2954       fail;
2955   }
2956 }
2957 
2958 
2959 foreign_t
pl_thread_signal(term_t thread,term_t goal)2960 pl_thread_signal(term_t thread, term_t goal)
2961 { GET_LD
2962   Module m = NULL;
2963   thread_sig *sg;
2964   PL_thread_info_t *info;
2965   PL_local_data_t *ld;
2966 
2967   if ( !PL_strip_module(goal, &m, goal) )
2968     return FALSE;
2969 
2970   PL_LOCK(L_THREAD);
2971   if ( !get_thread(thread, &info, TRUE) )
2972   { PL_UNLOCK(L_THREAD);
2973     fail;
2974   }
2975   if ( !is_alive(info->status) )
2976   { error:
2977     PL_error(NULL, 0, NULL, ERR_EXISTENCE, ATOM_thread, thread);
2978     PL_UNLOCK(L_THREAD);
2979     fail;
2980   }
2981 
2982   sg = allocHeapOrHalt(sizeof(*sg));
2983   sg->next = NULL;
2984   sg->module = m;
2985   sg->goal = PL_record(goal);
2986 
2987   ld = info->thread_data;
2988   if ( !ld->thread.sig_head )
2989     ld->thread.sig_head = ld->thread.sig_tail = sg;
2990   else
2991   { ld->thread.sig_tail->next = sg;
2992     ld->thread.sig_tail = sg;
2993   }
2994   raiseSignal(ld, SIG_THREAD_SIGNAL);
2995   if ( info->has_tid && !alertThread(info) )
2996     goto error;
2997 
2998   PL_UNLOCK(L_THREAD);
2999 
3000   succeed;
3001 }
3002 
3003 
/* Run all goals queued for the calling thread by pl_thread_signal().

   Does nothing if the calling thread is not alive.  The queue is
   detached under L_THREAD and then processed without the lock.  Each
   goal is run with callProlog() inside one foreign frame that is
   rewound after every goal.  If a goal raises an exception, that
   exception is re-raised in the caller, the remaining queued signals
   are discarded and the function returns.  `sig` is unused.
*/

void
executeThreadSignals(int sig)
{ GET_LD
  thread_sig *sg, *next;
  fid_t fid;
  (void)sig;

  if ( !is_alive(LD->thread.info->status) )
    return;

  /* Take the whole queue; new signals start a fresh queue */
  PL_LOCK(L_THREAD);
  sg = LD->thread.sig_head;
  LD->thread.sig_head = LD->thread.sig_tail = NULL;
  PL_UNLOCK(L_THREAD);

  fid = PL_open_foreign_frame();

  for( ; sg; sg = next)
  { term_t goal = PL_new_term_ref();
    Module gm;
    term_t ex;
    int rval;

    /* Copy what we need and release the queue cell before running */
    next = sg->next;
    rval = PL_recorded(sg->goal, goal);
    PL_erase(sg->goal);
    gm = sg->module;
    freeHeap(sg, sizeof(*sg));

    DEBUG(MSG_THREAD,
	  Sdprintf("[%d] Executing thread signal\n", PL_thread_self()));
    if ( rval )
    {
#ifdef O_LIMIT_DEPTH
      /* Run the goal without depth limit; restore the limit afterwards */
      uintptr_t olimit = depth_limit;
      depth_limit = DEPTH_NO_LIMIT;
#endif
      rval = callProlog(gm, goal, PL_Q_CATCH_EXCEPTION, &ex);
#ifdef O_LIMIT_DEPTH
      depth_limit = olimit;
#endif
    } else				/* PL_recorded() failed */
    { rval = raiseStackOverflow(GLOBAL_OVERFLOW);
      ex = exception_term;
    }

    if ( !rval && ex )
    { PL_raise_exception(ex);
      PL_close_foreign_frame(fid);

      DEBUG(MSG_THREAD,
	    { print_trace(8);
	      Sdprintf("[%d]: Prolog backtrace:\n", PL_thread_self());
	      PL_backtrace(5, 0);
	      Sdprintf("[%d]: end Prolog backtrace:\n", PL_thread_self());
	    });

      /* Discard the signals that were queued behind the failing one */
      for(sg = next; sg; sg=next)
      { next = sg->next;
	PL_erase(sg->goal);
	freeHeap(sg, sizeof(*sg));
      }

      return;
    }

    PL_rewind_foreign_frame(fid);
  }

  PL_discard_foreign_frame(fid);
}
3075 
3076 
3077 static void
freeThreadSignals(PL_local_data_t * ld)3078 freeThreadSignals(PL_local_data_t *ld)
3079 { thread_sig *sg;
3080   thread_sig *next;
3081 
3082   for( sg = ld->thread.sig_head; sg; sg = next )
3083   { next = sg->next;
3084 
3085     PL_erase(sg->goal);
3086     freeHeap(sg, sizeof(*sg));
3087   }
3088 }
3089 
3090 
3091 		 /*******************************
3092 		 *	    INTERACTORS		*
3093 		 *******************************/
3094 
3095 static int
get_interactor(term_t t,thread_handle ** thp,int warn ARG_LD)3096 get_interactor(term_t t, thread_handle **thp, int warn ARG_LD)
3097 { atom_t a;
3098 
3099   if ( PL_get_atom(t, &a) )
3100   { thread_handle *th;
3101     atom_t symbol = a;
3102 
3103   from_symbol:
3104     if ( (th=symbol_thread_handle(symbol)) &&
3105 	 true(th, TH_IS_INTERACTOR) )
3106     { if ( th->info || true(th, (TH_INTERACTOR_NOMORE|TH_INTERACTOR_DONE)) )
3107       { *thp = th;
3108 	return TRUE;
3109       }
3110       if ( warn )
3111 	PL_existence_error("engine", t);
3112     } else if ( isTextAtom(symbol) )
3113     { word w;
3114 
3115       if ( (w = (word)lookupHTable(threadTable, (void *)symbol)) )
3116       { symbol = w;
3117 	goto from_symbol;
3118       }
3119       if ( warn )
3120 	PL_existence_error("engine", t);
3121     } else
3122     { if ( warn )
3123 	PL_type_error("engine", t);
3124     }
3125   } else
3126   { if ( warn )
3127       PL_type_error("engine", t);
3128   }
3129 
3130   return FALSE;
3131 }
3132 
3133 
/** '$engine_create'(-Handle, +GoalAndTemplate, +Options)
 *
 * Options accepted by '$engine_create'/3.  The order of the entries
 * must match the order of the output pointers passed to scan_options()
 * in engine_create(): stack limit, alias, inherit_from.
*/

static const opt_spec make_engine_options[] =
{ { ATOM_stack_limit,	OPT_SIZE|OPT_INF },
  { ATOM_alias,		OPT_ATOM },
  { ATOM_inherit_from,	OPT_TERM },
  { NULL_ATOM,		0 }
};
3143 
3144 
/* '$engine_create'(-Handle, +GoalAndTemplate, +Options)

   Create a new Prolog engine, mark it as an interactor, optionally
   give it an alias and unify Handle with its identifier.  The term
   GoalAndTemplate is recorded, copied into the new engine and a query
   calling system:call/1 on its second argument is opened there (the
   first argument is kept in argv[0]).  The calling engine is restored
   before returning.

   The stack limit defaults to the limit of the calling thread.
   Returns a resource error through PL_no_memory() if the engine
   cannot be created.
*/

static
PRED_IMPL("$engine_create", 3, engine_create, 0)
{ PRED_LD
  PL_engine_t new;
  PL_thread_attr_t attrs;
  size_t stack	      =	0;
  atom_t alias	      =	NULL_ATOM;
  term_t inherit_from =	0;		/* parsed, but not used below */

  memset(&attrs, 0, sizeof(attrs));
  if ( !scan_options(A3, 0,
		     ATOM_engine_option, make_engine_options,
		     &stack,
		     &alias,
		     &inherit_from) )
    return FALSE;

  if ( stack )
    attrs.stack_limit = stack;
  else
    attrs.stack_limit = LD->stacks.limit;

  if ( (new = PL_create_engine(&attrs)) )
  { PL_engine_t me;
    static predicate_t pred = NULL;
    record_t r;
    thread_handle *th;
    term_t t;
    int rc;

    /* is_engine must be set before the handle is created, as
       create_thread_handle() sizes and initialises the handle
       depending on it */
    new->thread.info->is_engine = TRUE;
    th = create_thread_handle(new->thread.info);
    set(th, TH_IS_INTERACTOR);
    ATOMIC_INC(&GD->statistics.engines_created);

    if ( alias )
    { if ( !aliasThread(new->thread.info->pl_tid, ATOM_engine, alias) )
      { destroy_interactor(th, FALSE);
	return FALSE;
      }
    }

    if ( !(rc = unify_thread_id(A1, new->thread.info)) )
    { destroy_interactor(th, FALSE);
      if ( !PL_exception(0) )
	return PL_uninstantiation_error(A1);

      return FALSE;
    }
    PL_unregister_atom(th->symbol);

    if ( !pred )
      pred = PL_predicate("call", 1, "system");

    /* Record the goal in the calling engine, then switch to the new
       engine to rebuild it there */
    r = PL_record(A2);
    rc = PL_set_engine(new, &me);
    assert(rc == PL_ENGINE_SET);
    LOCAL_LD = new;

    if  ( (t = PL_new_term_ref()) &&
	  (th->interactor.argv = PL_new_term_refs(2)) &&
	  PL_recorded(r, t) &&
	  PL_get_arg(1, t, th->interactor.argv+0) &&
	  PL_get_arg(2, t, th->interactor.argv+1) )
    { th->interactor.query = PL_open_query(NULL,
					   PL_Q_CATCH_EXCEPTION|
					   PL_Q_ALLOW_YIELD|
					   PL_Q_EXT_STATUS,
					   pred, th->interactor.argv+1);
      PL_set_engine(me, NULL);		/* back to the calling engine */
      LOCAL_LD = me;
    } else
    { /* NOTE(review): the calling engine is only restored in the branch
	 above; if assertions are compiled out, execution continues
	 here with the new engine still selected. */
      assert(0);			/* TBD: copy exception */
    }

    PL_erase(r);

    return TRUE;
  }

  return PL_no_memory();
}
3227 
3228 
3229 static void
destroy_interactor(thread_handle * th,int gc)3230 destroy_interactor(thread_handle *th, int gc)
3231 { if ( th->interactor.query )
3232   { PL_engine_t me;
3233 
3234     PL_set_engine(th->info->thread_data, &me);
3235     PL_close_query(th->interactor.query);
3236     PL_set_engine(me, NULL);
3237     th->interactor.query = 0;
3238   }
3239   if ( th->info )
3240   { PL_destroy_engine(th->info->thread_data);
3241     ATOMIC_INC(&GD->statistics.engines_finished);
3242     assert(th->info == NULL || gc);
3243   }
3244   if ( th->interactor.package )
3245   { PL_erase(th->interactor.package);
3246     th->interactor.package = 0;
3247   }
3248   clear(th, (TH_INTERACTOR_NOMORE|TH_INTERACTOR_DONE));
3249   simpleMutexDelete(th->interactor.mutex);
3250   th->interactor.mutex = NULL;
3251   unalias_thread(th);
3252 }
3253 
3254 
3255 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
3256 Called when we computed the last result.   This means we can destroy the
3257 Prolog engine, but the reference (and  possible alias) must remain valid
3258 to deal with the engine_destroy/1  or   final  engine_next/1 if the last
3259 answer was deterministic.
3260 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
3261 
3262 static void
done_interactor(thread_handle * th)3263 done_interactor(thread_handle *th)
3264 {
3265 #ifndef NDEBUG
3266   GET_LD
3267   PL_engine_t e = th->info->thread_data;
3268   assert(e->thread.info->open_count == 1);
3269   assert(e == LD);
3270 #endif
3271 
3272   set(th, TH_INTERACTOR_DONE);
3273   PL_thread_destroy_engine();
3274   ATOMIC_INC(&GD->statistics.engines_finished);
3275   assert(th->info == NULL);
3276 }
3277 
3278 
3279 static
3280 PRED_IMPL("engine_destroy", 1, engine_destroy, 0)
3281 { PRED_LD
3282   thread_handle *th;
3283 
3284   if ( get_interactor(A1, &th, TRUE PASS_LD) )
3285   { destroy_interactor(th, FALSE);
3286 
3287     return TRUE;
3288   }
3289 
3290   return FALSE;
3291 }
3292 
3293 
3294 static void
copy_debug_mode(PL_local_data_t * to,PL_local_data_t * from)3295 copy_debug_mode(PL_local_data_t *to, PL_local_data_t *from)
3296 { PL_local_data_t *current = PL_current_engine();
3297 
3298   if ( to->_debugstatus.debugging != from->_debugstatus.debugging )
3299   { TLD_set_LD(to);
3300     debugmode(from->_debugstatus.debugging, NULL);
3301     TLD_set_LD(current);
3302   }
3303   if ( to->_debugstatus.tracing != from->_debugstatus.tracing )
3304   { TLD_set_LD(to);
3305     tracemode(from->_debugstatus.tracing, NULL);
3306     TLD_set_LD(current);
3307   }
3308 }
3309 
3310 
3311 static PL_local_data_t *
activate_interactor(thread_handle * th)3312 activate_interactor(thread_handle *th)
3313 { PL_local_data_t *ld = th->info->thread_data;
3314 
3315   TLD_set_LD(ld);
3316   ld->thread.info->tid = pthread_self();
3317   ld->thread.info->has_tid = TRUE;
3318   set_system_thread_id(ld->thread.info);
3319 
3320   return ld;
3321 }
3322 
3323 
3324 static PL_local_data_t *
suspend_interactor(PL_engine_t me,thread_handle * th)3325 suspend_interactor(PL_engine_t me, thread_handle *th)
3326 { if ( th->info )
3327     detach_engine(th->info->thread_data);
3328   TLD_set_LD(me);
3329 
3330   return me;
3331 }
3332 
3333 
3334 #define YIELD_ENGINE_YIELD  256		/* keep in sync with engine_yield/1 */
3335 
3336 static int
interactor_post_answer_nolock(thread_handle * th,term_t ref,term_t package,term_t term ARG_LD)3337 interactor_post_answer_nolock(thread_handle *th,
3338 			      term_t ref, term_t package, term_t term ARG_LD)
3339 { PL_engine_t me = LD;
3340 
3341   if ( package && th->interactor.package )
3342     return PL_permission_error("post_to", "engine", ref);
3343 
3344   if ( !th->interactor.query )
3345   { if ( true(th, TH_INTERACTOR_NOMORE) )
3346     { clear(th, TH_INTERACTOR_NOMORE);
3347       set(th, TH_INTERACTOR_DONE);
3348       return FALSE;
3349     }
3350     return PL_existence_error("engine", ref);
3351   }
3352 
3353   if ( package )
3354     th->interactor.package = PL_record(package);
3355 
3356   if ( (LOCAL_LD = activate_interactor(th)) )
3357   { term_t t;
3358     int rc;
3359     record_t r;
3360 
3361     copy_debug_mode(LD, me);
3362     rc = PL_next_solution(th->interactor.query);
3363 
3364     switch( rc )
3365     { case PL_S_TRUE:
3366       { r = PL_record(th->interactor.argv+0);
3367 	break;
3368       }
3369       case PL_S_LAST:
3370       { r = PL_record(th->interactor.argv+0);
3371 	PL_close_query(th->interactor.query);
3372 	th->interactor.query = 0;
3373 	done_interactor(th);
3374 	set(th, TH_INTERACTOR_NOMORE);
3375 	break;
3376       }
3377       case PL_S_FALSE:
3378       { PL_close_query(th->interactor.query);
3379 	th->interactor.query = 0;
3380 	done_interactor(th);
3381 	LOCAL_LD = suspend_interactor(me, th);
3382 
3383 	return FALSE;
3384       }
3385       case PL_S_EXCEPTION:
3386       { record_t r = PL_record(PL_exception(th->interactor.query));
3387 	term_t ex;
3388 	int rc;
3389 
3390 	PL_close_query(th->interactor.query);
3391 	th->interactor.query = 0;
3392 	done_interactor(th);
3393 	LOCAL_LD = suspend_interactor(me, th);
3394 
3395 	rc = ( (ex = PL_new_term_ref()) &&
3396 	       PL_recorded(r, ex) &&
3397 	       PL_raise_exception(ex) );
3398 
3399 	PL_erase(r);
3400 	return rc;
3401       }
3402       case YIELD_ENGINE_YIELD:			/* engine_yield/1 */
3403       { r = PL_record(PL_yielded(th->interactor.query));
3404 	break;
3405       }
3406       default:
3407       { term_t ex = PL_new_term_ref();
3408 
3409 	return ( PL_put_integer(ex, rc) &&
3410 		 PL_domain_error("engine_yield_code", ex) );
3411       }
3412     }
3413 
3414     LOCAL_LD = suspend_interactor(me, th);
3415     rc = ( (t=PL_new_term_ref()) &&
3416 	   PL_recorded(r, t) &&
3417 	   PL_unify(term, t) );
3418     PL_erase(r);
3419 
3420     return rc;
3421   }
3422 
3423   assert(0);
3424   return FALSE;
3425 }
3426 
3427 
3428 static atom_t
thread_symbol(const PL_local_data_t * ld)3429 thread_symbol(const PL_local_data_t *ld)
3430 { if ( ld->thread.info->is_engine )
3431   { const thread_handle *th = symbol_thread_handle(ld->thread.info->symbol);
3432     return th->interactor.thread;
3433   } else
3434   { return ld->thread.info->symbol;
3435   }
3436 }
3437 
3438 
3439 static int
interactor_post_answer(term_t ref,term_t package,term_t term ARG_LD)3440 interactor_post_answer(term_t ref, term_t package, term_t term ARG_LD)
3441 { thread_handle *th;
3442 
3443   if ( get_interactor(ref, &th, TRUE PASS_LD) )
3444   { int rc;
3445 
3446     simpleMutexLock(th->interactor.mutex);
3447     th->interactor.thread = thread_symbol(LD);
3448     rc = interactor_post_answer_nolock(th, ref, package, term PASS_LD);
3449     th->interactor.thread = NULL_ATOM;
3450     simpleMutexUnlock(th->interactor.mutex);
3451 
3452     return rc;
3453   }
3454 
3455   return FALSE;
3456 }
3457 
3458 
3459 /** engine_next(+Engine, -Next)
3460 */
3461 
3462 static
3463 PRED_IMPL("engine_next", 2, engine_next, 0)
3464 { PRED_LD
3465 
3466   return interactor_post_answer(A1, 0, A2 PASS_LD);
3467 }
3468 
3469 /** engine_post(+Engine, +Term)
3470 */
3471 
3472 static
3473 PRED_IMPL("engine_post", 2, engine_post, 0)
3474 { PRED_LD
3475   thread_handle *th;
3476 
3477   if ( get_interactor(A1, &th, TRUE PASS_LD) )
3478   { int rc;
3479 
3480     simpleMutexLock(th->interactor.mutex);
3481     if ( !th->interactor.package )
3482     { if ( (th->interactor.package = PL_record(A2)) )
3483 	rc = TRUE;
3484       else
3485 	rc = FALSE;
3486     } else
3487     { rc = PL_permission_error("post_to", "engine", A1);
3488     }
3489     simpleMutexUnlock(th->interactor.mutex);
3490 
3491     return rc;
3492   }
3493 
3494   return FALSE;
3495 }
3496 
3497 /** engine_post(+Engine, +Term, -Next)
3498 */
3499 
3500 static
3501 PRED_IMPL("engine_post", 3, engine_post, 0)
3502 { PRED_LD
3503 
3504   return interactor_post_answer(A1, A2, A3 PASS_LD);
3505 }
3506 
3507 
3508 static
3509 PRED_IMPL("engine_fetch", 1, engine_fetch, 0)
3510 { PRED_LD
3511   PL_thread_info_t *info = LD->thread.info;
3512   term_t exv;
3513 
3514   if ( info->is_engine )
3515   { thread_handle *th = symbol_thread_handle(info->symbol);
3516 
3517     if ( th->interactor.package )
3518     { term_t tmp;
3519       int rc;
3520 
3521       rc = ( (tmp = PL_new_term_ref()) &&
3522 	     PL_recorded(th->interactor.package, tmp) &&
3523 	     PL_unify(A1, tmp) );
3524       PL_erase(th->interactor.package);
3525       th->interactor.package = 0;
3526 
3527       return rc;
3528     }
3529   }
3530 
3531   return ( (exv = PL_new_term_refs(2)) &&
3532 	    PL_unify_atom_chars(exv+0, "delivery") &&
3533 	    unify_thread_id(exv+1, info) &&
3534 	    PL_error("engine_fetch", 1, "Use engine_post/2,3", ERR_EXISTENCE3,
3535 		     ATOM_term, exv+0, exv+1) );
3536 }
3537 
3538 
3539 static
3540 PRED_IMPL("is_engine", 1, is_engine, 0)
3541 { PRED_LD
3542   thread_handle *th;
3543 
3544   return get_interactor(A1, &th, FALSE PASS_LD);
3545 }
3546 
3547 
3548 		 /*******************************
3549 		 *	  MESSAGE QUEUES	*
3550 		 *******************************/
3551 
/* Reason why a thread blocks on a message queue.  dispatch_cond_wait()
   uses it to select the condition variable to wait on and the alert
   type recorded for the waiting thread.
*/
typedef enum
{ QUEUE_WAIT_READ,			/* wait for message */
  QUEUE_WAIT_DRAIN			/* wait for queue to drain */
} queue_wait_type;

/* Negative status codes returned by queue_message() and get_message()
   when waiting on the queue ended without the requested result.
*/
#define MSG_WAIT_INTR		(-1)	/* interrupted by a thread-signal */
#define MSG_WAIT_TIMEOUT	(-2)	/* deadline passed while waiting */
#define MSG_WAIT_DESTROYED	(-3)	/* queue destroyed while waiting */

/* Forward declaration: queue_message() calls this before it is defined. */
static int dispatch_cond_wait(message_queue *queue,
			      queue_wait_type wait,
			      struct timespec *deadline ARG_LD);
3564 
3565 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
3566 This code deals with telling other threads something.  The interface:
3567 
3568 	thread_get_message(-Message)
3569 	thread_send_message(+Id, +Message)
3570 
3571 Queues can be waited for by   multiple  threads using different (partly)
3572 instantiated patterns for Message. For this   reason all waiting threads
3573 should be restarted using pthread_cond_broadcast().   However,  if there
3574 are a large number of workers only   waiting for `any' message this will
3575 cause all of them to wakeup for only   one  to grab the message. This is
3576 highly undesirable and therefore the queue  keeps two counts: the number
3577 of waiting threads and the number waiting  with a variable. Only if they
3578 are not equal and there are multiple waiters we must be using broadcast.
3579 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
3580 
/* A single message in a message queue.  Messages form a singly linked
   list from queue->head to queue->tail.
*/
typedef struct thread_message
{ struct thread_message *next;		/* next in queue */
  record_t            message;		/* message in queue */
  word		      key;		/* getIndexOfTerm() of the message; */
					/* lets readers skip non-matching */
					/* messages without unification */
  uint64_t	      sequence_id;	/* taken from queue->sequence_next */
					/* when the message is queued */
} thread_message;
3587 
3588 
3589 static thread_message *
create_thread_message(term_t msg ARG_LD)3590 create_thread_message(term_t msg ARG_LD)
3591 { thread_message *msgp;
3592   record_t rec;
3593 
3594   if ( !(rec=compileTermToHeap(msg, R_NOLOCK)) )
3595     return NULL;
3596 
3597   if ( (msgp = allocHeap(sizeof(*msgp))) )
3598   { msgp->next    = NULL;
3599     msgp->message = rec;
3600     msgp->key     = getIndexOfTerm(msg);
3601   } else
3602   { freeRecord(rec);
3603   }
3604 
3605   return msgp;
3606 }
3607 
3608 
3609 static void
free_thread_message(thread_message * msg)3610 free_thread_message(thread_message *msg)
3611 { if ( msg->message )
3612     freeRecord(msg->message);
3613 
3614   freeHeap(msg, sizeof(*msg));
3615 }
3616 
3617 
3618 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
3619 queue_message() adds a message to a message queue.  The caller must hold
3620 the queue-mutex.
3621 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
3622 
3623 static int
queue_message(message_queue * queue,thread_message * msgp,struct timespec * deadline ARG_LD)3624 queue_message(message_queue *queue, thread_message *msgp,
3625 	      struct timespec *deadline ARG_LD)
3626 { if ( queue->max_size > 0 && queue->size >= queue->max_size )
3627   { queue->wait_for_drain++;
3628 
3629     while ( queue->size >= queue->max_size )
3630     { switch ( dispatch_cond_wait(queue, QUEUE_WAIT_DRAIN, deadline PASS_LD) )
3631       { case CV_INTR:
3632 	{ if ( !LD )			/* needed for clean exit */
3633 	  { Sdprintf("Forced exit from queue_message()\n");
3634 	    exit(1);
3635 	  }
3636 
3637 	  if ( is_signalled(LD) )			/* thread-signal */
3638 	  { queue->wait_for_drain--;
3639 	    return MSG_WAIT_INTR;
3640 	  }
3641 	  break;
3642 	}
3643 	case CV_TIMEDOUT:
3644 	  queue->wait_for_drain--;
3645 	  return MSG_WAIT_TIMEOUT;
3646 	case CV_READY:
3647 	case CV_MAYBE:
3648 	  break;
3649 	default:
3650 	  assert(0); // should never happen
3651       }
3652       if ( queue->destroyed )
3653       { queue->wait_for_drain--;
3654 	return MSG_WAIT_DESTROYED;
3655       }
3656     }
3657 
3658     queue->wait_for_drain--;
3659   }
3660 
3661   msgp->sequence_id = ++queue->sequence_next;
3662   if ( !queue->head )
3663   { queue->head = queue->tail = msgp;
3664   } else
3665   { queue->tail->next = msgp;
3666     queue->tail = msgp;
3667   }
3668   queue->size++;
3669 
3670   if ( queue->waiting )
3671   { if ( queue->waiting > queue->waiting_var && queue->waiting > 1 )
3672     { DEBUG(MSG_QUEUE,
3673 	    Sdprintf("%d: %d of %d non-var waiters on %p; broadcasting\n",
3674 		     PL_thread_self(),
3675 		     queue->waiting - queue->waiting_var,
3676 		     queue->waiting,
3677 		     queue));
3678       cv_broadcast(&queue->cond_var);
3679     } else
3680     { DEBUG(MSG_QUEUE, Sdprintf("%d: %d waiters on %p; signalling\n",
3681 				PL_thread_self(), queue->waiting, queue));
3682       cv_signal(&queue->cond_var);
3683     }
3684   } else
3685   { DEBUG(MSG_QUEUE, Sdprintf("%d: no waiters on %p\n",
3686 			      PL_thread_self(), queue));
3687   }
3688 
3689   return TRUE;
3690 }
3691 
3692 
3693 		 /*******************************
3694 		 *     READING FROM A QUEUE	*
3695 		 *******************************/
3696 
3697 #if TIME_WITH_SYS_TIME
3698 # include <sys/time.h>
3699 # include <time.h>
3700 #else
3701 # if HAVE_SYS_TIME_H
3702 #  include <sys/time.h>
3703 # else
3704 #  include <time.h>
3705 # endif
3706 #endif
3707 #ifdef HAVE_FTIME
3708 #include <sys/timeb.h>
3709 #endif
3710 
3711 static void
timespec_diff(struct timespec * diff,const struct timespec * a,const struct timespec * b)3712 timespec_diff(struct timespec *diff,
3713 	      const struct timespec *a, const struct timespec *b)
3714 { diff->tv_sec  = a->tv_sec - b->tv_sec;
3715   diff->tv_nsec = a->tv_nsec - b->tv_nsec;
3716   if ( diff->tv_nsec < 0 )
3717   { --diff->tv_sec;
3718     diff->tv_nsec += 1000000000;
3719   }
3720 }
3721 
3722 
/* timespec_sign() returns 1, -1 or 0 for a positive, negative or zero
   time.  The nanoseconds are only consulted if tv_sec is 0, and only
   for being positive: a result of timespec_diff() never has negative
   nanoseconds.
*/

static int
timespec_sign(const struct timespec *t)
{ if ( t->tv_sec > 0 )
    return 1;
  if ( t->tv_sec < 0 )
    return -1;

  return t->tv_nsec > 0;
}
3731 
3732 
/* timespec_cmp() compares two times: the result is 1 if a is later
   than b, -1 if a is earlier than b and 0 if they are equal.
*/

static int
timespec_cmp(const struct timespec *a, const struct timespec *b)
{ struct timespec delta;

  timespec_diff(&delta, a, b);		/* delta = a - b */

  return timespec_sign(&delta);
}
3741 
3742 
/* get_current_timespec() stores the current wall-clock time in *time.
   It uses the most precise source the configuration provides:
   clock_gettime(CLOCK_REALTIME), else gettimeofday(), else ftime().
*/

void
get_current_timespec(struct timespec *time)
{
#if defined(HAVE_CLOCK_GETTIME)
  clock_gettime(CLOCK_REALTIME, time);
#elif defined(HAVE_GETTIMEOFDAY)
  struct timeval tv;

  gettimeofday(&tv, NULL);
  time->tv_sec  = tv.tv_sec;
  time->tv_nsec = tv.tv_usec*1000;		/* usec -> nsec */
#else						/* HAVE_FTIME */
  struct timeb tb;

  ftime(&tb);
  time->tv_sec  = tb.time;
  time->tv_nsec = tb.millitm*1000000;		/* msec -> nsec */
#endif
}
3764 
3765 
/* carry_timespec_nanos() normalises *time after nanoseconds have been
   added to it: every full second in tv_nsec is moved to tv_sec.  A
   tv_nsec below one second, including a negative one, is left alone.
*/

void
carry_timespec_nanos(struct timespec *time)
{ if ( time->tv_nsec >= 1000000000 )
  { time->tv_sec  += time->tv_nsec / 1000000000;
    time->tv_nsec %= 1000000000;
  }
}
3773 
3774 #ifdef __WINDOWS__
3775 
3776 int
cv_timedwait(message_queue * queue,CONDITION_VARIABLE * cond,CRITICAL_SECTION * mutex,struct timespec * deadline)3777 cv_timedwait(message_queue *queue,
3778 	     CONDITION_VARIABLE *cond, CRITICAL_SECTION *mutex,
3779 	     struct timespec *deadline)
3780 { GET_LD
3781   struct timespec tmp_timeout;
3782   DWORD api_timeout = 250;
3783   int last = FALSE;
3784   int rc;
3785 
3786   for(;;)
3787   { get_current_timespec(&tmp_timeout);
3788     tmp_timeout.tv_nsec += 250000000;
3789     carry_timespec_nanos(&tmp_timeout);
3790 
3791     if ( deadline && timespec_cmp(&tmp_timeout, deadline) >= 0 )
3792     { struct timespec d;
3793 
3794       get_current_timespec(&tmp_timeout);
3795       timespec_diff(&d, deadline, &tmp_timeout);
3796       if ( timespec_sign(&d) > 0 )
3797 	api_timeout = (d.tv_nsec + 1000000 - 1) / 1000000;
3798       else
3799 	return CV_TIMEDOUT;
3800 
3801       last = TRUE;
3802     }
3803 
3804     rc = SleepConditionVariableCS(cond, mutex, api_timeout);
3805 
3806     if ( is_signalled(LD) )
3807       return CV_INTR;
3808     if ( !rc )
3809       return last ? CV_TIMEDOUT : CV_MAYBE;
3810     if ( rc )
3811       return CV_READY;
3812   }
3813 }
3814 
3815 #else /*__WINDOWS__*/
3816 
3817 /* return: 0: ok, EINTR: interrupted, ETIMEDOUT: timeout
3818 */
3819 
3820 int
cv_timedwait(message_queue * queue,pthread_cond_t * cond,pthread_mutex_t * mutex,struct timespec * deadline)3821 cv_timedwait(message_queue *queue,
3822 	     pthread_cond_t *cond, pthread_mutex_t *mutex,
3823 	     struct timespec *deadline)
3824 { GET_LD
3825   struct timespec tmp_timeout;
3826   struct timespec *api_timeout = &tmp_timeout;
3827   int rc;
3828 
3829   for(;;)
3830   { get_current_timespec(&tmp_timeout);
3831     tmp_timeout.tv_nsec += 250000000;
3832     carry_timespec_nanos(&tmp_timeout);
3833 
3834     if ( deadline && timespec_cmp(&tmp_timeout, deadline) >= 0 )
3835       api_timeout = deadline;
3836 
3837     rc = pthread_cond_timedwait(cond, mutex, api_timeout);
3838     DEBUG(MSG_QUEUE_WAIT,
3839 	  Sdprintf("%d: wait on %p returned %d; size = %ld (%s)\n",
3840 		   PL_thread_self(), queue, rc,
3841 		   queue ? (long)queue->size : 0,
3842 		   api_timeout == deadline ? "final" : "0.25sec"));
3843 
3844     switch( rc )
3845     { case ETIMEDOUT:
3846 	if ( is_signalled(LD) )
3847 	  return CV_INTR;
3848 	if ( api_timeout == deadline )
3849 	  return CV_TIMEDOUT;
3850 	return CV_MAYBE;
3851       case EINTR:			/* can not happen in POSIX, but can in */
3852       case 0:				/* legacy systems */
3853 	if ( is_signalled(LD) )
3854 	  return CV_INTR;
3855         return CV_READY;
3856       default:
3857 	assert(0);
3858 	return CV_READY;
3859     }
3860   }
3861 }
3862 
3863 #endif /*__WINDOWS__*/
3864 
3865 static int
dispatch_cond_wait(message_queue * queue,queue_wait_type wait,struct timespec * deadline ARG_LD)3866 dispatch_cond_wait(message_queue *queue, queue_wait_type wait,
3867 		   struct timespec *deadline ARG_LD)
3868 { int rc;
3869 
3870   LD->thread.alert.obj.queue = queue;
3871   LD->thread.alert.type	     = wait == QUEUE_WAIT_READ ? ALERT_QUEUE_RD
3872 						       : ALERT_QUEUE_WR;
3873 
3874   rc = cv_timedwait(queue,
3875 		    (wait == QUEUE_WAIT_READ ? &queue->cond_var
3876 					     : &queue->drain_var),
3877 		    &queue->mutex,
3878 		    deadline);
3879 
3880   PL_LOCK(L_ALERT);
3881   LD->thread.alert.type = 0;
3882   LD->thread.alert.obj.queue = NULL;
3883   PL_UNLOCK(L_ALERT);
3884 
3885   return rc;
3886 }
3887 
3888 #ifdef O_QUEUE_STATS
3889 static uint64_t getmsg  = 0;
3890 static uint64_t unified = 0;
3891 static uint64_t skipped = 0;
3892 
3893 static void
msg_statistics(void)3894 msg_statistics(void)
3895 { Sdprintf("get_message: %lld, unified: %lld, skipped: %lld\n",
3896 	   getmsg, unified, skipped);
3897 }
3898 #define QSTAT(n) (n++)
3899 #else
3900 #define QSTAT(n) ((void)0)
3901 #endif
3902 
3903 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
3904 get_message() reads the next message from the  message queue. It must be
3905 called with queue->mutex locked.  It returns one of
3906 
3907 	* TRUE
3908 	* FALSE
3909 	* MSG_WAIT_INTR
3910 	  Got EINTR while waiting.
3911 	* MSG_WAIT_TIMEOUT
3912 	  Got timeout while waiting
3913 	* MSG_WAIT_DESTROYED
3914 	  Queue was destroyed while waiting
3915 
3916 (*) We need  to lock  because AGC  marks our atoms  while the  thread is
3917 running.  The thread may pick   a  message containing  an atom  from the
3918 queue,  which now has not  been   marked  and is  no longer part  of the
3919 queue, so it isn't marked in the queue either.
3920 
3921 Originally, I thought we  only  need  this   for  queues  that  are  not
3922 associated to threads. Those associated  with   a  thread  mark both the
3923 stacks and the queue in one pass,  marking the atoms in either. However,
3924 we also need to lock to avoid  get_message() destroying the record while
3925 markAtomsMessageQueue() scans it. This fixes the reopened Bug#142.
3926 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
3927 
3928 static int
get_message(message_queue * queue,term_t msg,struct timespec * deadline ARG_LD)3929 get_message(message_queue *queue, term_t msg, struct timespec *deadline ARG_LD)
3930 { int isvar = PL_is_variable(msg) ? 1 : 0;
3931   word key = (isvar ? 0L : getIndexOfTerm(msg));
3932   fid_t fid = PL_open_foreign_frame();
3933   uint64_t seen = 0;
3934 
3935   QSTAT(getmsg);
3936 
3937   DEBUG(MSG_QUEUE,
3938 	{ Sdprintf("%d: get_message(%p) key 0x%lx for ",
3939 		   PL_thread_self(), queue, key);
3940 	  PL_write_term(Serror, msg, 1200, PL_WRT_QUOTED|PL_WRT_NEWLINE);
3941 	});
3942 
3943   for(;;)
3944   { int rc;
3945     thread_message *msgp = queue->head;
3946     thread_message *prev = NULL;
3947 
3948     if ( queue->destroyed )
3949       return MSG_WAIT_DESTROYED;
3950 
3951     DEBUG(MSG_QUEUE,
3952 	  Sdprintf("%d: queue size=%ld\n",
3953 		   PL_thread_self(), (long)queue->size));
3954 
3955     for( ; msgp; prev = msgp, msgp = msgp->next )
3956     { term_t tmp;
3957 
3958       if ( msgp->sequence_id < seen )
3959       { QSTAT(skipped);
3960 	DEBUG(MSG_QUEUE, Sdprintf("Already seen %ld (<%ld)\n",
3961 				  (long)msgp->sequence_id, (long)seen));
3962 	continue;
3963       }
3964       seen = msgp->sequence_id;
3965 
3966       if ( key && msgp->key && key != msgp->key )
3967       { DEBUG(MSG_QUEUE, Sdprintf("Message key mismatch\n"));
3968 	continue;			/* fast search */
3969       }
3970 
3971       QSTAT(unified);
3972       tmp = PL_new_term_ref();
3973       if ( !PL_recorded(msgp->message, tmp) )
3974       { PL_discard_foreign_frame(fid);
3975         return raiseStackOverflow(GLOBAL_OVERFLOW);
3976       }
3977       DEBUG(MSG_QUEUE,
3978 	    { Sdprintf("%d: found term ", PL_thread_self());
3979 	      PL_write_term(Serror, tmp, 1200, PL_WRT_QUOTED|PL_WRT_NEWLINE);
3980 	    });
3981 
3982       rc = PL_unify(msg, tmp);
3983 
3984       if ( rc )
3985       { term_t ex = PL_new_term_ref();
3986 
3987 	if ( !(rc=foreignWakeup(ex PASS_LD)) )
3988 	{ if ( !isVar(*valTermRef(ex)) )
3989 	    PL_raise_exception(ex);
3990 	}
3991       }
3992 
3993       if ( rc )
3994       { DEBUG(MSG_QUEUE, Sdprintf("%d: match\n", PL_thread_self()));
3995 
3996 	if (GD->atoms.gc_active)
3997 	  markAtomsRecord(msgp->message);
3998 
3999         simpleMutexLock(&queue->gc_mutex);	/* see (*) */
4000 	if ( prev )
4001 	{ if ( !(prev->next = msgp->next) )
4002 	    queue->tail = prev;
4003 	} else
4004 	{ if ( !(queue->head = msgp->next) )
4005 	    queue->tail = NULL;
4006 	}
4007         simpleMutexUnlock(&queue->gc_mutex);
4008 
4009 	free_thread_message(msgp);
4010 	queue->size--;
4011 	if ( queue->wait_for_drain )
4012 	{ DEBUG(MSG_QUEUE, Sdprintf("Queue drained. wakeup writers\n"));
4013 	  cv_signal(&queue->drain_var);
4014 	}
4015 
4016 	PL_close_foreign_frame(fid);
4017 	return TRUE;
4018       } else if ( exception_term )
4019       { PL_close_foreign_frame(fid);
4020 	return FALSE;
4021       }
4022 
4023       PL_rewind_foreign_frame(fid);
4024     }
4025 
4026     queue->waiting++;
4027     queue->waiting_var += isvar;
4028     DEBUG(MSG_QUEUE_WAIT, Sdprintf("%d: waiting on queue\n", PL_thread_self()));
4029     rc = dispatch_cond_wait(queue, QUEUE_WAIT_READ, deadline PASS_LD);
4030     switch ( rc )
4031     { case CV_INTR:
4032       { DEBUG(MSG_QUEUE_WAIT, Sdprintf("%d: CV_INTR\n", PL_thread_self()));
4033 
4034 	if ( !LD )			/* needed for clean exit */
4035 	{ Sdprintf("Forced exit from get_message()\n");
4036 	  exit(1);
4037 	}
4038 
4039 	if ( is_signalled(LD) )		/* thread-signal */
4040 	{ queue->waiting--;
4041 	  queue->waiting_var -= isvar;
4042 	  PL_discard_foreign_frame(fid);
4043 	  return MSG_WAIT_INTR;
4044 	}
4045 	break;
4046       }
4047       case CV_TIMEDOUT:
4048       { DEBUG(MSG_QUEUE_WAIT, Sdprintf("%d: CV_TIMEDOUT\n", PL_thread_self()));
4049 
4050 	queue->waiting--;
4051 	queue->waiting_var -= isvar;
4052 	PL_discard_foreign_frame(fid);
4053 	return MSG_WAIT_TIMEOUT;
4054       }
4055       case CV_READY:
4056       case CV_MAYBE:
4057 	DEBUG(MSG_QUEUE_WAIT,
4058 	      Sdprintf("%d: wakeup (%d) on queue\n",
4059 		       PL_thread_self(), rc));
4060 	break;
4061       default:
4062 	assert(0);
4063     }
4064     queue->waiting--;
4065     queue->waiting_var -= isvar;
4066   }
4067 }
4068 
4069 
4070 static int
peek_message(message_queue * queue,term_t msg ARG_LD)4071 peek_message(message_queue *queue, term_t msg ARG_LD)
4072 { thread_message *msgp;
4073   term_t tmp = PL_new_term_ref();
4074   word key = getIndexOfTerm(msg);
4075   fid_t fid = PL_open_foreign_frame();
4076 
4077   msgp = queue->head;
4078 
4079   for( msgp = queue->head; msgp; msgp = msgp->next )
4080   { if ( key && msgp->key && key != msgp->key )
4081       continue;
4082 
4083     if ( !PL_recorded(msgp->message, tmp) )
4084       return raiseStackOverflow(GLOBAL_OVERFLOW);
4085 
4086     if ( PL_unify(msg, tmp) )
4087       return TRUE;
4088     else if ( exception_term )
4089       return FALSE;
4090 
4091     PL_rewind_foreign_frame(fid);
4092   }
4093 
4094   return FALSE;
4095 }
4096 
4097 
4098 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
4099 Deletes the contents of the message-queue as well as the queue itself.
4100 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
4101 
4102 static void
destroy_message_queue(message_queue * queue)4103 destroy_message_queue(message_queue *queue)
4104 { thread_message *msgp;
4105   thread_message *next;
4106 
4107   if ( GD->cleaning || !queue->initialized )
4108     return;				/* deallocation is centralised */
4109   queue->initialized = FALSE;
4110 
4111   assert(!queue->waiting && !queue->wait_for_drain);
4112 
4113   for( msgp = queue->head; msgp; msgp = next )
4114   { next = msgp->next;
4115 
4116     free_thread_message(msgp);
4117   }
4118 
4119   simpleMutexDelete(&queue->gc_mutex);
4120   cv_destroy(&queue->cond_var);
4121   if ( queue->max_size > 0 )
4122     cv_destroy(&queue->drain_var);
4123   if ( !queue->anonymous )
4124     simpleMutexDelete(&queue->mutex);
4125 }
4126 
4127 
/* Destroy the input queue of a thread.  We have to take care of the case
   where our input queue is being waited for by another thread.  This is
   similar to message_queue_destroy/1.
 */
4132 
4133 static void
destroy_thread_message_queue(message_queue * q)4134 destroy_thread_message_queue(message_queue *q)
4135 { int done = FALSE;
4136 
4137   if (!q->initialized )
4138     return;
4139 
4140   while(!done)
4141   { simpleMutexLock(&q->mutex);
4142     q->destroyed = TRUE;
4143     if ( q->waiting || q->wait_for_drain )
4144     { if ( q->waiting )
4145 	cv_broadcast(&q->cond_var);
4146       if ( q->wait_for_drain )
4147 	cv_broadcast(&q->drain_var);
4148     } else
4149       done = TRUE;
4150     simpleMutexUnlock(&q->mutex);
4151   }
4152 
4153   destroy_message_queue(q);
4154 }
4155 
4156 
4157 static void
init_message_queue(message_queue * queue,size_t max_size)4158 init_message_queue(message_queue *queue, size_t max_size)
4159 { memset(queue, 0, sizeof(*queue));
4160   simpleMutexInit(&queue->mutex);
4161   simpleMutexInit(&queue->gc_mutex);
4162   cv_init(&queue->cond_var, NULL);
4163   queue->max_size = max_size;
4164   if ( queue->max_size != 0 )
4165     cv_init(&queue->drain_var, NULL);
4166   queue->initialized = TRUE;
4167 }
4168 
4169 
4170 static size_t
sizeof_message_queue(message_queue * queue)4171 sizeof_message_queue(message_queue *queue)
4172 { size_t size = 0;
4173   thread_message *msgp;
4174 
4175   simpleMutexLock(&queue->gc_mutex);
4176   for( msgp = queue->head; msgp; msgp = msgp->next )
4177   { size += sizeof(*msgp);
4178     size += msgp->message->size;
4179   }
4180   simpleMutexUnlock(&queue->gc_mutex);
4181 
4182   return size;
4183 }
4184 
4185 					/* Prolog predicates */
4186 
/* Options handled by process_deadline_options().  Both are floats.  The
   scan_options() call there passes its output variables in the order of
   this table: timeout first, then deadline.
*/
static const opt_spec thread_get_message_options[] =
{ { ATOM_timeout,	OPT_DOUBLE },
  { ATOM_deadline,	OPT_DOUBLE },
  { NULL_ATOM,		0 }
};
4192 
4193 #ifndef DBL_MAX
4194 #define DBL_MAX         1.7976931348623158e+308
4195 #endif
4196 
4197 /* This function is shared between thread_get_message/3 and
4198    thread_send_message/3.
4199 
4200    It extracts a deadline from the deadline/1 and timeout/1 options.
4201    In both cases, the deadline is passed through to dispatch_cond_wait().
4202    Semantics are relatively simple:
4203 
4204 	1. If neither option is given, the deadline is NULL, which
4205 	   corresponds to an indefinite wait, or a deadline in the
4206 	   infinite future.
4207 	2. A timeout is _exactly_ like a deadline of Now + Timeout,
4208 	   where Now is evaluated near the beginning of this function.
4209 	3. If both deadline and a timeout options are given, the
4210 	   earlier deadline is effective.
4211 	4. If the effective deadline is before Now, then return
4212 	   FALSE (leading to failure).
4213 */
4214 
4215 static int
process_deadline_options(term_t options,struct timespec * ts,struct timespec ** pts)4216 process_deadline_options(term_t options,
4217 			 struct timespec *ts, struct timespec **pts)
4218 { struct timespec now;
4219   struct timespec deadline;
4220   struct timespec timeout;
4221   struct timespec *dlop=NULL;
4222   double tmo = DBL_MAX;
4223   double dlo = DBL_MAX;
4224 
4225   if ( !scan_options(options, 0,
4226 		     ATOM_thread_get_message_option, thread_get_message_options,
4227 		     &tmo, &dlo) )
4228     return FALSE;
4229 
4230   get_current_timespec(&now);
4231 
4232   if ( dlo != DBL_MAX )
4233   { double ip, fp;
4234 
4235     fp = modf(dlo, &ip);
4236     deadline.tv_sec = (time_t)ip;
4237     deadline.tv_nsec = (long)(fp*1000000000.0);
4238     dlop = &deadline;
4239 
4240     if ( timespec_cmp(&deadline,&now) < 0 ) // if deadline in the past...
4241       return FALSE;                         // ... then FAIL
4242   }
4243 
4244   // timeout option is processed exactly as if the deadline
4245   // was set to now + timeout.
4246   if ( tmo != DBL_MAX )
4247   { if ( tmo > 0.0 )
4248     { double ip, fp=modf(tmo,&ip);
4249 
4250       timeout.tv_sec  = now.tv_sec + (time_t)ip;
4251       timeout.tv_nsec = now.tv_nsec + (long)(fp*1000000000.0);
4252       carry_timespec_nanos(&timeout);
4253       if ( dlop==NULL || timespec_cmp(&timeout,&deadline) < 0 )
4254 	dlop = &timeout;
4255     } else if ( tmo == 0.0 )
4256     { dlop = &now;				/* scan once */
4257     } else               // if timeout is negative ...
4258       return FALSE;      // ... then FAIL
4259   }
4260   if (dlop)
4261   { *ts  = *dlop;
4262     *pts = ts;
4263   } else
4264   { *pts=NULL;
4265   }
4266 
4267   return TRUE;
4268 }
4269 
4270 
4271 static int
wait_queue_message(term_t qterm,message_queue * q,thread_message * msg,struct timespec * deadline ARG_LD)4272 wait_queue_message(term_t qterm, message_queue *q, thread_message *msg,
4273 		   struct timespec *deadline ARG_LD)
4274 { int rc;
4275 
4276   for(;;)
4277   { rc = queue_message(q, msg, deadline PASS_LD);
4278 
4279     switch(rc)
4280     { case MSG_WAIT_INTR:
4281       { if ( PL_handle_signals() >= 0 )
4282 	  continue;
4283 	rc = FALSE;
4284 	break;
4285       }
4286       case MSG_WAIT_DESTROYED:
4287       { if ( qterm )
4288 	  PL_existence_error("message_queue", qterm);
4289 	rc = FALSE;
4290 	break;
4291       }
4292       case MSG_WAIT_TIMEOUT:
4293 	rc = FALSE;
4294         break;
4295       case TRUE:
4296 	break;
4297       default:
4298 	assert(0);
4299     }
4300 
4301     break;
4302   }
4303 
4304   return rc;
4305 }
4306 
4307 static int
thread_send_message__LD(term_t queue,term_t msgterm,struct timespec * deadline ARG_LD)4308 thread_send_message__LD(term_t queue, term_t msgterm,
4309 			struct timespec *deadline ARG_LD)
4310 { message_queue *q;
4311   thread_message *msg;
4312   int rc;
4313 
4314   if ( !(msg = create_thread_message(msgterm PASS_LD)) )
4315     return PL_no_memory();
4316 
4317   if ( !get_message_queue__LD(queue, &q PASS_LD) )
4318   { free_thread_message(msg);
4319     return FALSE;
4320   }
4321 
4322   rc = wait_queue_message(queue, q, msg, deadline PASS_LD);
4323   release_message_queue(q);
4324 
4325   if ( rc == FALSE )
4326     free_thread_message(msg);
4327 
4328   return rc;
4329 }
4330 
4331 static
4332 PRED_IMPL("thread_send_message", 2, thread_send_message, PL_FA_ISO)
4333 { PRED_LD
4334 
4335   return thread_send_message__LD(A1, A2, NULL PASS_LD);
4336 }
4337 
4338 static
4339 PRED_IMPL("thread_send_message", 3, thread_send_message, 0)
4340 { PRED_LD
4341   struct timespec deadline;
4342   struct timespec *dlop=NULL;
4343 
4344   return process_deadline_options(A3,&deadline,&dlop)
4345     &&   thread_send_message__LD(A1, A2, dlop PASS_LD);
4346 }
4347 
4348 
4349 
4350 static
4351 PRED_IMPL("thread_get_message", 1, thread_get_message, PL_FA_ISO)
4352 { PRED_LD
4353   int rc;
4354 
4355   for(;;)
4356   { simpleMutexLock(&LD->thread.messages.mutex);
4357     rc = get_message(&LD->thread.messages, A1, NULL PASS_LD);
4358     simpleMutexUnlock(&LD->thread.messages.mutex);
4359 
4360     if ( rc == MSG_WAIT_INTR )
4361     { if ( PL_handle_signals() >= 0 )
4362 	continue;
4363       rc = FALSE;
4364     }
4365 
4366     break;
4367   }
4368 
4369   return rc;
4370 }
4371 
4372 
4373 static
4374 PRED_IMPL("thread_peek_message", 1, thread_peek_message_1, PL_FA_ISO)
4375 { PRED_LD
4376   int rc;
4377 
4378   simpleMutexLock(&LD->thread.messages.mutex);
4379   rc = peek_message(&LD->thread.messages, A1 PASS_LD);
4380   simpleMutexUnlock(&LD->thread.messages.mutex);
4381 
4382   return rc;
4383 }
4384 
4385 
4386 		 /*******************************
4387 		 *     USER MESSAGE QUEUES	*
4388 		 *******************************/
4389 
4390 typedef struct mqref
4391 { message_queue *queue;
4392 } mqref;
4393 
4394 static int
write_message_queue_ref(IOSTREAM * s,atom_t aref,int flags)4395 write_message_queue_ref(IOSTREAM *s, atom_t aref, int flags)
4396 { mqref *ref = PL_blob_data(aref, NULL, NULL);
4397   (void)flags;
4398 
4399   Sfprintf(s, "<message_queue>(%p)", ref->queue);
4400   return TRUE;
4401 }
4402 
4403 
4404 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
4405 GC a message queue from the atom  garbage collector. This should be fine
4406 because atoms in messages do  not  have   locked  atoms,  so  we are not
4407 calling atom functions.
4408 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
4409 
4410 static int
release_message_queue_ref(atom_t aref)4411 release_message_queue_ref(atom_t aref)
4412 { mqref *ref = PL_blob_data(aref, NULL, NULL);
4413   message_queue *q;
4414 
4415   DEBUG(MSG_QUEUE_GC,
4416 	Sdprintf("GC message_queue %p\n", ref->queue));
4417 
4418   if ( (q=ref->queue) )
4419   { destroy_message_queue(q);			/* can be called twice */
4420     if ( !q->destroyed )
4421       deleteHTable(queueTable, (void *)q->id);
4422     simpleMutexDelete(&q->mutex);
4423     PL_free(q);
4424   }
4425 
4426   return TRUE;
4427 }
4428 
4429 
4430 static int
save_message_queue(atom_t aref,IOSTREAM * fd)4431 save_message_queue(atom_t aref, IOSTREAM *fd)
4432 { mqref *ref = PL_blob_data(aref, NULL, NULL);
4433   (void)fd;
4434 
4435   return PL_warning("Cannot save reference to <message_queue>(%p)", ref->queue);
4436 }
4437 
4438 
4439 static atom_t
load_message_queue(IOSTREAM * fd)4440 load_message_queue(IOSTREAM *fd)
4441 { (void)fd;
4442 
4443   return PL_new_atom("<saved-message-queue-ref>");
4444 }
4445 
4446 
4447 static PL_blob_t message_queue_blob =
4448 { PL_BLOB_MAGIC,
4449   PL_BLOB_UNIQUE,
4450   "message_queue",
4451   release_message_queue_ref,
4452   NULL,
4453   write_message_queue_ref,
4454   NULL,
4455   save_message_queue,
4456   load_message_queue
4457 };
4458 
4459 
4460 static void
initMessageQueues(void)4461 initMessageQueues(void)
4462 { message_queue_blob.atom_name = ATOM_message_queue;
4463   PL_register_blob_type(&message_queue_blob);
4464 }
4465 
4466 static int
unify_queue(term_t t,message_queue * q)4467 unify_queue(term_t t, message_queue *q)
4468 { GET_LD
4469 
4470   return PL_unify_atom(t, q->id);
4471 }
4472 
4473 
4474 static void
free_queue_symbol(void * name,void * value)4475 free_queue_symbol(void *name, void *value)
4476 { message_queue *q = value;
4477 
4478   destroy_message_queue(q);			/* must this be synced? */
4479   PL_free(q);
4480 }
4481 
4482 
4483 static message_queue *
unlocked_message_queue_create(term_t queue,long max_size)4484 unlocked_message_queue_create(term_t queue, long max_size)
4485 { GET_LD
4486   atom_t name = NULL_ATOM;
4487   message_queue *q;
4488   word id;
4489 
4490   if ( !queueTable )
4491   { simpleMutexInit(&queueTable_mutex);
4492     queueTable = newHTable(16);
4493     queueTable->free_symbol = free_queue_symbol;
4494   }
4495 
4496   if ( PL_get_atom(queue, &name) )
4497   { if ( lookupHTable(queueTable, (void *)name) ||
4498 	 lookupHTable(threadTable, (void *)name) )
4499     { PL_error("message_queue_create", 1, NULL, ERR_PERMISSION,
4500 	       ATOM_create, ATOM_message_queue, queue);
4501       return NULL;
4502     }
4503     id = name;
4504   } else if ( PL_is_variable(queue) )
4505   { id = 0;
4506   } else
4507   { PL_error("message_queue_create", 1, NULL,
4508 	     ERR_TYPE, ATOM_message_queue, queue);
4509     return NULL;
4510   }
4511 
4512   q = PL_malloc(sizeof(*q));
4513   init_message_queue(q, max_size);
4514   q->type = QTYPE_QUEUE;
4515   if ( !id )
4516   { mqref ref;
4517     int new;
4518 
4519     ref.queue = q;
4520     q->id = lookupBlob((void*)&ref, sizeof(ref), &message_queue_blob, &new);
4521     q->anonymous = TRUE;
4522   } else
4523   { q->id = id;
4524   }
4525   addNewHTable(queueTable, (void *)q->id, q);
4526 
4527   if ( unify_queue(queue, q) )
4528   { if ( q->anonymous )
4529       PL_unregister_atom(q->id);		/* reclaim on GC */
4530     else
4531       PL_register_atom(q->id);			/* protect against reclaim */
4532     return q;
4533   }
4534 
4535   return NULL;
4536 }
4537 
4538 
4539 /* MT: Caller must hold the L_THREAD mutex
4540 
4541    Note that this version does not deal with anonymous queues.  High
4542    level code must use get_message_queue__LD();
4543 */
4544 
4545 static int
get_message_queue_unlocked__LD(term_t t,message_queue ** queue ARG_LD)4546 get_message_queue_unlocked__LD(term_t t, message_queue **queue ARG_LD)
4547 { atom_t name;
4548   word id = 0;
4549   int tid = 0;
4550 
4551   if ( PL_get_atom(t, &name) )
4552   { thread_handle *er;
4553 
4554     if ( (er=symbol_thread_handle(name)) )
4555     { if ( er->info )
4556 	tid = er->engine_id;
4557       else
4558 	return PL_existence_error("thread", t);
4559     } else
4560       id = name;
4561   } else if ( !PL_get_integer(t, &tid) )
4562   { return PL_type_error("message_queue", t);
4563   }
4564 
4565   if ( tid > 0 )
4566   { have_tid:
4567     if ( tid >= 1 && tid <= GD->thread.highest_id )
4568     { PL_thread_info_t *info = GD->thread.threads[tid];
4569 
4570       if ( info->status == PL_THREAD_UNUSED ||
4571 	   info->status == PL_THREAD_RESERVED ||
4572 	   !info->thread_data )
4573 	return PL_existence_error("thread", t);
4574 
4575       *queue = &info->thread_data->thread.messages;
4576       return TRUE;
4577     }
4578 
4579     return PL_type_error("thread", t);
4580   }
4581 
4582   if ( !id )
4583     return PL_error(NULL, 0, NULL, ERR_TYPE, ATOM_message_queue, t);
4584 
4585   if ( queueTable )
4586   { message_queue *q;
4587     if ( (q = lookupHTable(queueTable, (void *)id)) )
4588     { *queue = q;
4589       return TRUE;
4590     }
4591   }
4592   if ( threadTable )
4593   { word w;
4594 
4595     if ( (w = (word)lookupHTable(threadTable, (void *)id)) )
4596     { thread_handle *th;
4597 
4598       if ( (th=symbol_thread_handle(w)) )
4599       { tid = th->engine_id;
4600 	goto have_tid;
4601       }
4602     }
4603   }
4604 
4605   return PL_error(NULL, 0, NULL, ERR_EXISTENCE, ATOM_message_queue, t);
4606 }
4607 
4608 
4609 /* Get a message queue and lock it
4610 */
4611 
4612 static int
get_message_queue__LD(term_t t,message_queue ** queue ARG_LD)4613 get_message_queue__LD(term_t t, message_queue **queue ARG_LD)
4614 { int rc;
4615   message_queue *q;
4616   PL_blob_t *type;
4617   void *data;
4618 
4619   if ( PL_get_blob(t, &data, NULL, &type) && type == &message_queue_blob )
4620   { mqref *ref = data;
4621 
4622     q = ref->queue;
4623     simpleMutexLock(&q->mutex);
4624     if ( !q->destroyed )
4625     { *queue = q;
4626       return TRUE;
4627     }
4628     simpleMutexUnlock(&q->mutex);
4629     return PL_error(NULL, 0, NULL, ERR_EXISTENCE, ATOM_message_queue, t);
4630   }
4631 
4632   PL_LOCK(L_THREAD);
4633   rc = get_message_queue_unlocked__LD(t, queue PASS_LD);
4634   if ( rc )
4635   { message_queue *q = *queue;
4636 
4637     simpleMutexLock(&q->mutex);
4638     if ( q->destroyed )
4639     { rc = PL_error(NULL, 0, NULL, ERR_EXISTENCE, ATOM_message_queue, t);
4640       simpleMutexUnlock(&q->mutex);
4641     }
4642   }
4643   PL_UNLOCK(L_THREAD);
4644 
4645   return rc;
4646 }
4647 
4648 
4649 /* Release a message queue, deleting it if it is no longer needed
4650 */
4651 
4652 static void
release_message_queue(message_queue * queue)4653 release_message_queue(message_queue *queue)
4654 { int del = ( queue->destroyed &&
4655 	      !(queue->waiting || queue->wait_for_drain) &&
4656 	      queue->type != QTYPE_THREAD );
4657 
4658   simpleMutexUnlock(&queue->mutex);
4659 
4660   if ( del )
4661   { destroy_message_queue(queue);
4662     if ( !queue->anonymous )
4663       PL_free(queue);
4664   }
4665 }
4666 
4667 
4668 static
4669 PRED_IMPL("message_queue_create", 1, message_queue_create, 0)
4670 { int rval;
4671 
4672   PL_LOCK(L_THREAD);
4673   rval = (unlocked_message_queue_create(A1, 0) ? TRUE : FALSE);
4674   PL_UNLOCK(L_THREAD);
4675 
4676   return rval;
4677 }
4678 
4679 
4680 static const opt_spec message_queue_options[] =
4681 { { ATOM_alias,		OPT_ATOM },
4682   { ATOM_max_size,	OPT_SIZE },
4683   { NULL_ATOM,		0 }
4684 };
4685 
4686 
4687 static
4688 PRED_IMPL("message_queue_create", 2, message_queue_create2, 0)
4689 { PRED_LD
4690   atom_t alias = 0;
4691   size_t max_size = 0;			/* to be processed */
4692   message_queue *q;
4693 
4694   if ( !scan_options(A2, 0,
4695 		     ATOM_queue_option, message_queue_options,
4696 		     &alias,
4697 		     &max_size) )
4698     fail;
4699 
4700   if ( alias )
4701   { if ( !PL_unify_atom(A1, alias) )
4702       return PL_error(NULL, 0, NULL, ERR_TYPE, ATOM_variable, A1);
4703   }
4704 
4705   PL_LOCK(L_THREAD);
4706   q = unlocked_message_queue_create(A1, max_size);
4707   PL_UNLOCK(L_THREAD);
4708 
4709   return q ? TRUE : FALSE;
4710 }
4711 
4712 
4713 static
4714 PRED_IMPL("message_queue_destroy", 1, message_queue_destroy, 0)
4715 { PRED_LD
4716   message_queue *q;
4717 
4718   if ( !get_message_queue__LD(A1, &q PASS_LD) )
4719     return FALSE;
4720 
4721   if ( q->type == QTYPE_THREAD )
4722   { release_message_queue(q);
4723 
4724     return PL_error(NULL, 0, "is a thread-queue", ERR_PERMISSION,
4725 		    ATOM_destroy, ATOM_message_queue, A1);
4726   }
4727 
4728   simpleMutexLock(&queueTable_mutex);	/* see markAtomsMessageQueues() */
4729   deleteHTable(queueTable, (void*)q->id);
4730   simpleMutexUnlock(&queueTable_mutex);
4731 
4732   if ( !q->anonymous )
4733     PL_unregister_atom(q->id);
4734 
4735   simpleMutexLock(&q->gc_mutex);	/* see markAtomsMessageQueues() */
4736   q->destroyed = TRUE;
4737   simpleMutexUnlock(&q->gc_mutex);
4738 
4739   if ( q->waiting )
4740     cv_broadcast(&q->cond_var);
4741   if ( q->wait_for_drain )
4742     cv_broadcast(&q->drain_var);
4743 
4744   release_message_queue(q);
4745 
4746   return TRUE;
4747 }
4748 
4749 
4750 		 /*******************************
4751 		 *    MESSAGE QUEUE PROPERTY	*
4752 		 *******************************/
4753 
4754 static int		/* message_queue_property(Queue, alias(Name)) */
message_queue_alias_property(message_queue * q,term_t prop ARG_LD)4755 message_queue_alias_property(message_queue *q, term_t prop ARG_LD)
4756 { if ( !q->anonymous )
4757     return PL_unify_atom(prop, q->id);
4758 
4759   fail;
4760 }
4761 
4762 
4763 static int		/* message_queue_property(Queue, size(Size)) */
message_queue_size_property(message_queue * q,term_t prop ARG_LD)4764 message_queue_size_property(message_queue *q, term_t prop ARG_LD)
4765 { return PL_unify_integer(prop, q->size);
4766 }
4767 
4768 
4769 static int		/* message_queue_property(Queue, max_size(Size)) */
message_queue_max_size_property(message_queue * q,term_t prop ARG_LD)4770 message_queue_max_size_property(message_queue *q, term_t prop ARG_LD)
4771 { size_t ms;
4772 
4773   if ( (ms=q->max_size) > 0 )
4774     return PL_unify_integer(prop, ms);
4775 
4776   fail;
4777 }
4778 
4779 static int		/* message_queue_property(Queue, waiting(Count)) */
message_queue_waiting_property(message_queue * q,term_t prop ARG_LD)4780 message_queue_waiting_property(message_queue *q, term_t prop ARG_LD)
4781 { int waiting;
4782 
4783   if ( (waiting=q->waiting) > 0 )
4784     return PL_unify_integer(prop, waiting);
4785 
4786   fail;
4787 }
4788 
4789 static const tprop qprop_list [] =
4790 { { FUNCTOR_alias1,	    message_queue_alias_property },
4791   { FUNCTOR_size1,	    message_queue_size_property },
4792   { FUNCTOR_max_size1,	    message_queue_max_size_property },
4793   { FUNCTOR_waiting1,	    message_queue_waiting_property },
4794   { 0,			    NULL }
4795 };
4796 
4797 
4798 typedef struct
4799 { TableEnum e;				/* Enumerator on queue-table */
4800   message_queue *q;			/* current queue */
4801   const tprop *p;			/* Pointer in properties */
4802   int enum_properties;			/* Enumerate the properties */
4803 } qprop_enum;
4804 
4805 
4806 static int
advance_qstate(qprop_enum * state)4807 advance_qstate(qprop_enum *state)
4808 { if ( state->enum_properties )
4809   { state->p++;
4810     if ( state->p->functor )
4811       succeed;
4812 
4813     state->p = qprop_list;
4814   }
4815   if ( state->e )
4816   { message_queue *q;
4817 
4818     if ( advanceTableEnum(state->e, NULL, (void**)&q) )
4819     { state->q = q;
4820 
4821       succeed;
4822     }
4823   }
4824 
4825   fail;
4826 }
4827 
4828 
4829 static void
free_qstate(qprop_enum * state)4830 free_qstate(qprop_enum *state)
4831 { if ( state->e )
4832     freeTableEnum(state->e);
4833 
4834   freeForeignState(state, sizeof(*state));
4835 }
4836 
4837 
4838 static
4839 PRED_IMPL("message_queue_property", 2, message_property, PL_FA_NONDETERMINISTIC)
4840 { PRED_LD
4841   term_t queue = A1;
4842   term_t property = A2;
4843   qprop_enum statebuf;
4844   qprop_enum *state;
4845 
4846   switch( CTX_CNTRL )
4847   { case FRG_FIRST_CALL:
4848     { memset(&statebuf, 0, sizeof(statebuf));
4849       state = &statebuf;
4850 
4851       if ( PL_is_variable(queue) )
4852       { if ( !queueTable )
4853 	  return FALSE;
4854 
4855 	switch( get_prop_def(property, ATOM_message_queue_property,
4856 			     qprop_list, &state->p) )
4857 	{ case 1:
4858 	    state->e = newTableEnum(queueTable);
4859 	    goto enumerate;
4860 	  case 0:
4861 	    state->e = newTableEnum(queueTable);
4862 	    state->p = qprop_list;
4863 	    state->enum_properties = TRUE;
4864 	    goto enumerate;
4865 	  case -1:
4866 	    fail;
4867 	}
4868       } else if ( get_message_queue__LD(queue, &state->q PASS_LD) )
4869       { release_message_queue(state->q); /* FIXME: we need some form of locking! */
4870 
4871 	switch( get_prop_def(property, ATOM_message_queue_property,
4872 			     qprop_list, &state->p) )
4873 	{ case 1:
4874 	    goto enumerate;
4875 	  case 0:
4876 	    state->p = qprop_list;
4877 	    state->enum_properties = TRUE;
4878 	    goto enumerate;
4879 	  case -1:
4880 	    fail;
4881 	}
4882       } else
4883       { fail;
4884       }
4885     }
4886     case FRG_REDO:
4887       state = CTX_PTR;
4888       break;
4889     case FRG_CUTTED:
4890       state = CTX_PTR;
4891       free_qstate(state);
4892       succeed;
4893     default:
4894       assert(0);
4895       fail;
4896   }
4897 
4898 enumerate:
4899   if ( !state->q )			/* first time, enumerating queues */
4900   { message_queue *q;
4901 
4902     assert(state->e);
4903     if ( advanceTableEnum(state->e, NULL, (void**)&q) )
4904     { state->q = q;
4905     } else
4906     { freeTableEnum(state->e);
4907       assert(state == &statebuf);
4908       fail;
4909     }
4910   }
4911 
4912 
4913   { term_t a1 = PL_new_term_ref();
4914 
4915     if ( !state->enum_properties )
4916       _PL_get_arg(1, property, a1);
4917 
4918     for(;;)
4919     { if ( (*state->p->function)(state->q, a1 PASS_LD) )
4920       { if ( state->enum_properties )
4921 	{ if ( !PL_unify_term(property,
4922 			      PL_FUNCTOR, state->p->functor,
4923 			        PL_TERM, a1) )
4924 	    goto error;
4925 	}
4926 	if ( state->e )
4927 	{ if ( !unify_queue(queue, state->q) )
4928 	    goto error;
4929 	}
4930 
4931 	if ( advance_qstate(state) )
4932 	{ if ( state == &statebuf )
4933 	  { qprop_enum *copy = allocForeignState(sizeof(*copy));
4934 
4935 	    *copy = *state;
4936 	    state = copy;
4937 	  }
4938 
4939 	  ForeignRedoPtr(state);
4940 	}
4941 
4942 	if ( state != &statebuf )
4943 	  free_qstate(state);
4944         else if ( state->e )
4945           freeTableEnum(state->e);
4946 	succeed;
4947       }
4948 
4949       if ( !advance_qstate(state) )
4950       { error:
4951 	if ( state != &statebuf )
4952 	  free_qstate(state);
4953         else if ( state->e )
4954           freeTableEnum(state->e);
4955 	fail;
4956       }
4957     }
4958   }
4959 }
4960 
4961 static
4962 PRED_IMPL("message_queue_set", 2, message_queue_set, 0)
4963 { PRED_LD
4964   message_queue *q;
4965   atom_t name;
4966   size_t arity;
4967   int rc;
4968 
4969   if ( !get_message_queue__LD(A1, &q PASS_LD) )
4970     return FALSE;
4971 
4972   if ( PL_get_name_arity(A2, &name, &arity) && arity == 1 )
4973   { term_t a = PL_new_term_ref();
4974 
4975     _PL_get_arg(1, A2, a);
4976     if ( name == ATOM_max_size )
4977     { size_t mx;
4978 
4979       if ( (rc=PL_get_size_ex(a, &mx)) )
4980       { size_t omax = q->max_size;
4981 
4982 	q->max_size = mx;
4983 
4984 	if ( mx > omax && q->wait_for_drain )
4985 	{ DEBUG(MSG_QUEUE, Sdprintf("Queue drained. wakeup writers\n"));
4986 	  cv_signal(&q->drain_var);
4987 	}
4988 
4989 	rc = TRUE;
4990       }
4991     } else
4992     { rc = PL_domain_error("message_queue_property", A2);
4993     }
4994   } else
4995     rc = PL_type_error("compound", A2);
4996 
4997   release_message_queue(q);
4998 
4999   return rc;
5000 }
5001 
5002 
5003 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
5004 thread_get_message(+Queue, -Message)
5005 thread_get_message(-Message)
5006     Get a message from a message queue. If the queue is not provided get
5007     a message from the queue implicitly associated to the thread.
5008 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
5009 
5010 static int
thread_get_message__LD(term_t queue,term_t msg,struct timespec * deadline ARG_LD)5011 thread_get_message__LD(term_t queue, term_t msg, struct timespec *deadline ARG_LD)
5012 { int rc;
5013 
5014   for(;;)
5015   { message_queue *q;
5016 
5017     if ( !get_message_queue__LD(queue, &q PASS_LD) )
5018       return FALSE;
5019 
5020     rc = get_message(q, msg, deadline PASS_LD);
5021     release_message_queue(q);
5022 
5023     switch(rc)
5024     { case MSG_WAIT_INTR:
5025 	if ( PL_handle_signals() >= 0 )
5026 	  continue;
5027 	rc = FALSE;
5028 	break;
5029       case MSG_WAIT_DESTROYED:
5030 	rc = PL_error(NULL, 0, NULL, ERR_EXISTENCE, ATOM_message_queue, queue);
5031         break;
5032       case MSG_WAIT_TIMEOUT:
5033 	rc = FALSE;
5034         break;
5035       default:
5036 	;
5037     }
5038 
5039     break;
5040   }
5041 
5042   return rc;
5043 }
5044 
5045 
5046 static
5047 PRED_IMPL("thread_get_message", 2, thread_get_message, 0)
5048 { PRED_LD
5049 
5050   return thread_get_message__LD(A1, A2, NULL PASS_LD);
5051 }
5052 
5053 
5054 static
5055 PRED_IMPL("thread_get_message", 3, thread_get_message, 0)
5056 { PRED_LD
5057   struct timespec deadline;
5058   struct timespec *dlop=NULL;
5059 
5060   return process_deadline_options(A3,&deadline,&dlop)
5061     &&   thread_get_message__LD(A1, A2, dlop PASS_LD);
5062 }
5063 
5064 
5065 static
5066 PRED_IMPL("thread_peek_message", 2, thread_peek_message_2, 0)
5067 { PRED_LD
5068   message_queue *q;
5069   int rc;
5070 
5071   if ( !get_message_queue__LD(A1, &q PASS_LD) )
5072     fail;
5073 
5074   rc = peek_message(q, A2 PASS_LD);
5075   release_message_queue(q);
5076   return rc;
5077 }
5078 
5079 
5080 		 /*******************************
5081 		 *	 MUTEX PRIMITIVES	*
5082 		 *******************************/
5083 
5084 #ifdef NEED_RECURSIVE_MUTEX_INIT
5085 
5086 #ifdef RECURSIVE_MUTEXES
5087 static int
recursive_attr(pthread_mutexattr_t ** ap)5088 recursive_attr(pthread_mutexattr_t **ap)
5089 { static int done;
5090   static pthread_mutexattr_t attr;
5091   int rc;
5092 
5093   if ( done )
5094   { *ap = &attr;
5095     return 0;
5096   }
5097 
5098   PL_LOCK(L_THREAD);
5099   if ( done )
5100   { PL_UNLOCK(L_THREAD);
5101 
5102     *ap = &attr;
5103     return 0;
5104   }
5105   if ( (rc=pthread_mutexattr_init(&attr)) )
5106     goto error;
5107 #ifdef HAVE_PTHREAD_MUTEXATTR_SETTYPE
5108   if ( (rc=pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE)) )
5109     goto error;
5110 #else
5111 #ifdef HAVE_PTHREAD_MUTEXATTR_SETKIND_NP
5112   if ( (rc=pthread_mutexattr_setkind_np(&attr, PTHREAD_MUTEX_RECURSIVE_NP)) )
5113     goto error;
5114 #endif
5115 #endif
5116 
5117   done = TRUE;
5118   PL_UNLOCK(L_THREAD);
5119   *ap = &attr;
5120 
5121   return 0;
5122 
5123 error:
5124   PL_UNLOCK(L_THREAD);
5125   return rc;
5126 }
5127 #endif
5128 
5129 int
recursiveMutexInit(recursiveMutex * m)5130 recursiveMutexInit(recursiveMutex *m)
5131 {
5132 #ifdef RECURSIVE_MUTEXES
5133   pthread_mutexattr_t *attr = NULL;
5134   int rc;
5135 
5136   if ( (rc=recursive_attr(&attr)) )
5137     return rc;
5138 
5139   return pthread_mutex_init(m, attr);
5140 
5141 #else /*RECURSIVE_MUTEXES*/
5142 
5143   m->owner = 0;
5144   m->count = 0;
5145   return pthread_mutex_init(&(m->lock), NULL);
5146 
5147 #endif /* RECURSIVE_MUTEXES */
5148 }
5149 
5150 #endif /*NEED_RECURSIVE_MUTEX_INIT*/
5151 
5152 #ifdef NEED_RECURSIVE_MUTEX_DELETE
5153 
5154 int
recursiveMutexDelete(recursiveMutex * m)5155 recursiveMutexDelete(recursiveMutex *m)
5156 { if ( m->owner != 0 )
5157     return EBUSY;
5158 
5159   return pthread_mutex_destroy(&(m->lock));
5160 }
5161 
5162 #endif /*NEED_RECURSIVE_MUTEX_DELETE*/
5163 
5164 #ifndef RECURSIVE_MUTEXES
5165 int
recursiveMutexLock(recursiveMutex * m)5166 recursiveMutexLock(recursiveMutex *m)
5167 { int result = 0;
5168   pthread_t self = pthread_self();
5169 
5170   if ( pthread_equal(self, m->owner) )
5171     m->count++;
5172   else
5173   { result = pthread_mutex_lock(&(m->lock));
5174     m->owner = self;
5175     m->count = 1;
5176   }
5177 
5178   return result;
5179 }
5180 
5181 
5182 int
recursiveMutexTryLock(recursiveMutex * m)5183 recursiveMutexTryLock(recursiveMutex *m)
5184 { int result = 0;
5185   pthread_t self = pthread_self();
5186 
5187   if ( pthread_equal(self, m->owner) )
5188     m->count++;
5189   else
5190   { result = pthread_mutex_trylock(&(m->lock));
5191     if ( result == 0 )
5192     { m->owner = self;
5193       m->count = 1;
5194     }
5195   }
5196 
5197   return result;
5198 }
5199 
5200 
5201 int
recursiveMutexUnlock(recursiveMutex * m)5202 recursiveMutexUnlock(recursiveMutex *m)
5203 { int result = 0;
5204   pthread_t self = pthread_self();
5205 
5206   if ( pthread_equal(self,m->owner) )
5207   { if ( --m->count < 1 )
5208     { m->owner = 0;
5209       result = pthread_mutex_unlock(&(m->lock));
5210     }
5211   } else if ( !pthread_equal(m->owner, 0) )
5212   { Sdprintf("unlocking unowned mutex %p,not done!!\n", m);
5213     Sdprintf("\tlocking thread was %u , unlocking is %u\n",m->owner,self);
5214 
5215     result = -1;
5216   }
5217 
5218   return result;
5219 }
5220 
5221 #endif /*RECURSIVE_MUTEXES*/
5222 
5223 void
initSimpleMutex(counting_mutex * m,const char * name)5224 initSimpleMutex(counting_mutex *m, const char *name)
5225 { simpleMutexInit(&m->mutex);
5226   m->count = 0;
5227   m->lock_count = 0;
5228 #ifdef O_CONTENTION_STATISTICS
5229   m->collisions = 0;
5230 #endif
5231   m->name = name ? store_string(name) : (char*)NULL;
5232   m->prev = NULL;
5233 
5234   PL_LOCK(L_MUTEX);
5235   m->next = GD->thread.mutexes;
5236   GD->thread.mutexes = m;
5237   if ( m->next )
5238     m->next->prev = m;
5239   PL_UNLOCK(L_MUTEX);
5240 }
5241 
5242 
5243 counting_mutex *
allocSimpleMutex(const char * name)5244 allocSimpleMutex(const char *name)
5245 { counting_mutex *m = allocHeapOrHalt(sizeof(*m));
5246   initSimpleMutex(m, name);
5247 
5248   return m;
5249 }
5250 
5251 
5252 void
freeSimpleMutex(counting_mutex * m)5253 freeSimpleMutex(counting_mutex *m)
5254 { PL_LOCK(L_MUTEX);
5255   if ( m->next )
5256     m->next->prev = m->prev;
5257   if ( m->prev )
5258     m->prev->next = m->next;
5259   else
5260     GD->thread.mutexes = m->next;
5261   PL_UNLOCK(L_MUTEX);
5262 
5263   simpleMutexDelete(&m->mutex);
5264   remove_string((char *)m->name);
5265   freeHeap(m, sizeof(*m));
5266 }
5267 
5268 
5269 		 /*******************************
5270 		 *	FOREIGN INTERFACE	*
5271 		 *******************************/
5272 
5273 int
PL_thread_attach_engine(PL_thread_attr_t * attr)5274 PL_thread_attach_engine(PL_thread_attr_t *attr)
5275 { GET_LD
5276   PL_thread_info_t *info;
5277   PL_local_data_t *ldnew;
5278   PL_local_data_t *ldmain;
5279 
5280   if ( LD )
5281   { if ( LD->thread.info->open_count+1 == 0 )
5282     { errno = EINVAL;
5283       return -1;
5284     }
5285     LD->thread.info->open_count++;
5286     return LD->thread.info->pl_tid;
5287   }
5288 
5289   if ( !GD->thread.enabled || GD->cleaning != CLN_NORMAL )
5290   {
5291 #ifdef EPERM				/* FIXME: Better reporting */
5292     errno = EPERM;
5293 #endif
5294     return -1;
5295   }
5296 
5297   if ( !(info = alloc_thread()) )
5298     return -1;				/* out of threads */
5299 
5300   ldmain = GD->thread.threads[1]->thread_data;
5301   ldnew = info->thread_data;
5302 
5303   if ( attr )
5304   { if ( attr->stack_limit )
5305       info->stack_limit = attr->stack_limit;
5306 
5307     info->cancel = attr->cancel;
5308   }
5309 
5310   info->goal       = NULL;
5311   info->module     = MODULE_user;
5312   info->detached   = attr == NULL ||
5313                      (attr->flags & PL_THREAD_NOT_DETACHED) == 0;
5314   info->open_count = 1;
5315 
5316   copy_local_data(ldnew, ldmain, attr ? attr->max_queue_size : 0);
5317 
5318   if ( !initialise_thread(info) )
5319   { free_thread_info(info);
5320     errno = ENOMEM;
5321     return -1;
5322   }
5323   set_system_thread_id(info);
5324   PL_LOCK(L_THREAD);
5325   info->status = PL_THREAD_RUNNING;
5326   PL_UNLOCK(L_THREAD);
5327 
5328   if ( attr )
5329   { if ( attr->alias )
5330     { if ( !aliasThread(info->pl_tid, ATOM_thread, PL_new_atom(attr->alias)) )
5331       { free_thread_info(info);
5332 	errno = EPERM;
5333 	TLD_set_LD(NULL);
5334 	return -1;
5335       }
5336     }
5337     if ( true(attr, PL_THREAD_NO_DEBUG) )
5338     { ldnew->_debugstatus.tracing   = FALSE;
5339       ldnew->_debugstatus.debugging = DBG_OFF;
5340       set(&ldnew->prolog_flag.mask, PLFLAG_LASTCALL);
5341       info->debug = FALSE;
5342     }
5343   }
5344 
5345   updateAlerted(ldnew);
5346   PL_call_predicate(MODULE_system, PL_Q_NORMAL, PROCEDURE_dthread_init0, 0);
5347 
5348   return info->pl_tid;
5349 }
5350 
5351 
5352 int
PL_thread_destroy_engine(void)5353 PL_thread_destroy_engine(void)
5354 { GET_LD
5355 
5356   if ( LD )
5357   { if ( --LD->thread.info->open_count == 0 )
5358     { free_prolog_thread(LD);
5359       TLD_set_LD(NULL);
5360     }
5361 
5362     return TRUE;
5363   }
5364 
5365   return FALSE;				/* we had no thread */
5366 }
5367 
5368 
5369 int
attachConsole(void)5370 attachConsole(void)
5371 { GET_LD
5372   fid_t fid = PL_open_foreign_frame();
5373   int rval;
5374   predicate_t pred = PL_predicate("attach_console", 0, "user");
5375 
5376   rval = PL_call_predicate(NULL, PL_Q_NODEBUG, pred, 0);
5377 
5378   PL_discard_foreign_frame(fid);
5379 
5380   return rval;
5381 }
5382 
5383 
5384 		 /*******************************
5385 		 *	      ENGINES		*
5386 		 *******************************/
5387 
5388 static PL_engine_t
PL_current_engine(void)5389 PL_current_engine(void)
5390 { GET_LD
5391 
5392   return LD;
5393 }
5394 
5395 
5396 static void
detach_engine(PL_engine_t e)5397 detach_engine(PL_engine_t e)
5398 { PL_thread_info_t *info = e->thread.info;
5399 
5400   info->has_tid = FALSE;
5401 #ifdef __linux__
5402   info->pid = -1;
5403 #endif
5404 #ifdef __WINDOWS__
5405   info->w32id = 0;
5406 #endif
5407   memset(&info->tid, 0, sizeof(info->tid));
5408 }
5409 
5410 
5411 int
PL_set_engine(PL_engine_t new,PL_engine_t * old)5412 PL_set_engine(PL_engine_t new, PL_engine_t *old)
5413 { PL_engine_t current = PL_current_engine();
5414 
5415   if ( new != current && new != PL_ENGINE_CURRENT )
5416   { PL_LOCK(L_THREAD);
5417 
5418     if ( new )
5419     { if ( new == PL_ENGINE_MAIN )
5420 	new = &PL_local_data;
5421 
5422       if ( new->magic != LD_MAGIC )
5423       { PL_UNLOCK(L_THREAD);
5424 	return PL_ENGINE_INVAL;
5425       }
5426       if ( new->thread.info->has_tid )
5427       { PL_UNLOCK(L_THREAD);
5428 	return PL_ENGINE_INUSE;
5429       }
5430     }
5431 
5432     if ( current )
5433       detach_engine(current);
5434 
5435     if ( new )
5436     { TLD_set_LD(new);
5437       new->thread.info->tid = pthread_self();
5438 
5439       set_system_thread_id(new->thread.info);
5440     } else
5441     { TLD_set_LD(NULL);
5442     }
5443 
5444     PL_UNLOCK(L_THREAD);
5445   }
5446 
5447   if ( old )
5448   { *old = current;
5449   }
5450 
5451   return PL_ENGINE_SET;
5452 }
5453 
5454 
5455 PL_engine_t
PL_create_engine(PL_thread_attr_t * attributes)5456 PL_create_engine(PL_thread_attr_t *attributes)
5457 { PL_engine_t e, current;
5458 
5459   PL_set_engine(NULL, &current);
5460   if ( PL_thread_attach_engine(attributes) >= 0 )
5461   { e = PL_current_engine();
5462   } else
5463     e = NULL;
5464 
5465   PL_set_engine(current, NULL);
5466 
5467   return e;
5468 }
5469 
5470 
5471 int
PL_destroy_engine(PL_engine_t e)5472 PL_destroy_engine(PL_engine_t e)
5473 { int rc;
5474 
5475   if ( e == PL_current_engine() )
5476   { rc = PL_thread_destroy_engine();
5477   } else
5478   { PL_engine_t current;
5479 
5480     if ( PL_set_engine(e, &current) == PL_ENGINE_SET )
5481     { rc = PL_thread_destroy_engine();
5482       PL_set_engine(current, NULL);
5483 
5484     } else
5485       rc = FALSE;
5486   }
5487 
5488   return rc;
5489 }
5490 
5491 
5492 		 /*******************************
5493 		 *	      GC THREAD		*
5494 		 *******************************/
5495 
/* GC_id is set to the Prolog thread id of the gc thread while GCmain()
   runs the `$gc` predicate and is 0 otherwise.  GC_starting is set by
   GCthread() when it claims the right to start the gc thread and is
   reset when pthread_create() fails or when GCmain() returns.
*/

static int GC_id = 0;
static int GC_starting = 0;

static rc_cancel cancelGCThread(int tid);
5500 
/* Start routine of the gc thread (see GCthread()).  Where sigprocmask
   is available it blocks all signals of allSignalMask() except for
   GD->signals.sig_alert (if set), names the thread "gc" and attaches
   a Prolog engine.  If that succeeds it publishes its id in GC_id,
   installs cancelGCThread() as cancel handler and runs system:'$gc'/0.
   When the predicate returns, the completion is recorded and the
   engine is destroyed.  `closure` is not used.  Always returns NULL.
*/

static void *
GCmain(void *closure)
{ PL_thread_attr_t attrs = {0};
#ifdef HAVE_SIGPROCMASK
  sigset_t set;
  allSignalMask(&set);
  if ( GD->signals.sig_alert )
    sigdelset(&set, GD->signals.sig_alert);
  pthread_sigmask(SIG_BLOCK, &set, NULL);
#endif

  attrs.alias = "gc";
  attrs.flags = PL_THREAD_NO_DEBUG|PL_THREAD_NOT_DETACHED;
  set_os_thread_name_from_charp("gc");

  if ( PL_thread_attach_engine(&attrs) > 0 )
  { GET_LD
    PL_thread_info_t *info = LD->thread.info;
    static predicate_t pred = 0;
    int rc;

    if ( !pred )
      pred = PL_predicate("$gc", 0, "system");

    GC_id = PL_thread_self();		/* from here requests go to us */
    info->cancel = cancelGCThread;
    rc = PL_call_predicate(NULL, PL_Q_PASS_EXCEPTION, pred, 0);
    GC_id = 0;				/* no longer accepting requests */

    set_thread_completion(info, rc, exception_term);
    PL_thread_destroy_engine();
  }

  GC_starting = FALSE;			/* allow GCthread() to start a new one */

  return NULL;
}
5538 
5539 
/* Return the Prolog thread id of the gc thread, starting that thread
   if it is not running.  Starting is asynchronous: the returned value
   is whatever GC_id holds at return time, which is still <= 0 if the
   new thread has not yet registered itself in GCmain().  The
   compare-and-swap on GC_starting lets only one caller create the
   thread.
*/

static int
GCthread(void)
{ if ( GC_id <= 0 )
  { if ( COMPARE_AND_SWAP_INT(&GC_starting, FALSE, TRUE) )
    { pthread_attr_t attr;
      pthread_t thr;
      int rc;

      if ( !GD->thread.gc.initialized )	/* first start: create the sync objects */
      { pthread_mutex_init(&GD->thread.gc.mutex, NULL);
	pthread_cond_init(&GD->thread.gc.cond, NULL);
	GD->thread.gc.initialized = TRUE;
      } else				/* restart: drop stale requests */
      { GD->thread.gc.requests = 0;
      }

      pthread_attr_init(&attr);
      rc = pthread_create(&thr, &attr, GCmain, NULL);
      pthread_attr_destroy(&attr);
      if ( rc != 0 )			/* creation failed: allow a retry */
	GC_starting = FALSE;
    }
  }

  return GC_id;
}
5566 
5567 
5568 static int
gc_sig_request(int sig)5569 gc_sig_request(int sig)
5570 { switch(sig)
5571   { case SIG_ATOM_GC:
5572       return GCREQUEST_AGC;
5573     case SIG_CLAUSE_GC:
5574       return GCREQUEST_CGC;
5575     case SIG_PLABORT:
5576       return GCREQUEST_ABORT;
5577     default:
5578       assert(0);
5579       return 0;
5580   }
5581 }
5582 
5583 
/* Post the request that corresponds to `sig` for the gc thread: set
   the request bit under GD->thread.gc.mutex and signal the condition
   variable on which `$gc_wait`/1 waits.  `tid` is not used.  Always
   returns TRUE.
*/

static int
signalGCThreadCond(int tid, int sig)
{ (void)tid;

  pthread_mutex_lock(&GD->thread.gc.mutex);
  GD->thread.gc.requests |= gc_sig_request(sig);
  pthread_cond_signal(&GD->thread.gc.cond);
  pthread_mutex_unlock(&GD->thread.gc.mutex);

  return TRUE;
}
5595 
5596 
5597 int
signalGCThread(int sig)5598 signalGCThread(int sig)
5599 { GET_LD
5600   int tid;
5601 
5602   if ( truePrologFlag(PLFLAG_GCTHREAD) &&
5603        !GD->bootsession &&
5604        (tid = GCthread()) > 0 &&
5605        signalGCThreadCond(tid, sig) )
5606     return TRUE;
5607 
5608   return raiseSignal(LD, sig);
5609 }
5610 
5611 
5612 static int
gc_running(void)5613 gc_running(void)
5614 { int tid;
5615   PL_thread_info_t *info;
5616 
5617   if ( (tid=GC_id) > 0 && (info = GD->thread.threads[tid]) &&
5618        info->status == PL_THREAD_RUNNING )
5619     return tid;
5620 
5621   return 0;
5622 }
5623 
5624 
5625 int
isSignalledGCThread(int sig ARG_LD)5626 isSignalledGCThread(int sig ARG_LD)
5627 { if ( gc_running() )
5628   { return (GD->thread.gc.requests & gc_sig_request(sig)) != 0;
5629   } else
5630   { return PL_pending(sig);
5631   }
5632 }
5633 
5634 
/* '$gc_wait'(-Action)

   Block until a gc request is pending and unify Action with the
   request to handle.  If multiple requests are pending the priority
   is `abort`, `garbage_collect_atoms`, `garbage_collect_clauses`.
   The request bits are not cleared here; see '$gc_clear'/1.
*/

static
PRED_IMPL("$gc_wait", 1, gc_wait, 0)
{ PRED_LD

  for(;;)
  { unsigned int req;
    atom_t action;

    pthread_mutex_lock(&GD->thread.gc.mutex);
    req = GD->thread.gc.requests;
    if ( !req )				/* nothing to do: sleep */
      pthread_cond_wait(&GD->thread.gc.cond, &GD->thread.gc.mutex);
    pthread_mutex_unlock(&GD->thread.gc.mutex);

					/* `req` was sampled before waiting, so */
					/* after a wakeup it is still 0 and we */
					/* reach `continue` to sample it again */
    if ( (req&GCREQUEST_ABORT) )
      action = ATOM_abort;
    else if ( (req&GCREQUEST_AGC) )
      action = ATOM_garbage_collect_atoms;
    else if ( (req&GCREQUEST_CGC) )
      action = ATOM_garbage_collect_clauses;
    else
      continue;

    return PL_unify_atom(A1, action);
  }
}
5661 
5662 
5663 static
5664 PRED_IMPL("$gc_clear", 1, gc_clear, 0)
5665 { PRED_LD
5666   atom_t action;
5667 
5668   if ( PL_get_atom_ex(A1, &action) )
5669   { unsigned int mask;
5670 
5671     if ( action == ATOM_garbage_collect_atoms )
5672       mask = GCREQUEST_AGC;
5673     else if ( action == ATOM_garbage_collect_clauses )
5674       mask = GCREQUEST_CGC;
5675     else
5676       return PL_domain_error("action", A1);
5677 
5678     pthread_mutex_lock(&GD->thread.gc.mutex);
5679     GD->thread.gc.requests &= ~mask;
5680     pthread_mutex_unlock(&GD->thread.gc.mutex);
5681 
5682     return TRUE;
5683   }
5684 
5685   return FALSE;
5686 }
5687 
5688 
/* Cancel handler installed by GCmain() as info->cancel.  Posts an
   abort request to the gc thread and returns
   PL_THREAD_CANCEL_MUST_JOIN.
*/

static rc_cancel
cancelGCThread(int tid)
{ signalGCThreadCond(tid, SIG_PLABORT);
  return PL_THREAD_CANCEL_MUST_JOIN;
}
5694 
5695 static
5696 PRED_IMPL("$gc_stop", 0, gc_stop, 0)
5697 { int tid;
5698 
5699   if ( (tid=gc_running()) )
5700     return cancelGCThread(tid) != PL_THREAD_CANCEL_FAILED;
5701 
5702   return FALSE;
5703 }
5704 
5705 
5706 		 /*******************************
5707 		 *	     STATISTICS		*
5708 		 *******************************/
5709 
5710 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
5711 thread_statistics(+Thread, +Key, -Value)
5712     Same as statistics(+Key, -Value) but operates on another thread.
5713 
5714 statistics(heapused, X) walks along  all   threads  and  therefore locks
5715 L_THREAD. As it returns the same result   on  all threads we'll redirect
5716 this to run on our own thread.
5717 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
5718 
static
PRED_IMPL("thread_statistics", 3, thread_statistics, 0)
{ PRED_LD
  PL_thread_info_t *info;
  PL_local_data_t *ld;
  int rval;
  atom_t k;

  PL_LOCK(L_THREAD);
  if ( !get_thread(A1, &info, TRUE) )
  { PL_UNLOCK(L_THREAD);
    fail;
  }

  if ( !(ld=info->thread_data) )	/* thread has no local data */
  { PL_UNLOCK(L_THREAD);
    return PL_error(NULL, 0, NULL,
		    ERR_PERMISSION,
		    ATOM_statistics, ATOM_thread, A1);
  }

  if ( !PL_get_atom(A2, &k) )		/* non-atom key: leave it to */
    k = 0;				/* pl_statistics_ld() */

  if ( k == ATOM_heapused )		/* same for all threads: run on self */
    ld = LD;
  else if ( k == ATOM_cputime || k == ATOM_runtime )
    ld->statistics.user_cputime = ThreadCPUTime(ld, CPU_USER);
  else if ( k == ATOM_system_time )
    ld->statistics.system_cputime = ThreadCPUTime(ld, CPU_SYSTEM);

  if ( LD == ld )		/* self: unlock first to avoid deadlock */
  { PL_UNLOCK(L_THREAD);
    return pl_statistics_ld(A2, A3, ld PASS_LD);
  }

				/* other thread: keep L_THREAD locked */
				/* while its local data is being read */
  rval = pl_statistics_ld(A2, A3, ld PASS_LD);
  PL_UNLOCK(L_THREAD);

  return rval;
}
5760 
5761 
5762 #ifdef __WINDOWS__
5763 
5764 /* How to make the memory visible?
5765 */
5766 
5767 					/* see also pl-nt.c */
5768 #define nano * 0.000000001
5769 #define ntick 100.0
5770 
/* Windows version.  Return the CPU time in seconds used by the thread
   that owns `ld`.  `which` selects kernel time (CPU_SYSTEM) or user
   time (anything else).  Returns 0.0 if the thread has no tid, its
   handle cannot be obtained or GetThreadTimes() fails.  The FILETIME
   value is a 64-bit count split in two 32-bit halves; `ntick nano`
   converts one count to seconds (100 * 1e-9).
*/

double
ThreadCPUTime(PL_local_data_t *ld, int which)
{ PL_thread_info_t *info = ld->thread.info;
  double t;
  FILETIME created, exited, kerneltime, usertime;
  HANDLE win_thread;

  if ( !info->has_tid )
    return 0.0;

  if ( !(win_thread = get_windows_thread(info)) )
    return 0.0;

  if ( GetThreadTimes(win_thread,
		      &created, &exited, &kerneltime, &usertime) )
  { FILETIME *p;

    if ( which == CPU_SYSTEM )
      p = &kerneltime;
    else
      p = &usertime;

					/* high word counts units of 2^32 ticks */
    t = (double)p->dwHighDateTime * (4294967296.0 * ntick nano);
    t += (double)p->dwLowDateTime  * (ntick nano);
  } else
    t = 0.0;

  close_windows_thread(win_thread);

  return t;
}
5802 
5803 #else /*__WINDOWS__*/
5804 
/* Convert a struct timespec (seconds + nanoseconds) to seconds as double */
#define timespec_to_double(ts) \
	((double)(ts).tv_sec + (double)(ts).tv_nsec/(double)1000000000.0)
5807 
5808 #ifdef PTHREAD_CPUCLOCKS
5809 #define NO_THREAD_SYSTEM_TIME 1
5810 
5811 double
ThreadCPUTime(PL_local_data_t * ld,int which)5812 ThreadCPUTime(PL_local_data_t *ld, int which)
5813 { PL_thread_info_t *info = ld->thread.info;
5814 
5815   if ( which == CPU_SYSTEM )
5816     return 0.0;
5817 
5818   if ( info->has_tid )
5819   { clockid_t clock_id;
5820     struct timespec ts;
5821     int rc;
5822 
5823     if ( (rc=pthread_getcpuclockid(info->tid, &clock_id)) == 0 )
5824     { if (clock_gettime(clock_id, &ts) == 0)
5825 	return timespec_to_double(ts);
5826     } else
5827     { DEBUG(MSG_THREAD,
5828 	    Sdprintf("Could not get thread time: %s\n", strerror(rc)));
5829     }
5830   }
5831 
5832   return 0.0;
5833 }
5834 
5835 #else /*PTHREAD_CPUCLOCKS*/
5836 
5837 #ifdef HAVE_MACH_THREAD_ACT_H	/* MacOS and other Mach based systems */
5838 
5839 #include <mach/thread_act.h>
5840 
/* Mach version.  Return the CPU time in seconds of the thread that
   owns `ld`, using thread_info(THREAD_BASIC_INFO).  `which` selects
   system time (CPU_SYSTEM) or user time (anything else).  Returns 0.0
   if the thread has no tid or thread_info() fails.
*/

double
ThreadCPUTime(PL_local_data_t *ld, int which)
{ PL_thread_info_t *info = ld->thread.info;

  if ( info->has_tid )
  { kern_return_t error;
    struct thread_basic_info th_info;
    mach_msg_type_number_t th_info_count = THREAD_BASIC_INFO_COUNT;
    thread_t thread = pthread_mach_thread_np(info->tid);

    if ((error = thread_info(thread, THREAD_BASIC_INFO,
			     (thread_info_t)&th_info, &th_info_count))
	!= KERN_SUCCESS)
      return 0.0;

					/* seconds + microseconds -> seconds */
    if ( which == CPU_SYSTEM )
      return ( th_info.system_time.seconds +
	       th_info.system_time.microseconds / 1e6 );
    else
      return ( th_info.user_time.seconds +
	       th_info.user_time.microseconds / 1e6 );
  }

  return 0.0;
}
5866 
5867 #else /*HAVE_MACH_THREAD_ACT_H*/
5868 
5869 #ifdef LINUX_PROCFS
5870 
5871 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
5872 Unfortunately POSIX threads does not define a   way  to get the CPU time
5873 per thread. Some systems do have mechanisms  for that. POSIX does define
5874 clock_gettime(CLOCK_THREAD_CPUTIME_ID), but it certainly doesn't work in
5875 Linux 2.6.8.
5876 
Autoconf detects the presence of  /proc  and   we  read  the values from
there. This is rather slow. To make things not  too bad we use a pool of
open handles to entries in the /proc  table. All this junk should really
be moved into a file trying to implement  thread CPU time for as many
platforms as possible. If you happen to know such a library, please let
me know.
5883 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
5884 
5885 #include <fcntl.h>
5886 
5887 #define CACHED_PROCPS_ENTRIES 5
5888 
/* One cached open /proc/self/task/<tid>/stat file */
typedef struct
{ int tid;				/* task id from the path; 0: free slot */
  int fd;				/* file descriptor */
  int offset;				/* start of numbers */
  int usecoount;			/* 1 on open, incremented on each cache hit */
} procps_entry;

static procps_entry procps_entries[CACHED_PROCPS_ENTRIES]; /* cached entries */
5897 
5898 static void
close_procps_entry(procps_entry * e)5899 close_procps_entry(procps_entry *e)
5900 { if ( e->tid )
5901   { close(e->fd);
5902     memset(e, 0, sizeof(*e));
5903   }
5904 }
5905 
5906 
5907 static procps_entry*
reclaim_procps_entry()5908 reclaim_procps_entry()
5909 { procps_entry *e, *low;
5910   int i; int lowc;
5911 
5912   low=e=procps_entries;
5913   lowc=low->usecoount;
5914 
5915   for(e++, i=1; i<CACHED_PROCPS_ENTRIES; e++, i++)
5916   { if ( e->usecoount < lowc )
5917     { lowc = e->usecoount;
5918       low = e;
5919     }
5920   }
5921 
5922   for(e=procps_entries, i=0; i<CACHED_PROCPS_ENTRIES; e++, i++)
5923     e->usecoount = 0;
5924 
5925   close_procps_entry(low);
5926 
5927   return low;
5928 }
5929 
5930 
5931 static procps_entry *
open_procps_entry(procps_entry * e,int tid)5932 open_procps_entry(procps_entry *e, int tid)
5933 { char fname[256];
5934   int fd;
5935 
5936   sprintf(fname, "/proc/self/task/%d/stat", tid);
5937   if ( (fd=open(fname, O_RDONLY)) >= 0 )
5938   { char buffer[1000];
5939     int pos;
5940 
5941     pos = read(fd, buffer, sizeof(buffer)-1);
5942     if ( pos > 0 )
5943     { char *bp;
5944 
5945       buffer[pos] = EOS;
5946       if ( (bp=strrchr(buffer, ')')) )
5947       { e->tid = tid;
5948 	e->fd = fd;
5949 	e->offset = (bp-buffer)+4;
5950 	e->usecoount = 1;
5951 
5952 	return e;
5953       }
5954     }
5955   }
5956 
5957   return NULL;
5958 }
5959 
5960 
5961 static procps_entry*
get_procps_entry(int tid)5962 get_procps_entry(int tid)
5963 { int i;
5964   procps_entry *e;
5965 
5966   for(e=procps_entries, i=0; i<CACHED_PROCPS_ENTRIES; e++, i++)
5967   { if ( e->tid == tid )
5968     { e->usecoount++;
5969 
5970       return e;
5971     }
5972   }
5973 
5974   for(e=procps_entries, i=0; i<CACHED_PROCPS_ENTRIES; e++, i++)
5975   { if ( e->tid == 0 )
5976       return open_procps_entry(e, tid);
5977   }
5978 
5979   e = reclaim_procps_entry();
5980   return open_procps_entry(e, tid);
5981 }
5982 
5983 
5984 double
ThreadCPUTime(PL_local_data_t * ld,int which)5985 ThreadCPUTime(PL_local_data_t *ld, int which)
5986 { PL_thread_info_t *info = ld->thread.info;
5987   procps_entry *e;
5988 
5989   if ( (e=get_procps_entry(info->pid)) )
5990   { char buffer[1000];
5991     char *s;
5992     int64_t ticks;
5993     int i, n, nth = 10;			/* user time */
5994 
5995     if ( which == CPU_SYSTEM )
5996       nth++;
5997 
5998     lseek(e->fd, e->offset, SEEK_SET);
5999     n = read(e->fd, buffer, sizeof(buffer)-1);
6000     if ( n >= 0 )
6001       buffer[n] = EOS;
6002 					/* most likely does not need reuse */
6003     if ( info->status != PL_THREAD_RUNNING )
6004       close_procps_entry(e);
6005 
6006     for(s=buffer, i=0; i<nth; i++, s++)
6007     { while(*s != ' ')
6008 	s++;
6009     }
6010 
6011     ticks = atoll(s);
6012     return (double)ticks/100.0;
6013   }
6014 
6015   return 0.0;
6016 }
6017 
6018 #else /*LINUX_PROCFS*/
6019 
6020 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Systems where we can get the  thread   CPU  time from the calling thread
only. Up to Linux 2.4, linux threads were  more  like processes, and we
can  get  these  using  times().  Some    POSIX  systems  have  a  clock
CLOCK_THREAD_CPUTIME_ID. Oddly enough, on Debian etch at least, fetching
this with clock_gettime() produces bogus,  but   getting  it through the
syscall interface works. This might  be   because  the  kernel is fairly
up-to-date, but the C library is very much out of date (glibc 2.3)
6028 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
6029 
6030 #ifndef SIG_SYNCTIME
6031 #define SIG_SYNCTIME SIGUSR1
6032 #endif
6033 
6034 #ifdef USE_SEM_OPEN
6035 static sem_t *sem_synctime_ptr;
6036 #else
6037 static sem_t sem_synctime;			/* used for atom-gc */
6038 #define sem_synctime_ptr (&sem_synctime)
6039 #endif
6040 
6041 #ifdef LINUX_CPUCLOCKS
6042 #define NO_THREAD_SYSTEM_TIME 1
6043 
/* Store the CPU time of the calling thread in its
   LD->statistics.user_cputime using the clock_gettime system call.
   Called directly by ThreadCPUTime() with sig == 0 or installed as
   handler for SIG_SYNCTIME.  In the latter case (sig != 0) the
   semaphore is posted to wake up the thread that sent the signal.
*/

static void
SyncUserCPU(int sig)
{ GET_LD

  if ( LD )
  { struct timespec ts;

    if ( syscall(__NR_clock_gettime, CLOCK_THREAD_CPUTIME_ID, &ts) == 0 )
    { LD->statistics.user_cputime = timespec_to_double(ts);
    } else
    { perror("clock_gettime");
    }
  }

  if ( sig )
    sem_post(sem_synctime_ptr);
}
6061 
6062 
/* Not expected to be called in this configuration: with
   NO_THREAD_SYSTEM_TIME defined, ThreadCPUTime() returns 0.0 for
   CPU_SYSTEM before selecting a handler.
*/

static void
SyncSystemCPU(int sig)
{ assert(0);
}
6067 
6068 #else /*LINUX_CPUCLOCKS*/
6069 #define THREAD_CPU_BY_PID 1
6070 
/* Store CpuTime(CPU_USER) in LD->statistics.user_cputime of the
   calling thread.  Called directly by ThreadCPUTime() with sig == 0 or
   as handler for SIG_SYNCTIME, in which case (sig != 0) the semaphore
   is posted to wake up the thread that sent the signal.
*/

static void
SyncUserCPU(int sig)
{ GET_LD

  if ( LD )
    LD->statistics.user_cputime = CpuTime(CPU_USER);
  if ( sig )
    sem_post(sem_synctime_ptr);
}
6080 
6081 
/* Store CpuTime(CPU_SYSTEM) in LD->statistics.system_cputime of the
   calling thread.  Called directly by ThreadCPUTime() with sig == 0 or
   as handler for SIG_SYNCTIME, in which case (sig != 0) the semaphore
   is posted to wake up the thread that sent the signal.
*/

static void
SyncSystemCPU(int sig)
{ GET_LD

  if ( LD )
    LD->statistics.system_cputime = CpuTime(CPU_SYSTEM);
  if ( sig )
    sem_post(sem_synctime_ptr);
}
6091 
6092 #endif  /*LINUX_CPUCLOCKS*/
6093 
/* Signal based version.  Return the CPU time in seconds of the thread
   that owns `ld`.  For the calling thread the time is fetched
   directly.  For another thread we temporarily install a handler for
   SIG_SYNCTIME, signal that thread and wait on a semaphore until the
   handler has stored the time in the thread's statistics.  Returns 0.0
   if the target has no tid or cannot be signalled and, if
   NO_THREAD_SYSTEM_TIME is defined, for CPU_SYSTEM.
*/

double
ThreadCPUTime(PL_local_data_t *ld, int which)
{ GET_LD
  PL_thread_info_t *info = ld->thread.info;

#ifdef NO_THREAD_SYSTEM_TIME
  if ( which == CPU_SYSTEM )
    return 0.0;
#endif

  if ( info->thread_data == LD )	/* myself: no signal needed */
  { if ( which == CPU_USER )
      SyncUserCPU(0);
    else
      SyncSystemCPU(0);
  } else
  { struct sigaction old;
    struct sigaction new;
    sigset_t sigmask;
    sigset_t set;
    int ok;

    blockSignals(&set);
    sem_init(sem_synctime_ptr, USYNC_THREAD, 0);
    allSignalMask(&sigmask);
    memset(&new, 0, sizeof(new));
    new.sa_handler = (which == CPU_USER ? SyncUserCPU : SyncSystemCPU);
    new.sa_flags   = SA_RESTART;
    new.sa_mask    = sigmask;
    sigaction(SIG_SYNCTIME, &new, &old);

    if ( info->has_tid )
      ok = (pthread_kill(info->tid, SIG_SYNCTIME) == 0);
    else
      ok = FALSE;

    if ( ok )				/* wait for the handler to post */
    { while( sem_wait(sem_synctime_ptr) == -1 && errno == EINTR )
	;
    }
    sem_destroy(sem_synctime_ptr);
    sigaction(SIG_SYNCTIME, &old, NULL);	/* restore previous handler */
    unblockSignals(&set);
    if ( !ok )
      return 0.0;
  }

  if ( which == CPU_USER )
    return info->thread_data->statistics.user_cputime;
  else
    return info->thread_data->statistics.system_cputime;
}
6146 
6147 #endif /*LINUX_PROCFS*/
6148 #endif /*HAVE_MACH_THREAD_ACT_H*/
6149 #endif /*PTHREAD_CPUCLOCKS*/
6150 #endif /*__WINDOWS__*/
6151 
6152 
6153 		 /*******************************
6154 		 *     ITERATE OVER THREADS	*
6155 		 *******************************/
6156 
6157 void
forThreadLocalDataUnsuspended(void (* func)(PL_local_data_t *),unsigned flags)6158 forThreadLocalDataUnsuspended(void (*func)(PL_local_data_t *), unsigned flags)
6159 { GET_LD
6160   int me = PL_thread_self();
6161   int i;
6162 
6163   for( i=1; i<=GD->thread.highest_id; i++ )
6164   { if ( i != me )
6165     { PL_thread_info_t *info = GD->thread.threads[i];
6166 
6167       if ( info && info->thread_data &&
6168 	   ( info->status == PL_THREAD_RUNNING || info->in_exit_hooks ) )
6169       { PL_local_data_t *ld;
6170 
6171 	if ( (ld = acquire_ldata(info)) )
6172 	{ simpleMutexLock(&ld->thread.scan_lock);
6173 	  (*func)(ld);
6174 	  simpleMutexUnlock(&ld->thread.scan_lock);
6175 	}
6176       }
6177     }
6178   }
6179 }
6180 
6181 
6182 		 /*******************************
6183 		 *	 ATOM MARK SUPPORT	*
6184 		 *******************************/
6185 
6186 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
We do not register atoms  in   message  queues as the PL_register_atom()
calls seriously harm concurrency  due  to   contention  on  L_ATOM. So,
instead we must sweep atoms in records  in the message queues. Note that
atom-gc runs with the L_THREAD mutex  locked.  As both the thread and
queue tables are guarded by this  mutex, the loops in markAtomsThreads()
are safe.
6193 
6194 The marking must be synchronised with removing   a  message from a queue
6195 (get_message())  and  destroy_message_queue().  Notably  the  latter  is
6196 tricky as this deletes `queue->gc_mutex` and frees the queue, so we must
6197 ensure we do not get into   destroy_message_queue()  when AGC is marking
6198 the queue's messages.  There are three types of queues:
6199 
6200   - Thread queues.  AGC is synced with thread creation and destruction
6201     and thus safe.
6202   - Anonymous message queues. These are reclaimed from AGC and thus
6203     safe.
6204   - Named queues.  These are destroyed using message_queue_destroy/1.
6205     For this case we
6206     - Use `queueTable_mutex` to sync deletion from `queueTable` with
6207       enumeration in markAtomsMessageQueues() which ensures we safely
6208       get the queue with queue->gc_mutex locked.
6209     - We hold queue->gc_mutex during the scan, which delays
6210       message_queue_destroy/1 to set `queue->destroyed`.
6211     - If `queue->destroyed` is set, AGC cannot get the queue anymore
6212       and we can safely discard it.
6213 
6214 This is implemented by Keri Harris and Jan Wielemaker. We are not really
6215 happy with the complicated  locking.  Alternatively   we  could  use the
6216 linger interface where the lingering queues   are reclaimed by AGC. That
6217 might be cleaner, but is a  more   involved  patch  that doesn't perform
6218 significantly better.
6219 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
6220 
6221 static void
markAtomsMessageQueue(message_queue * queue)6222 markAtomsMessageQueue(message_queue *queue)
6223 { thread_message *msg;
6224 
6225   for(msg=queue->head; msg; msg=msg->next)
6226   { markAtomsRecord(msg->message);
6227   }
6228 }
6229 
6230 
/* Mark the atoms in the message queue of the thread that owns `ld`,
   holding the queue's gc_mutex during the scan.
*/

void
markAtomsThreadMessageQueue(PL_local_data_t *ld)
{ message_queue *q = &ld->thread.messages;

  simpleMutexLock(&q->gc_mutex);
  markAtomsMessageQueue(q);
  simpleMutexUnlock(&q->gc_mutex);
}
6239 
6240 
/* Mark the atoms in all message queues registered in `queueTable`.
   Each step of the enumeration is made with queueTable_mutex locked.
   The queue's gc_mutex is locked before queueTable_mutex is released,
   so we hold at least one of the two locks from fetching the queue
   until its scan completes.
*/

void
markAtomsMessageQueues(void)
{ if ( queueTable )
  { message_queue *q;
    TableEnum e = newTableEnum(queueTable);

    while ( TRUE )
    { simpleMutexLock(&queueTable_mutex);
      if ( !advanceTableEnum(e, NULL, (void**)&q) )
      { simpleMutexUnlock(&queueTable_mutex);
        break;
      }
      simpleMutexLock(&q->gc_mutex);	/* hand over: lock queue first */
      simpleMutexUnlock(&queueTable_mutex);
      markAtomsMessageQueue(q);
      simpleMutexUnlock(&q->gc_mutex);
    }
    freeTableEnum(e);
  }
}
6261 
6262 
6263 		 /*******************************
6264 		 *	    PREDICATES		*
6265 		 *******************************/
6266 
6267 
6268 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
6269 localiseDefinition(Definition def)
6270     Create a thread-local definition for the predicate `def'.
6271 
6272     This function is called from getProcDefinition() if the procedure is
6273     not yet `localised'.  Calling this function must be guarded by the
6274     L_PREDICATE mutex.
6275 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
6276 
/* Fallback for systems that do not provide offsetof() */
#ifndef offsetof
#define offsetof(structure, field) ((int) &(((structure *)NULL)->field))
#endif
6280 
6281 static void
registerLocalDefinition(Definition def)6282 registerLocalDefinition(Definition def)
6283 { GET_LD
6284   DefinitionChain cell = allocHeapOrHalt(sizeof(*cell));
6285 
6286   cell->definition = def;
6287   cell->next = LD->thread.local_definitions;
6288   LD->thread.local_definitions = cell;
6289 }
6290 
6291 
6292 static void
unregisterLocalDefinition(Definition def,PL_local_data_t * ld)6293 unregisterLocalDefinition(Definition def, PL_local_data_t *ld)
6294 { DefinitionChain cell;
6295 
6296   for(cell = ld->thread.local_definitions; cell; cell = cell->next)
6297   { if ( cell->definition == def )
6298     { cell->definition = NULL;
6299       return;
6300     }
6301   }
6302 }
6303 
6304 
/* Allocate an empty, zeroed vector of per-thread local definitions.
   Blocks 0, 1 and 2 all point one element before the embedded
   `preallocated` array.  With block b being indexed by thread ids
   (1<<b)..(2<<b)-1 as destroyLocalDefinitions() does, this maps thread
   ids 1..7 onto preallocated[0..6].
*/

LocalDefinitions
new_ldef_vector(void)
{ LocalDefinitions f = allocHeapOrHalt(sizeof(*f));

  memset(f, 0, sizeof(*f));
  f->blocks[0] = f->preallocated - 1;
  f->blocks[1] = f->preallocated - 1;
  f->blocks[2] = f->preallocated - 1;

  return f;
}
6316 
6317 
6318 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
6319 Called  from  destroyDefinition()   for    a   thread-local   predicate.
6320 destroyDefinition() is called  for  destroying   temporary  modules.  If
6321 thread-local predicates are defined in the   temporary module these must
6322 be destroyed and the registration with  module   must  be removed or the
6323 thread cleanup will access the destroyed predicate.
6324 
6325 Now, the thread having a localization for  this predicate is most likely
6326 the calling thread, but in theory other threads can be involved and thus
6327 we need to scan the entire localization array.
6328 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
6329 
void
destroyLocalDefinitions(Definition def)
{ GET_LD
  LocalDefinitions ldefs = def->impl.local.local;
  int b;

  for(b=0; b<MAX_BLOCKS; b++)
  { Definition *d0 = ldefs->blocks[b];

    if ( d0 )
    { size_t bs = (size_t)1<<b;		/* block b holds thread ids */
      size_t tid = bs;			/* bs .. 2*bs-1, indexed by id */
      size_t end = tid+bs;

      for(tid=bs; tid<end; tid++)
      { if ( d0[tid] )			/* localised for thread `tid` */
	{ PL_thread_info_t *info = GD->thread.threads[tid];
	  PL_local_data_t *ld;

					/* NOTE(review): the (*) note is not */
					/* part of this section of the file */
	  if ( LD )					/* See (*) */
	    ld = acquire_ldata(info);
	  else
	    ld = info->thread_data;

	  DEBUG(MSG_THREAD_LOCAL,
		if ( ld != LD )
		  Sdprintf("Destroying thread local predicate %s "
			   "with active local definitions\n",
			   predicateName(def)));

					/* drop it from the thread's list so */
					/* thread cleanup does not touch it */
	  unregisterLocalDefinition(def, ld);
	  destroyLocalDefinition(def, tid);

	  if ( LD )
	    release_ldata(ld);
	}
      }
    }
  }
}
6370 
6371 
/* Free a vector created by new_ldef_vector().  Blocks 0..2 point into
   the vector itself and are released with it.  For the other blocks
   `d0+bs` is what is handed to freeHeap(), which suggests that the
   stored pointer is biased by -bs to allow indexing by thread id
   (presumably set up where the blocks are allocated; not visible
   here).
*/

void
free_ldef_vector(LocalDefinitions ldefs)
{ int i;

  for(i=3; i<MAX_BLOCKS; i++)
  { size_t bs = (size_t)1<<i;
    Definition *d0 = ldefs->blocks[i];

    if ( d0 )
      freeHeap(d0+bs, bs*sizeof(Definition));
  }

  freeHeap(ldefs, sizeof(*ldefs));
}
6386 
6387 
Definition
localiseDefinition(Definition def)
{ Definition local = allocHeapOrHalt(sizeof(*local));
  size_t bytes = sizeof(arg_info)*def->functor->arity;

  *local = *def;			/* start as a copy of the global one */
  local->impl.any.args = allocHeapOrHalt(bytes);	/* private argument info */
  memcpy(local->impl.any.args, def->impl.any.args, bytes);
  clear(local, P_THREAD_LOCAL|P_DIRTYREG);	/* remains P_DYNAMIC */
  local->impl.clauses.first_clause = NULL;	/* no clauses yet */
  local->impl.clauses.clause_indexes = NULL;
  ATOMIC_INC(&GD->statistics.predicates);
  ATOMIC_ADD(&local->module->code_size, sizeof(*local));
  DEBUG(MSG_PROC_COUNT, Sdprintf("Localise %s\n", predicateName(def)));

  setSupervisor(local);
  registerLocalDefinition(def);		/* for cleanup when the thread exits */

  return local;
}
6408 
6409 
6410 void
cleanupLocalDefinitions(PL_local_data_t * ld)6411 cleanupLocalDefinitions(PL_local_data_t *ld)
6412 { DefinitionChain ch = ld->thread.local_definitions;
6413   DefinitionChain next;
6414   unsigned int id = ld->thread.info->pl_tid;
6415 
6416   for( ; ch; ch = next)
6417   { Definition def = ch->definition;
6418     next = ch->next;
6419 
6420     if ( def )
6421     { DEBUG(MSG_CLEANUP,
6422 	    Sdprintf("Clean local def in thread %d for %s\n",
6423 		     id,
6424 		     predicateName(def)));
6425 
6426       assert(true(def, P_THREAD_LOCAL));
6427       destroyLocalDefinition(def, id);
6428     }
6429     freeHeap(ch, sizeof(*ch));
6430   }
6431 }
6432 
6433 
6434 static size_t
sizeof_local_definitions(PL_local_data_t * ld)6435 sizeof_local_definitions(PL_local_data_t *ld)
6436 { DefinitionChain ch = ld->thread.local_definitions;
6437   size_t size = 0;
6438 
6439   for( ; ch; ch = ch->next)
6440   { Definition def = ch->definition;
6441     Definition local;
6442 
6443     assert(true(def, P_THREAD_LOCAL));
6444     if ( (local = getProcDefinitionForThread(def, ld->thread.info->pl_tid)) )
6445       size += sizeof_predicate(local);
6446   }
6447 
6448   return size;
6449 }
6450 
6451 
6452 
6453 /** '$thread_local_clause_count'(:Head, +Thread, -NumberOfClauses) is semidet.
6454 
6455 True when NumberOfClauses is the number  of clauses for the thread-local
6456 predicate Head in Thread.  Fails silently if
6457 
6458   - Head does not refer to an existing predicate
6459   - The predicate exists, but is not thread local (should
6460     this be an error?)
6461   - The thread does not exist
6462   - The definition was never localised for Thread. One
6463     could argue to return 0 for this case.
6464 
6465 @author	Keri Harris
6466 */
6467 
static
PRED_IMPL("$thread_local_clause_count", 3, thread_local_clause_count, 0)
{ PRED_LD
  Procedure proc;
  Definition def;
  PL_thread_info_t *info;
  int number_of_clauses = 0;		/* result if never localised for Thread */

  term_t pred = A1;
  term_t thread = A2;
  term_t count  = A3;

  if ( !get_procedure(pred, &proc, 0, GP_RESOLVE) )
    fail;				/* no such predicate */

  def = proc->definition;
  if ( false(def, P_THREAD_LOCAL) )
    fail;				/* not thread-local */

  if ( !get_thread(thread, &info, FALSE) )
    fail;				/* no such thread */

  PL_LOCK(L_THREAD);			/* fetch the count with L_THREAD locked */
  if ( (def = getProcDefinitionForThread(proc->definition, info->pl_tid)) )
    number_of_clauses = def->impl.clauses.number_of_clauses;
  PL_UNLOCK(L_THREAD);

  return PL_unify_integer(count, number_of_clauses);
}
6497 
6498 
6499 #else /*O_PLMT*/
6500 
/* Version for the `#else` branch of O_PLMT: there is no gc thread, so
   the signal is raised in the calling engine.
*/

int
signalGCThread(int sig)
{ GET_LD

  return raiseSignal(LD, sig);
}
6507 
/* Version for the `#else` branch of O_PLMT: true if `sig` is pending
   according to PL_pending().
*/

int
isSignalledGCThread(int sig ARG_LD)
{ return PL_pending(sig);
}
6512 
6513 
/* Version for the `#else` branch of O_PLMT: always returns -2, the
   value that PL_unify_thread_id() maps to `main`.
*/

int
PL_thread_self()
{ return -2;
}
6518 
6519 
/* Version for the `#else` branch of O_PLMT: unify `t` with `main` if
   `i` is -2 (the value returned by PL_thread_self()).  Returns -1 for
   any other id.
*/

int
PL_unify_thread_id(term_t t, int i)
{ if ( i == -2 )
    return PL_unify_atom(t, ATOM_main);
  return -1;
}
6526 
6527 
/* Version for the `#else` branch of O_PLMT: nothing is registered and
   FALSE is returned.
*/

int
PL_thread_at_exit(void (*function)(void *), void *closure, int global)
{ return FALSE;
}
6532 
/* Version for the `#else` branch of O_PLMT: no engine is attached and
   -2 is returned.
*/

int
PL_thread_attach_engine(PL_thread_attr_t *attr)
{ return -2;
}
6537 
/* Version for the `#else` branch of O_PLMT: always returns FALSE */

int
PL_thread_destroy_engine()
{ return FALSE;
}
6542 
6543 #ifdef __WINDOWS__
/* Version for the `#else` branch of O_PLMT: `id` is ignored and the
   signal is raised using PL_raise().
*/
int
PL_w32thread_raise(DWORD id, int sig)
{ return PL_raise(sig);
}
6548 #endif
6549 
/* Version for the `#else` branch of O_PLMT: unify `id` with `main` */

foreign_t
pl_thread_self(term_t id)
{ return PL_unify_atom(id, ATOM_main);
}
6554 
/* Version for the `#else` branch of O_PLMT: return LD */

PL_engine_t
PL_current_engine(void)
{ return LD;
}
6559 
6560 int
PL_set_engine(PL_engine_t new,PL_engine_t * old)6561 PL_set_engine(PL_engine_t new, PL_engine_t *old)
6562 { if ( new != LD && new != PL_ENGINE_MAIN )
6563     return PL_ENGINE_INVAL;
6564 
6565   if ( old )
6566   { *old = LD;
6567   }
6568 
6569   return PL_ENGINE_SET;
6570 }
6571 
6572 PL_engine_t
PL_create_engine(PL_thread_attr_t * attributes)6573 PL_create_engine(PL_thread_attr_t *attributes)
6574 { return NULL;
6575 }
6576 
6577 int
PL_destroy_engine(PL_engine_t e)6578 PL_destroy_engine(PL_engine_t e)
6579 { fail;
6580 }
6581 
/* Build without O_PLMT: intentionally empty. */
void
PL_cleanup_fork(void)
{ /* no work to do in this build */
}
6586 
6587 double
ThreadCPUTime(PL_local_data_t * ld,int which)6588 ThreadCPUTime(PL_local_data_t *ld, int which) {
6589   return CpuTime(which);
6590 }
6591 
/* Build without O_PLMT: when O_MULTIPLE_ENGINES is defined, point
   PL_current_engine_ptr at the single PL_local_data instance; otherwise
   this is a no-op.

   Declared with a `(void)` parameter list so the definition is a real
   prototype instead of an old-style empty-parentheses declaration.
*/
void
initPrologThreads(void)
{					/* TBD: only once? */
#ifdef O_MULTIPLE_ENGINES
  PL_current_engine_ptr = &PL_local_data;
#endif
}
6599 
6600 int
PL_get_thread_alias(int tid,atom_t * alias)6601 PL_get_thread_alias(int tid, atom_t *alias)
6602 { *alias = ATOM_main;
6603   return TRUE;
6604 }
6605 
6606 #endif  /*O_PLMT*/
6607 
6608 int
get_prop_def(term_t t,atom_t expected,const tprop * list,const tprop ** def)6609 get_prop_def(term_t t, atom_t expected, const tprop *list, const tprop **def)
6610 { GET_LD
6611   functor_t f;
6612 
6613   if ( PL_get_functor(t, &f) )
6614   { const tprop *p = list;
6615 
6616     for( ; p->functor; p++ )
6617     { if ( f == p->functor )
6618       { *def = p;
6619         return TRUE;
6620       }
6621     }
6622 
6623     PL_error(NULL, 0, NULL, ERR_DOMAIN, expected, t);
6624     return -1;
6625   }
6626 
6627   if ( PL_is_variable(t) )
6628     return 0;
6629 
6630   PL_error(NULL, 0, NULL, ERR_TYPE, expected, t);
6631   return -1;
6632 }
6633 
6634 
6635 		 /*******************************
6636 		 *	     STATISTICS		*
6637 		 *******************************/
6638 
6639 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
6640 Find the summed size of the local stack.   This is a measure for the CGC
6641 marking cost.
6642 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
6643 
6644 int
cgc_thread_stats(cgc_stats * stats ARG_LD)6645 cgc_thread_stats(cgc_stats *stats ARG_LD)
6646 {
6647 #ifdef O_PLMT
6648   int i;
6649 
6650   for(i=1; i<=GD->thread.highest_id; i++)
6651   { PL_thread_info_t *info = GD->thread.threads[i];
6652     PL_local_data_t *ld = acquire_ldata(info);
6653 
6654     if ( ld )
6655     { char *ltop  = (char*)ld->stacks.local.top;
6656       char *lbase = (char*)ld->stacks.local.base;
6657 
6658       if ( ltop > lbase )
6659       { stats->local_size += ltop-lbase;
6660 	stats->threads++;
6661 	stats->erased_skipped += ld->clauses.erased_skipped;
6662       }
6663       release_ldata(ld);
6664     }
6665     if ( GD->clauses.cgc_active )
6666       return FALSE;
6667   }
6668 #else
6669   stats->local_size = usedStack(local);
6670   stats->threads = 1;
6671   stats->erased_skipped = LD->clauses.erased_skipped;
6672 #endif
6673 
6674   return TRUE;
6675 }
6676 
6677 
6678 		 /*******************************
6679 		 *    HASH-TABLE KVS IN USE     *
6680 		 *******************************/
6681 
6682 int
pl_kvs_in_use(KVS kvs)6683 pl_kvs_in_use(KVS kvs)
6684 {
6685 #ifdef O_PLMT
6686   int i;
6687 
6688   for(i=1; i<=GD->thread.highest_id; i++)
6689   { PL_thread_info_t *info = GD->thread.threads[i];
6690     if ( info && info->access.kvs == kvs )
6691     { return TRUE;
6692     }
6693   }
6694 #endif
6695 
6696   return FALSE;
6697 }
6698 
6699 
6700 		 /*******************************
6701 		 *      ATOM-TABLE IN USE       *
6702 		 *******************************/
6703 
6704 int
pl_atom_table_in_use(AtomTable atom_table)6705 pl_atom_table_in_use(AtomTable atom_table)
6706 {
6707 #ifdef O_PLMT
6708   int i;
6709 
6710   for(i=1; i<=GD->thread.highest_id; i++)
6711   { PL_thread_info_t *info = GD->thread.threads[i];
6712     if ( info && info->access.atom_table == atom_table )
6713     { return TRUE;
6714     }
6715   }
6716 #endif
6717 
6718   return FALSE;
6719 }
6720 
6721 
6722 int
pl_atom_bucket_in_use(Atom * bucket)6723 pl_atom_bucket_in_use(Atom *bucket)
6724 {
6725 #ifdef O_PLMT
6726   int i;
6727 
6728   for(i=1; i<=GD->thread.highest_id; i++)
6729   { PL_thread_info_t *info = GD->thread.threads[i];
6730     if ( info && info->access.atom_bucket == bucket )
6731     { return TRUE;
6732     }
6733   }
6734 #endif
6735 
6736   return FALSE;
6737 }
6738 
6739 
6740 #ifdef O_PLMT
6741 static int
ldata_in_use(PL_local_data_t * ld)6742 ldata_in_use(PL_local_data_t *ld)
6743 { int i;
6744 
6745   for(i=1; i<=GD->thread.highest_id; i++)
6746   { PL_thread_info_t *info = GD->thread.threads[i];
6747     if ( info && info->access.ldata == ld )
6748     { return TRUE;
6749     }
6750   }
6751 
6752   return FALSE;
6753 }
6754 #endif
6755 
6756 
6757 Atom**
pl_atom_buckets_in_use(void)6758 pl_atom_buckets_in_use(void)
6759 {
6760 #ifdef O_PLMT
6761   int i, index=0;
6762   size_t sz = 32;
6763 
6764   Atom **buckets = allocHeapOrHalt(sz * sizeof(Atom*));
6765   memset(buckets, 0, sz * sizeof(Atom*));
6766 
6767   for(i=1; i<=GD->thread.highest_id; i++)
6768   { PL_thread_info_t *info = GD->thread.threads[i];
6769     if ( info && info->access.atom_bucket )
6770     { if ( index >= sz-1 )
6771       { int j = 0;
6772         size_t oldsz = sz;
6773         sz *= 2;
6774         Atom **newbuckets = allocHeapOrHalt(sz * sizeof(Atom*));
6775 	memset(newbuckets, 0, sz * sizeof(Atom*));
6776 	for ( ; j < oldsz; j++ )
6777 	{ newbuckets[j] = buckets[j];
6778 	}
6779 	PL_free(buckets);
6780         buckets = newbuckets;
6781       }
6782       buckets[index] = info->access.atom_bucket;
6783       if ( buckets[index] )	/* atom_bucket may have been released */
6784         index++;
6785     }
6786   }
6787 
6788   return buckets;
6789 #endif
6790 
6791   return NULL;
6792 }
6793 
6794 
6795 Definition*
predicates_in_use(void)6796 predicates_in_use(void)
6797 {
6798 #ifdef O_PLMT
6799   int i, index=0;
6800   size_t sz = 32;
6801 
6802   Definition *buckets = allocHeapOrHalt(sz * sizeof(Definition));
6803   memset(buckets, 0, sz * sizeof(Definition*));
6804 
6805   for(i=1; i<=GD->thread.highest_id; i++)
6806   { PL_thread_info_t *info = GD->thread.threads[i];
6807     if ( info && info->access.predicate )
6808     { if ( index >= sz-1 )
6809       { int j = 0;
6810         size_t oldsz = sz;
6811         sz *= 2;
6812         Definition *newbuckets = allocHeapOrHalt(sz * sizeof(Definition));
6813 	memset(newbuckets, 0, sz * sizeof(Definition));
6814 	for ( ; j < oldsz; j++ )
6815 	{ newbuckets[j] = buckets[j];
6816 	}
6817 	PL_free(buckets);
6818         buckets = newbuckets;
6819       }
6820       buckets[index] = info->access.predicate;
6821       if ( buckets[index] )	/* atom_bucket may have been released */
6822         index++;
6823     }
6824   }
6825 
6826   return buckets;
6827 #endif
6828 
6829   return NULL;
6830 }
6831 
6832 
6833 		 /*******************************
6834 		 *     FUNCTOR-TABLE IN USE     *
6835 		 *******************************/
6836 
6837 int
pl_functor_table_in_use(FunctorTable functor_table)6838 pl_functor_table_in_use(FunctorTable functor_table)
6839 {
6840 #ifdef O_PLMT
6841   int me = PL_thread_self();
6842   int i;
6843 
6844   for(i=1; i<=GD->thread.highest_id; i++)
6845   { PL_thread_info_t *info = GD->thread.threads[i];
6846     if ( i != me && info && info->access.functor_table == functor_table )
6847     { return TRUE;
6848     }
6849   }
6850 #endif
6851 
6852   return FALSE;
6853 }
6854 
6855 		 /*******************************
6856 		 * CLAUSE/3 PREDICATE REFERENCES*
6857 		 *******************************/
6858 
6859 #ifdef O_PLMT
6860 
6861 static void
init_predicate_references(PL_local_data_t * ld)6862 init_predicate_references(PL_local_data_t *ld)
6863 { definition_refs *refs = &ld->predicate_references;
6864 
6865   memset(refs, 0, sizeof(*refs));
6866   refs->blocks[0] = refs->preallocated - 1;
6867   refs->blocks[1] = refs->preallocated - 1;
6868   refs->blocks[2] = refs->preallocated - 1;
6869 }
6870 
6871 static void
free_predicate_references(PL_local_data_t * ld)6872 free_predicate_references(PL_local_data_t *ld)
6873 { definition_refs *refs = &ld->predicate_references;
6874   int i;
6875 
6876   for(i=3; i<MAX_BLOCKS; i++)
6877   { size_t bs = (size_t)1<<i;
6878     definition_ref *d0 = refs->blocks[i];
6879 
6880     if ( d0 )
6881       freeHeap(d0+bs, bs*sizeof(definition_ref));
6882   }
6883 }
6884 #endif /*O_PLMT*/
6885 
6886 static void
cgcActivatePredicate__LD(Definition def,gen_t gen ARG_LD)6887 cgcActivatePredicate__LD(Definition def, gen_t gen ARG_LD)
6888 { DirtyDefInfo ddi;
6889 
6890   if ( (ddi=lookupHTable(GD->procedures.dirty, def)) )
6891     ddi_add_access_gen(ddi, gen);
6892 }
6893 
6894 
6895 definition_ref *
pushPredicateAccessObj(Definition def ARG_LD)6896 pushPredicateAccessObj(Definition def ARG_LD)
6897 { definition_refs *refs = &LD->predicate_references;
6898   definition_ref *dref;
6899   size_t top = refs->top+1;
6900   size_t idx = MSB(top);
6901 
6902   DEBUG(MSG_CGC_PRED_REF,
6903 	Sdprintf("pushPredicateAccess(%s)\n", predicateName(def)));
6904 
6905   if ( !refs->blocks[idx] )
6906   { size_t bs = (size_t)1<<idx;
6907     definition_ref *newblock;
6908 
6909     if ( !(newblock=PL_malloc_uncollectable(bs*sizeof(definition_ref))) )
6910       outOfCore();
6911 
6912     memset(newblock, 0, bs*sizeof(definition_ref));
6913     if ( !COMPARE_AND_SWAP_PTR(&refs->blocks[idx], NULL, newblock-bs) )
6914       PL_free(newblock);
6915   }
6916 
6917   enterDefinition(def);			/* probably not needed in the end */
6918   dref = &refs->blocks[idx][top];
6919   dref->predicate  = def;
6920   dref->generation = global_generation();
6921   refs->top = top;
6922   do
6923   { dref->generation = global_generation();
6924   } while ( dref->generation != global_generation() );
6925 
6926   return dref;
6927 }
6928 
6929 
6930 gen_t
pushPredicateAccess__LD(Definition def ARG_LD)6931 pushPredicateAccess__LD(Definition def ARG_LD)
6932 { return pushPredicateAccessObj(def PASS_LD)->generation;
6933 }
6934 
6935 
6936 static definition_ref *
def_ref(const definition_refs * refs,size_t top)6937 def_ref(const definition_refs *refs, size_t top)
6938 { size_t idx = MSB(top);
6939 
6940   return &refs->blocks[idx][top];
6941 }
6942 
6943 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
6944 Note that out-of-order access is possible.   Matt  Lilley created a case
6945 where the term_expansion/2 rule for end_of_file loads source files using
6946 forall(clause(chain_modules(M)),     use_module(M)).     This      locks
6947 chain_modules/1. The created system:'$load_context_module/2'   fact gets
6948 asserted to the load  context  and  is   popped  when  loading  the file
6949 completes, while the reference to  chain_modules/1   is  popped when the
6950 enumeration completes.
6951 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
6952 
6953 void
popPredicateAccess__LD(Definition def ARG_LD)6954 popPredicateAccess__LD(Definition def ARG_LD)
6955 { definition_refs *refs = &LD->predicate_references;
6956   definition_ref *dref;
6957 
6958   DEBUG(MSG_CGC_PRED_REF,
6959 	Sdprintf("popPredicateAccess(%s)\n", predicateName(def)));
6960 
6961   dref = def_ref(refs, refs->top);
6962   if ( dref->predicate == def )
6963   { dref->predicate  = NULL;
6964     dref->generation = 0;
6965   } else
6966   { size_t top;
6967 
6968     DEBUG(MSG_CGC_PRED_REF, Sdprintf("  Out or order!\n"));
6969     for(top = refs->top; top > 0; top-- )
6970     { dref = def_ref(refs, top);
6971       if ( dref->predicate == def )
6972       { for(; top < refs->top; top++)
6973 	{ definition_ref *dr2 = def_ref(refs, top+1);
6974 
6975 	  *dref = *dr2;
6976 	  dref = dr2;
6977 	}
6978 	goto out;
6979       }
6980     }
6981     assert(0);
6982   }
6983 out:
6984   leaveDefinition(def);			/* probably not needed in the end */
6985 
6986   refs->top--;
6987 }
6988 
6989 size_t
popNPredicateAccess__LD(size_t n ARG_LD)6990 popNPredicateAccess__LD(size_t n ARG_LD)
6991 { definition_refs *refs = &LD->predicate_references;
6992 
6993   DEBUG(MSG_CGC_PRED_REF,
6994 	Sdprintf("popNPredicateAccess(%d)\n", (int)n));
6995 
6996   while(n-- > 0)
6997   { definition_ref *dref;
6998     size_t top = refs->top;
6999     size_t idx = MSB(top);
7000 
7001     dref = &refs->blocks[idx][top];
7002     DEBUG(MSG_CGC_PRED_REF,
7003 	  Sdprintf("  -- %s\n", predicateName(dref->predicate)));
7004     leaveDefinition(dref->predicate);
7005     dref->predicate  = NULL;
7006     dref->generation = 0;
7007 
7008     refs->top--;
7009   }
7010 
7011   return refs->top;
7012 }
7013 
7014 
/* Return 1 if `ptr` is non-NULL and its low address bits are all zero
   (the low 2 bits for 4-byte pointers, the low 3 bits for 8-byte
   pointers), 0 otherwise.  Any other pointer size is a compile-time
   error.
*/
static inline int
is_pointer_like(void *ptr)
{
#if SIZEOF_VOIDP == 4
  const intptr_t low_bits = 0x3;
#elif SIZEOF_VOIDP == 8
  const intptr_t low_bits = 0x7;
#else
#error "Unknown pointer size"
#endif

  if ( !ptr )
    return 0;

  return ((intptr_t)ptr & low_bits) == 0;
}
7027 
7028 void
markAccessedPredicates(PL_local_data_t * ld)7029 markAccessedPredicates(PL_local_data_t *ld)
7030 { GET_LD
7031   definition_refs *refs = &ld->predicate_references;
7032   size_t i;
7033 
7034   for(i=1; i<=refs->top; i++)
7035   { int idx = MSB(i);
7036     volatile definition_ref *drefp = &refs->blocks[idx][i];
7037     definition_ref dref = *drefp;	/* struct copy */
7038 
7039     if ( is_pointer_like(dref.predicate) )
7040       cgcActivatePredicate__LD(dref.predicate, dref.generation PASS_LD);
7041   }
7042 }
7043 
7044 		 /*******************************
7045 		 *      PUBLISH PREDICATES	*
7046 		 *******************************/
7047 
7048 #define NDET PL_FA_NONDETERMINISTIC
7049 
7050 BeginPredDefs(thread)
7051 #ifdef O_PLMT
7052   PRED_DEF("thread_alias",           1, thread_alias,	       0)
7053   PRED_DEF("thread_detach",	     1,	thread_detach,	       PL_FA_ISO)
7054   PRED_DEF("thread_join",	     2,	thread_join,	       0)
7055   PRED_DEF("thread_statistics",	     3,	thread_statistics,     0)
7056   PRED_DEF("thread_property",	     2,	thread_property,       NDET|PL_FA_ISO)
7057   PRED_DEF("is_thread",		     1,	is_thread,	       0)
7058   PRED_DEF("$thread_sigwait",	     1, thread_sigwait,	       0)
7059 #ifdef HAVE_PRED_THREAD_AFFINITY
7060   PRED_DEF("thread_affinity",        3, thread_affinity,       0)
7061 #endif
7062 
7063   PRED_DEF("message_queue_create",   1,	message_queue_create,  0)
7064   PRED_DEF("message_queue_create",   2,	message_queue_create2, PL_FA_ISO)
7065   PRED_DEF("message_queue_property", 2,	message_property,      NDET|PL_FA_ISO)
7066   PRED_DEF("message_queue_set",      2, message_queue_set,     0)
7067 
7068   PRED_DEF("thread_send_message",    2,	thread_send_message,   PL_FA_ISO)
7069   PRED_DEF("thread_send_message",    3,	thread_send_message,   0)
7070   PRED_DEF("thread_get_message",     1,	thread_get_message,    PL_FA_ISO)
7071   PRED_DEF("thread_get_message",     2,	thread_get_message,    PL_FA_ISO)
7072   PRED_DEF("thread_get_message",     3,	thread_get_message,    PL_FA_ISO)
7073   PRED_DEF("thread_peek_message",    1,	thread_peek_message_1, PL_FA_ISO)
7074   PRED_DEF("thread_peek_message",    2,	thread_peek_message_2, PL_FA_ISO)
7075   PRED_DEF("message_queue_destroy",  1,	message_queue_destroy, PL_FA_ISO)
7076   PRED_DEF("thread_setconcurrency",  2,	thread_setconcurrency, 0)
7077 
7078   PRED_DEF("$engine_create",	     3,	engine_create,	       0)
7079   PRED_DEF("engine_destroy",	     1,	engine_destroy,	       0)
7080   PRED_DEF("engine_next",	     2,	engine_next,	       0)
7081   PRED_DEF("engine_post",	     2,	engine_post,	       0)
7082   PRED_DEF("engine_post",	     3,	engine_post,	       0)
7083   PRED_DEF("engine_fetch",	     1, engine_fetch,	       0)
7084   PRED_DEF("is_engine",		     1,	is_engine,	       0)
7085 
7086   PRED_DEF("mutex_statistics",	     0,	mutex_statistics,      0)
7087 
7088   PRED_DEF("$thread_local_clause_count", 3, thread_local_clause_count, 0)
7089   PRED_DEF("$gc_wait",               1, gc_wait,               0)
7090   PRED_DEF("$gc_clear",              1, gc_clear,              0)
7091   PRED_DEF("$gc_stop",               0, gc_stop,               0)
7092 #endif
7093 EndPredDefs
7094