1 /* $OpenLDAP$ */
2 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
3  *
4  * Copyright 1998-2021 The OpenLDAP Foundation.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted only as authorized by the OpenLDAP
9  * Public License.
10  *
11  * A copy of this license is available in file LICENSE in the
12  * top-level directory of the distribution or, alternatively, at
13  * <http://www.OpenLDAP.org/license.html>.
14  */
15 
16 #include "portable.h"
17 
18 #include <stdio.h>
19 
20 #include <ac/signal.h>
21 #include <ac/stdarg.h>
22 #include <ac/stdlib.h>
23 #include <ac/string.h>
24 #include <ac/time.h>
25 #include <ac/errno.h>
26 
27 #include "ldap-int.h"
28 #include "ldap_pvt_thread.h" /* Get the thread interface */
29 #include "ldap_queue.h"
30 #define LDAP_THREAD_POOL_IMPLEMENTATION
31 #include "ldap_thr_debug.h"  /* May rename symbols defined below */
32 
33 #ifndef LDAP_THREAD_HAVE_TPOOL
34 
35 /* Thread-specific key with data and optional free function */
36 typedef struct ldap_int_tpool_key_s {
37 	void *ltk_key;
38 	void *ltk_data;
39 	ldap_pvt_thread_pool_keyfree_t *ltk_free;
40 } ldap_int_tpool_key_t;
41 
/* Capacity of each thread's key table.  Only a handful of keys are
 * expected to be in use at once.
 */
#define MAXKEYS		32

/* Upper bound on the number of pool threads.  Must be a power of 2,
 * because table indexes are reduced with "& (LDAP_MAXTHR-1)".
 */
#define LDAP_MAXTHR	1024

/* (Theoretical) upper bound on pending requests: half of INT_MAX,
 * which leaves room to avoid overflow.
 */
#define MAX_PENDING	(INT_MAX/2)

/* Values taken by pool->ltp_pause */
enum {
	NOT_PAUSED = 0,
	WANT_PAUSE = 1,
	PAUSED     = 2
};
55 
56 /* Context: thread ID and thread-specific key/data pairs */
57 typedef struct ldap_int_thread_userctx_s {
58 	ldap_pvt_thread_t ltu_id;
59 	ldap_int_tpool_key_t ltu_key[MAXKEYS];
60 } ldap_int_thread_userctx_t;
61 
62 
63 /* Simple {thread ID -> context} hash table; key=ctx->ltu_id.
64  * Protected by ldap_pvt_thread_pool_mutex except during pauses,
65  * when it is read-only (used by pool_purgekey and pool_context).
66  * Protected by tpool->ltp_mutex during pauses.
67  */
68 static struct {
69 	ldap_int_thread_userctx_t *ctx;
70 	/* ctx is valid when not NULL or DELETED_THREAD_CTX */
71 #	define DELETED_THREAD_CTX (&ldap_int_main_thrctx + 1) /* dummy addr */
72 } thread_keys[LDAP_MAXTHR];
73 
/* Hash the raw bytes of (tid) into (hash): start from the first byte,
 * then for every following byte do  hash += (hash << 5) ^ byte.
 * (tid) must be an lvalue, since its address is taken; (hash) must be
 * an assignable integer.
 */
#define	TID_HASH(tid, hash) do { \
	const unsigned char *bytes_ = (const unsigned char *)&(tid); \
	unsigned pos_ = 0; \
	(hash) = bytes_[0]; \
	while (++pos_ < sizeof(tid)) \
		(hash) += ((hash) << 5) ^ bytes_[pos_]; \
} while(0)
80 
81 
82 /* Task for a thread to perform */
83 typedef struct ldap_int_thread_task_s {
84 	union {
85 		LDAP_STAILQ_ENTRY(ldap_int_thread_task_s) q;
86 		LDAP_SLIST_ENTRY(ldap_int_thread_task_s) l;
87 	} ltt_next;
88 	ldap_pvt_thread_start_t *ltt_start_routine;
89 	void *ltt_arg;
90 } ldap_int_thread_task_t;
91 
92 typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t;
93 
94 struct ldap_int_thread_pool_s {
95 	LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
96 
97 	/* protect members below, and protect thread_keys[] during pauses */
98 	ldap_pvt_thread_mutex_t ltp_mutex;
99 
100 	/* not paused and something to do for pool_<wrapper/pause/destroy>() */
101 	ldap_pvt_thread_cond_t ltp_cond;
102 
103 	/* ltp_active_count <= 1 && ltp_pause */
104 	ldap_pvt_thread_cond_t ltp_pcond;
105 
106 	/* ltp_pause == 0 ? &ltp_pending_list : &empty_pending_list,
107 	 * maintaned to reduce work for pool_wrapper()
108 	 */
109 	ldap_int_tpool_plist_t *ltp_work_list;
110 
111 	/* pending tasks, and unused task objects */
112 	ldap_int_tpool_plist_t ltp_pending_list;
113 	LDAP_SLIST_HEAD(tcl, ldap_int_thread_task_s) ltp_free_list;
114 
115 	/* The pool is finishing, waiting for its threads to close.
116 	 * They close when ltp_pending_list is done.  pool_submit()
117 	 * rejects new tasks.  ltp_max_pending = -(its old value).
118 	 */
119 	int ltp_finishing;
120 
121 	/* Some active task needs to be the sole active task.
122 	 * Atomic variable so ldap_pvt_thread_pool_pausing() can read it.
123 	 * Note: Pauses adjust ltp_<open_count/vary_open_count/work_list>,
124 	 * so pool_<submit/wrapper>() mostly can avoid testing ltp_pause.
125 	 */
126 	volatile sig_atomic_t ltp_pause;
127 
128 	/* Max number of threads in pool, or 0 for default (LDAP_MAXTHR) */
129 	int ltp_max_count;
130 
131 	/* Max pending + paused + idle tasks, negated when ltp_finishing */
132 	int ltp_max_pending;
133 
134 	int ltp_pending_count;		/* Pending + paused + idle tasks */
135 	int ltp_active_count;		/* Active, not paused/idle tasks */
136 	int ltp_open_count;			/* Number of threads, negated when ltp_pause */
137 	int ltp_starting;			/* Currenlty starting threads */
138 
139 	/* >0 if paused or we may open a thread, <0 if we should close a thread.
140 	 * Updated when ltp_<finishing/pause/max_count/open_count> change.
141 	 * Maintained to reduce the time ltp_mutex must be locked in
142 	 * ldap_pvt_thread_pool_<submit/wrapper>().
143 	 */
144 	int ltp_vary_open_count;
145 #	define SET_VARY_OPEN_COUNT(pool)	\
146 		((pool)->ltp_vary_open_count =	\
147 		 (pool)->ltp_pause      ?  1 :	\
148 		 (pool)->ltp_finishing  ? -1 :	\
149 		 ((pool)->ltp_max_count ? (pool)->ltp_max_count : LDAP_MAXTHR) \
150 		 - (pool)->ltp_open_count)
151 };
152 
153 static ldap_int_tpool_plist_t empty_pending_list =
154 	LDAP_STAILQ_HEAD_INITIALIZER(empty_pending_list);
155 
156 static int ldap_int_has_thread_pool = 0;
157 static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
158 	ldap_int_thread_pool_list =
159 	LDAP_STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);
160 
161 static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
162 
163 static void *ldap_int_thread_pool_wrapper( void *pool );
164 
165 static ldap_pvt_thread_key_t	ldap_tpool_key;
166 
167 /* Context of the main thread */
168 static ldap_int_thread_userctx_t ldap_int_main_thrctx;
169 
170 int
ldap_int_thread_pool_startup(void)171 ldap_int_thread_pool_startup ( void )
172 {
173 	ldap_int_main_thrctx.ltu_id = ldap_pvt_thread_self();
174 	ldap_pvt_thread_key_create( &ldap_tpool_key );
175 	return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
176 }
177 
178 int
ldap_int_thread_pool_shutdown(void)179 ldap_int_thread_pool_shutdown ( void )
180 {
181 	struct ldap_int_thread_pool_s *pool;
182 
183 	while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
184 		(ldap_pvt_thread_pool_destroy)(&pool, 0); /* ignore thr_debug macro */
185 	}
186 	ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
187 	ldap_pvt_thread_key_destroy( ldap_tpool_key );
188 	return(0);
189 }
190 
191 
192 /* Create a thread pool */
193 int
ldap_pvt_thread_pool_init(ldap_pvt_thread_pool_t * tpool,int max_threads,int max_pending)194 ldap_pvt_thread_pool_init (
195 	ldap_pvt_thread_pool_t *tpool,
196 	int max_threads,
197 	int max_pending )
198 {
199 	ldap_pvt_thread_pool_t pool;
200 	int rc;
201 
202 	/* multiple pools are currently not supported (ITS#4943) */
203 	assert(!ldap_int_has_thread_pool);
204 
205 	if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
206 		max_threads = 0;
207 	if (! (1 <= max_pending && max_pending <= MAX_PENDING))
208 		max_pending = MAX_PENDING;
209 
210 	*tpool = NULL;
211 	pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
212 		sizeof(struct ldap_int_thread_pool_s));
213 
214 	if (pool == NULL) return(-1);
215 
216 	rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
217 	if (rc != 0) {
218 fail1:
219 		LDAP_FREE(pool);
220 		return(rc);
221 	}
222 	rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
223 	if (rc != 0) {
224 fail2:
225 		ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
226 		goto fail1;
227 	}
228 	rc = ldap_pvt_thread_cond_init(&pool->ltp_pcond);
229 	if (rc != 0) {
230 		ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
231 		goto fail2;
232 	}
233 
234 	ldap_int_has_thread_pool = 1;
235 
236 	pool->ltp_max_count = max_threads;
237 	SET_VARY_OPEN_COUNT(pool);
238 	pool->ltp_max_pending = max_pending;
239 
240 	LDAP_STAILQ_INIT(&pool->ltp_pending_list);
241 	pool->ltp_work_list = &pool->ltp_pending_list;
242 	LDAP_SLIST_INIT(&pool->ltp_free_list);
243 
244 	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
245 	LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
246 	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
247 
248 	/* Start no threads just yet.  That can break if the process forks
249 	 * later, as slapd does in order to daemonize.  On at least POSIX,
250 	 * only the forking thread would survive in the child.  Yet fork()
251 	 * can't unlock/clean up other threads' locks and data structures,
252 	 * unless pthread_atfork() handlers have been set up to do so.
253 	 */
254 
255 	*tpool = pool;
256 	return(0);
257 }
258 
259 
260 /* Submit a task to be performed by the thread pool */
261 int
ldap_pvt_thread_pool_submit(ldap_pvt_thread_pool_t * tpool,ldap_pvt_thread_start_t * start_routine,void * arg)262 ldap_pvt_thread_pool_submit (
263 	ldap_pvt_thread_pool_t *tpool,
264 	ldap_pvt_thread_start_t *start_routine, void *arg )
265 {
266 	struct ldap_int_thread_pool_s *pool;
267 	ldap_int_thread_task_t *task;
268 	ldap_pvt_thread_t thr;
269 
270 	if (tpool == NULL)
271 		return(-1);
272 
273 	pool = *tpool;
274 
275 	if (pool == NULL)
276 		return(-1);
277 
278 	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
279 
280 	if (pool->ltp_pending_count >= pool->ltp_max_pending)
281 		goto failed;
282 
283 	task = LDAP_SLIST_FIRST(&pool->ltp_free_list);
284 	if (task) {
285 		LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltt_next.l);
286 	} else {
287 		task = (ldap_int_thread_task_t *) LDAP_MALLOC(sizeof(*task));
288 		if (task == NULL)
289 			goto failed;
290 	}
291 
292 	task->ltt_start_routine = start_routine;
293 	task->ltt_arg = arg;
294 
295 	pool->ltp_pending_count++;
296 	LDAP_STAILQ_INSERT_TAIL(&pool->ltp_pending_list, task, ltt_next.q);
297 
298 	/* true if ltp_pause != 0 or we should open (create) a thread */
299 	if (pool->ltp_vary_open_count > 0 &&
300 		pool->ltp_open_count < pool->ltp_active_count+pool->ltp_pending_count)
301 	{
302 		if (pool->ltp_pause)
303 			goto done;
304 
305 		pool->ltp_starting++;
306 		pool->ltp_open_count++;
307 		SET_VARY_OPEN_COUNT(pool);
308 
309 		if (0 != ldap_pvt_thread_create(
310 			&thr, 1, ldap_int_thread_pool_wrapper, pool))
311 		{
312 			/* couldn't create thread.  back out of
313 			 * ltp_open_count and check for even worse things.
314 			 */
315 			pool->ltp_starting--;
316 			pool->ltp_open_count--;
317 			SET_VARY_OPEN_COUNT(pool);
318 
319 			if (pool->ltp_open_count == 0) {
320 				/* no open threads at all?!?
321 				 */
322 				ldap_int_thread_task_t *ptr;
323 
324 				/* let pool_destroy know there are no more threads */
325 				ldap_pvt_thread_cond_signal(&pool->ltp_cond);
326 
327 				LDAP_STAILQ_FOREACH(ptr, &pool->ltp_pending_list, ltt_next.q)
328 					if (ptr == task) break;
329 				if (ptr == task) {
330 					/* no open threads, task not handled, so
331 					 * back out of ltp_pending_count, free the task,
332 					 * report the error.
333 					 */
334 					pool->ltp_pending_count--;
335 					LDAP_STAILQ_REMOVE(&pool->ltp_pending_list, task,
336 						ldap_int_thread_task_s, ltt_next.q);
337 					LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, task,
338 						ltt_next.l);
339 					goto failed;
340 				}
341 			}
342 			/* there is another open thread, so this
343 			 * task will be handled eventually.
344 			 */
345 		}
346 	}
347 	ldap_pvt_thread_cond_signal(&pool->ltp_cond);
348 
349  done:
350 	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
351 	return(0);
352 
353  failed:
354 	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
355 	return(-1);
356 }
357 
/* Do-nothing task body.  pool_retract() substitutes it for the start
 * routine of a cancelled task.  Both arguments are ignored.
 */
static void *
no_task( void *ctx, void *arg )
{
	(void) ctx;
	(void) arg;
	return NULL;
}
363 
364 /* Cancel a pending task that was previously submitted.
365  * Return 1 if the task was successfully cancelled, 0 if
366  * not found, -1 for invalid parameters
367  */
368 int
ldap_pvt_thread_pool_retract(ldap_pvt_thread_pool_t * tpool,ldap_pvt_thread_start_t * start_routine,void * arg)369 ldap_pvt_thread_pool_retract (
370 	ldap_pvt_thread_pool_t *tpool,
371 	ldap_pvt_thread_start_t *start_routine, void *arg )
372 {
373 	struct ldap_int_thread_pool_s *pool;
374 	ldap_int_thread_task_t *task;
375 
376 	if (tpool == NULL)
377 		return(-1);
378 
379 	pool = *tpool;
380 
381 	if (pool == NULL)
382 		return(-1);
383 
384 	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
385 	LDAP_STAILQ_FOREACH(task, &pool->ltp_pending_list, ltt_next.q)
386 		if (task->ltt_start_routine == start_routine &&
387 			task->ltt_arg == arg) {
388 			/* Could LDAP_STAILQ_REMOVE the task, but that
389 			 * walks ltp_pending_list again to find it.
390 			 */
391 			task->ltt_start_routine = no_task;
392 			task->ltt_arg = NULL;
393 			break;
394 		}
395 	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
396 	return task != NULL;
397 }
398 
399 /* Set max #threads.  value <= 0 means max supported #threads (LDAP_MAXTHR) */
400 int
ldap_pvt_thread_pool_maxthreads(ldap_pvt_thread_pool_t * tpool,int max_threads)401 ldap_pvt_thread_pool_maxthreads(
402 	ldap_pvt_thread_pool_t *tpool,
403 	int max_threads )
404 {
405 	struct ldap_int_thread_pool_s *pool;
406 
407 	if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
408 		max_threads = 0;
409 
410 	if (tpool == NULL)
411 		return(-1);
412 
413 	pool = *tpool;
414 
415 	if (pool == NULL)
416 		return(-1);
417 
418 	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
419 
420 	pool->ltp_max_count = max_threads;
421 	SET_VARY_OPEN_COUNT(pool);
422 
423 	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
424 	return(0);
425 }
426 
427 /* Inspect the pool */
428 int
ldap_pvt_thread_pool_query(ldap_pvt_thread_pool_t * tpool,ldap_pvt_thread_pool_param_t param,void * value)429 ldap_pvt_thread_pool_query(
430 	ldap_pvt_thread_pool_t *tpool,
431 	ldap_pvt_thread_pool_param_t param,
432 	void *value )
433 {
434 	struct ldap_int_thread_pool_s	*pool;
435 	int				count = -1;
436 
437 	if ( tpool == NULL || value == NULL ) {
438 		return -1;
439 	}
440 
441 	pool = *tpool;
442 
443 	if ( pool == NULL ) {
444 		return 0;
445 	}
446 
447 	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
448 	switch ( param ) {
449 	case LDAP_PVT_THREAD_POOL_PARAM_MAX:
450 		count = pool->ltp_max_count;
451 		break;
452 
453 	case LDAP_PVT_THREAD_POOL_PARAM_MAX_PENDING:
454 		count = pool->ltp_max_pending;
455 		if (count < 0)
456 			count = -count;
457 		if (count == MAX_PENDING)
458 			count = 0;
459 		break;
460 
461 	case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
462 		count = pool->ltp_open_count;
463 		if (count < 0)
464 			count = -count;
465 		break;
466 
467 	case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
468 		count = pool->ltp_starting;
469 		break;
470 
471 	case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
472 		count = pool->ltp_active_count;
473 		break;
474 
475 	case LDAP_PVT_THREAD_POOL_PARAM_PAUSING:
476 		count = (pool->ltp_pause != 0);
477 		break;
478 
479 	case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
480 		count = pool->ltp_pending_count;
481 		break;
482 
483 	case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
484 		count = pool->ltp_pending_count + pool->ltp_active_count;
485 		break;
486 
487 	case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE_MAX:
488 		break;
489 
490 	case LDAP_PVT_THREAD_POOL_PARAM_PENDING_MAX:
491 		break;
492 
493 	case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD_MAX:
494 		break;
495 
496 	case LDAP_PVT_THREAD_POOL_PARAM_STATE:
497 		*((char **)value) =
498 			pool->ltp_pause ? "pausing" :
499 			!pool->ltp_finishing ? "running" :
500 			pool->ltp_pending_count ? "finishing" : "stopping";
501 		break;
502 
503 	case LDAP_PVT_THREAD_POOL_PARAM_UNKNOWN:
504 		break;
505 	}
506 	ldap_pvt_thread_mutex_unlock( &pool->ltp_mutex );
507 
508 	if ( count > -1 ) {
509 		*((int *)value) = count;
510 	}
511 
512 	return ( count == -1 ? -1 : 0 );
513 }
514 
515 /*
516  * true if pool is pausing; does not lock any mutex to check.
517  * 0 if not pause, 1 if pause, -1 if error or no pool.
518  */
519 int
ldap_pvt_thread_pool_pausing(ldap_pvt_thread_pool_t * tpool)520 ldap_pvt_thread_pool_pausing( ldap_pvt_thread_pool_t *tpool )
521 {
522 	int rc = -1;
523 	struct ldap_int_thread_pool_s *pool;
524 
525 	if ( tpool != NULL && (pool = *tpool) != NULL ) {
526 		rc = (pool->ltp_pause != 0);
527 	}
528 
529 	return rc;
530 }
531 
532 /*
533  * wrapper for ldap_pvt_thread_pool_query(), left around
534  * for backwards compatibility
535  */
536 int
ldap_pvt_thread_pool_backload(ldap_pvt_thread_pool_t * tpool)537 ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
538 {
539 	int	rc, count;
540 
541 	rc = ldap_pvt_thread_pool_query( tpool,
542 		LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD, (void *)&count );
543 
544 	if ( rc == 0 ) {
545 		return count;
546 	}
547 
548 	return rc;
549 }
550 
551 /* Destroy the pool after making its threads finish */
552 int
ldap_pvt_thread_pool_destroy(ldap_pvt_thread_pool_t * tpool,int run_pending)553 ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
554 {
555 	struct ldap_int_thread_pool_s *pool, *pptr;
556 	ldap_int_thread_task_t *task;
557 
558 	if (tpool == NULL)
559 		return(-1);
560 
561 	pool = *tpool;
562 
563 	if (pool == NULL) return(-1);
564 
565 	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
566 	LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
567 		if (pptr == pool) break;
568 	if (pptr == pool)
569 		LDAP_STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
570 			ldap_int_thread_pool_s, ltp_next);
571 	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
572 
573 	if (pool != pptr) return(-1);
574 
575 	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
576 
577 	pool->ltp_finishing = 1;
578 	SET_VARY_OPEN_COUNT(pool);
579 	if (pool->ltp_max_pending > 0)
580 		pool->ltp_max_pending = -pool->ltp_max_pending;
581 
582 	if (!run_pending) {
583 		while ((task = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL) {
584 			LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltt_next.q);
585 			LDAP_FREE(task);
586 		}
587 		pool->ltp_pending_count = 0;
588 	}
589 
590 	while (pool->ltp_open_count) {
591 		if (!pool->ltp_pause)
592 			ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
593 		ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
594 	}
595 
596 	while ((task = LDAP_SLIST_FIRST(&pool->ltp_free_list)) != NULL)
597 	{
598 		LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltt_next.l);
599 		LDAP_FREE(task);
600 	}
601 
602 	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
603 	ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
604 	ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
605 	ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
606 	LDAP_FREE(pool);
607 	*tpool = NULL;
608 	ldap_int_has_thread_pool = 0;
609 	return(0);
610 }
611 
612 /* Thread loop.  Accept and handle submitted tasks. */
613 static void *
ldap_int_thread_pool_wrapper(void * xpool)614 ldap_int_thread_pool_wrapper (
615 	void *xpool )
616 {
617 	struct ldap_int_thread_pool_s *pool = xpool;
618 	ldap_int_thread_task_t *task;
619 	ldap_int_tpool_plist_t *work_list;
620 	ldap_int_thread_userctx_t ctx, *kctx;
621 	unsigned i, keyslot, hash;
622 
623 	assert(pool != NULL);
624 
625 	for ( i=0; i<MAXKEYS; i++ ) {
626 		ctx.ltu_key[i].ltk_key = NULL;
627 	}
628 
629 	ctx.ltu_id = ldap_pvt_thread_self();
630 	TID_HASH(ctx.ltu_id, hash);
631 
632 	ldap_pvt_thread_key_setdata( ldap_tpool_key, &ctx );
633 
634 	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
635 
636 	/* thread_keys[] is read-only when paused */
637 	while (pool->ltp_pause)
638 		ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
639 
640 	/* find a key slot to give this thread ID and store a
641 	 * pointer to our keys there; start at the thread ID
642 	 * itself (mod LDAP_MAXTHR) and look for an empty slot.
643 	 */
644 	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
645 	for (keyslot = hash & (LDAP_MAXTHR-1);
646 		(kctx = thread_keys[keyslot].ctx) && kctx != DELETED_THREAD_CTX;
647 		keyslot = (keyslot+1) & (LDAP_MAXTHR-1));
648 	thread_keys[keyslot].ctx = &ctx;
649 	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
650 
651 	pool->ltp_starting--;
652 	pool->ltp_active_count++;
653 
654 	for (;;) {
655 		work_list = pool->ltp_work_list; /* help the compiler a bit */
656 		task = LDAP_STAILQ_FIRST(work_list);
657 		if (task == NULL) {	/* paused or no pending tasks */
658 			if (--(pool->ltp_active_count) < 2) {
659 				/* Notify pool_pause it is the sole active thread. */
660 				ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
661 			}
662 
663 			do {
664 				if (pool->ltp_vary_open_count < 0) {
665 					/* Not paused, and either finishing or too many
666 					 * threads running (can happen if ltp_max_count
667 					 * was reduced).  Let this thread die.
668 					 */
669 					goto done;
670 				}
671 
672 				/* We could check an idle timer here, and let the
673 				 * thread die if it has been inactive for a while.
674 				 * Only die if there are other open threads (i.e.,
675 				 * always have at least one thread open).
676 				 * The check should be like this:
677 				 *   if (pool->ltp_open_count>1 && pool->ltp_starting==0)
678 				 *       check timer, wait if ltp_pause, leave thread;
679 				 *
680 				 * Just use pthread_cond_timedwait() if we want to
681 				 * check idle time.
682 				 */
683 				ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
684 
685 				work_list = pool->ltp_work_list;
686 				task = LDAP_STAILQ_FIRST(work_list);
687 			} while (task == NULL);
688 
689 			pool->ltp_active_count++;
690 		}
691 
692 		LDAP_STAILQ_REMOVE_HEAD(work_list, ltt_next.q);
693 		pool->ltp_pending_count--;
694 		ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
695 
696 		task->ltt_start_routine(&ctx, task->ltt_arg);
697 
698 		ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
699 		LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, task, ltt_next.l);
700 	}
701  done:
702 
703 	assert(!pool->ltp_pause); /* thread_keys writable, ltp_open_count >= 0 */
704 
705 	/* The ltp_mutex lock protects ctx->ltu_key from pool_purgekey()
706 	 * during this call, since it prevents new pauses. */
707 	ldap_pvt_thread_pool_context_reset(&ctx);
708 
709 	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
710 	thread_keys[keyslot].ctx = DELETED_THREAD_CTX;
711 	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
712 
713 	pool->ltp_open_count--;
714 	SET_VARY_OPEN_COUNT(pool);
715 	/* let pool_destroy know we're all done */
716 	if (pool->ltp_open_count == 0)
717 		ldap_pvt_thread_cond_signal(&pool->ltp_cond);
718 
719 	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
720 
721 	ldap_pvt_thread_exit(NULL);
722 	return(NULL);
723 }
724 
/* Request bits for handle_pause().  Callers pass PAUSE_ARG(request),
 * never the bare bit.  Every PAUSE_ARG() value is larger than any
 * ltp_pause value, and handle_pause() subtracts ltp_pause from it:
 *  - for GO_IDLE/GO_UNIDLE the low bits (GO_IDLE-1) absorb the
 *    subtraction, so the requested bit survives unchanged;
 *  - for CHECK_PAUSE/DO_PAUSE the CHECK_PAUSE bit is set instead, so
 *    subtracting a nonzero ltp_pause borrows from it and turns on both
 *    GO_IDLE and GO_UNIDLE, while DO_PAUSE itself is kept.
 */
#define GO_IDLE		0x08
#define GO_UNIDLE	0x10
#define CHECK_PAUSE	0x20	/* if ltp_pause: GO_IDLE; wait; GO_UNIDLE */
#define DO_PAUSE	0x40	/* CHECK_PAUSE; pause the pool */
#define PAUSE_ARG(a) \
	((a) | ((((a) & (GO_IDLE|GO_UNIDLE)) != 0) ? (GO_IDLE-1) : CHECK_PAUSE))
734 
735 static int
handle_pause(ldap_pvt_thread_pool_t * tpool,int pause_type)736 handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
737 {
738 	struct ldap_int_thread_pool_s *pool;
739 	int ret = 0, pause, max_ltp_pause;
740 
741 	if (tpool == NULL)
742 		return(-1);
743 
744 	pool = *tpool;
745 
746 	if (pool == NULL)
747 		return(0);
748 
749 	if (pause_type == CHECK_PAUSE && !pool->ltp_pause)
750 		return(0);
751 
752 	/* Let pool_unidle() ignore requests for new pauses */
753 	max_ltp_pause = pause_type==PAUSE_ARG(GO_UNIDLE) ? WANT_PAUSE : NOT_PAUSED;
754 
755 	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
756 
757 	pause = pool->ltp_pause;	/* NOT_PAUSED, WANT_PAUSE or PAUSED */
758 
759 	/* If ltp_pause and not GO_IDLE|GO_UNIDLE: Set GO_IDLE,GO_UNIDLE */
760 	pause_type -= pause;
761 
762 	if (pause_type & GO_IDLE) {
763 		pool->ltp_pending_count++;
764 		pool->ltp_active_count--;
765 		if (pause && pool->ltp_active_count < 2) {
766 			/* Tell the task waiting to DO_PAUSE it can proceed */
767 			ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
768 		}
769 	}
770 
771 	if (pause_type & GO_UNIDLE) {
772 		/* Wait out pause if any, then cancel GO_IDLE */
773 		if (pause > max_ltp_pause) {
774 			ret = 1;
775 			do {
776 				ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
777 			} while (pool->ltp_pause > max_ltp_pause);
778 		}
779 		pool->ltp_pending_count--;
780 		pool->ltp_active_count++;
781 	}
782 
783 	if (pause_type & DO_PAUSE) {
784 		/* Tell everyone else to pause or finish, then await that */
785 		ret = 0;
786 		assert(!pool->ltp_pause);
787 		pool->ltp_pause = WANT_PAUSE;
788 		/* Let ldap_pvt_thread_pool_submit() through to its ltp_pause test,
789 		 * and do not finish threads in ldap_pvt_thread_pool_wrapper() */
790 		pool->ltp_open_count = -pool->ltp_open_count;
791 		SET_VARY_OPEN_COUNT(pool);
792 		/* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */
793 		pool->ltp_work_list = &empty_pending_list;
794 		/* Wait for this task to become the sole active task */
795 		while (pool->ltp_active_count > 1) {
796 			ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex);
797 		}
798 		assert(pool->ltp_pause == WANT_PAUSE);
799 		pool->ltp_pause = PAUSED;
800 	}
801 
802 	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
803 	return(ret);
804 }
805 
806 /* Consider this task idle: It will not block pool_pause() in other tasks. */
807 void
ldap_pvt_thread_pool_idle(ldap_pvt_thread_pool_t * tpool)808 ldap_pvt_thread_pool_idle( ldap_pvt_thread_pool_t *tpool )
809 {
810 	handle_pause(tpool, PAUSE_ARG(GO_IDLE));
811 }
812 
813 /* Cancel pool_idle(). If the pool is paused, wait it out first. */
814 void
ldap_pvt_thread_pool_unidle(ldap_pvt_thread_pool_t * tpool)815 ldap_pvt_thread_pool_unidle( ldap_pvt_thread_pool_t *tpool )
816 {
817 	handle_pause(tpool, PAUSE_ARG(GO_UNIDLE));
818 }
819 
820 /*
821  * If a pause was requested, wait for it.  If several threads
822  * are waiting to pause, let through one or more pauses.
823  * The calling task must be active, not idle.
824  * Return 1 if we waited, 0 if not, -1 at parameter error.
825  */
826 int
ldap_pvt_thread_pool_pausecheck(ldap_pvt_thread_pool_t * tpool)827 ldap_pvt_thread_pool_pausecheck( ldap_pvt_thread_pool_t *tpool )
828 {
829 	return handle_pause(tpool, PAUSE_ARG(CHECK_PAUSE));
830 }
831 
832 /*
833  * Pause the pool.  The calling task must be active, not idle.
834  * Return when all other tasks are paused or idle.
835  */
836 int
ldap_pvt_thread_pool_pause(ldap_pvt_thread_pool_t * tpool)837 ldap_pvt_thread_pool_pause( ldap_pvt_thread_pool_t *tpool )
838 {
839 	return handle_pause(tpool, PAUSE_ARG(DO_PAUSE));
840 }
841 
842 /* End a pause */
843 int
ldap_pvt_thread_pool_resume(ldap_pvt_thread_pool_t * tpool)844 ldap_pvt_thread_pool_resume (
845 	ldap_pvt_thread_pool_t *tpool )
846 {
847 	struct ldap_int_thread_pool_s *pool;
848 
849 	if (tpool == NULL)
850 		return(-1);
851 
852 	pool = *tpool;
853 
854 	if (pool == NULL)
855 		return(0);
856 
857 	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
858 
859 	assert(pool->ltp_pause == PAUSED);
860 	pool->ltp_pause = 0;
861 	if (pool->ltp_open_count <= 0) /* true when paused, but be paranoid */
862 		pool->ltp_open_count = -pool->ltp_open_count;
863 	SET_VARY_OPEN_COUNT(pool);
864 	pool->ltp_work_list = &pool->ltp_pending_list;
865 
866 	ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
867 
868 	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
869 	return(0);
870 }
871 
872 /*
873  * Get the key's data and optionally free function in the given context.
874  */
ldap_pvt_thread_pool_getkey(void * xctx,void * key,void ** data,ldap_pvt_thread_pool_keyfree_t ** kfree)875 int ldap_pvt_thread_pool_getkey(
876 	void *xctx,
877 	void *key,
878 	void **data,
879 	ldap_pvt_thread_pool_keyfree_t **kfree )
880 {
881 	ldap_int_thread_userctx_t *ctx = xctx;
882 	int i;
883 
884 	if ( !ctx || !key || !data ) return EINVAL;
885 
886 	for ( i=0; i<MAXKEYS && ctx->ltu_key[i].ltk_key; i++ ) {
887 		if ( ctx->ltu_key[i].ltk_key == key ) {
888 			*data = ctx->ltu_key[i].ltk_data;
889 			if ( kfree ) *kfree = ctx->ltu_key[i].ltk_free;
890 			return 0;
891 		}
892 	}
893 	return ENOENT;
894 }
895 
896 static void
clear_key_idx(ldap_int_thread_userctx_t * ctx,int i)897 clear_key_idx( ldap_int_thread_userctx_t *ctx, int i )
898 {
899 	for ( ; i < MAXKEYS-1 && ctx->ltu_key[i+1].ltk_key; i++ )
900 		ctx->ltu_key[i] = ctx->ltu_key[i+1];
901 	ctx->ltu_key[i].ltk_key = NULL;
902 }
903 
904 /*
905  * Set or remove data for the key in the given context.
906  * key can be any unique pointer.
907  * kfree() is an optional function to free the data (but not the key):
908  *   pool_context_reset() and pool_purgekey() call kfree(key, data),
909  *   but pool_setkey() does not.  For pool_setkey() it is the caller's
910  *   responsibility to free any existing data with the same key.
911  *   kfree() must not call functions taking a tpool argument.
912  */
ldap_pvt_thread_pool_setkey(void * xctx,void * key,void * data,ldap_pvt_thread_pool_keyfree_t * kfree,void ** olddatap,ldap_pvt_thread_pool_keyfree_t ** oldkfreep)913 int ldap_pvt_thread_pool_setkey(
914 	void *xctx,
915 	void *key,
916 	void *data,
917 	ldap_pvt_thread_pool_keyfree_t *kfree,
918 	void **olddatap,
919 	ldap_pvt_thread_pool_keyfree_t **oldkfreep )
920 {
921 	ldap_int_thread_userctx_t *ctx = xctx;
922 	int i, found;
923 
924 	if ( !ctx || !key ) return EINVAL;
925 
926 	for ( i=found=0; i<MAXKEYS; i++ ) {
927 		if ( ctx->ltu_key[i].ltk_key == key ) {
928 			found = 1;
929 			break;
930 		} else if ( !ctx->ltu_key[i].ltk_key ) {
931 			break;
932 		}
933 	}
934 
935 	if ( olddatap ) {
936 		if ( found ) {
937 			*olddatap = ctx->ltu_key[i].ltk_data;
938 		} else {
939 			*olddatap = NULL;
940 		}
941 	}
942 
943 	if ( oldkfreep ) {
944 		if ( found ) {
945 			*oldkfreep = ctx->ltu_key[i].ltk_free;
946 		} else {
947 			*oldkfreep = 0;
948 		}
949 	}
950 
951 	if ( data || kfree ) {
952 		if ( i>=MAXKEYS )
953 			return ENOMEM;
954 		ctx->ltu_key[i].ltk_key = key;
955 		ctx->ltu_key[i].ltk_data = data;
956 		ctx->ltu_key[i].ltk_free = kfree;
957 	} else if ( found ) {
958 		clear_key_idx( ctx, i );
959 	}
960 
961 	return 0;
962 }
963 
964 /* Free all elements with this key, no matter which thread they're in.
965  * May only be called while the pool is paused.
966  */
ldap_pvt_thread_pool_purgekey(void * key)967 void ldap_pvt_thread_pool_purgekey( void *key )
968 {
969 	int i, j;
970 	ldap_int_thread_userctx_t *ctx;
971 
972 	assert ( key != NULL );
973 
974 	for ( i=0; i<LDAP_MAXTHR; i++ ) {
975 		ctx = thread_keys[i].ctx;
976 		if ( ctx && ctx != DELETED_THREAD_CTX ) {
977 			for ( j=0; j<MAXKEYS && ctx->ltu_key[j].ltk_key; j++ ) {
978 				if ( ctx->ltu_key[j].ltk_key == key ) {
979 					if (ctx->ltu_key[j].ltk_free)
980 						ctx->ltu_key[j].ltk_free( ctx->ltu_key[j].ltk_key,
981 						ctx->ltu_key[j].ltk_data );
982 					clear_key_idx( ctx, j );
983 					break;
984 				}
985 			}
986 		}
987 	}
988 }
989 
990 /*
991  * Find the context of the current thread.
992  * This is necessary if the caller does not have access to the
993  * thread context handle (for example, a slapd plugin calling
994  * slapi_search_internal()). No doubt it is more efficient
995  * for the application to keep track of the thread context
996  * handles itself.
997  */
ldap_pvt_thread_pool_context()998 void *ldap_pvt_thread_pool_context( )
999 {
1000 	void *ctx = NULL;
1001 
1002 	ldap_pvt_thread_key_getdata( ldap_tpool_key, &ctx );
1003 	return ctx ? ctx : (void *) &ldap_int_main_thrctx;
1004 }
1005 
1006 /*
1007  * Free the context's keys.
1008  * Must not call functions taking a tpool argument (because this
1009  * thread already holds ltp_mutex when called from pool_wrapper()).
1010  */
ldap_pvt_thread_pool_context_reset(void * vctx)1011 void ldap_pvt_thread_pool_context_reset( void *vctx )
1012 {
1013 	ldap_int_thread_userctx_t *ctx = vctx;
1014 	int i;
1015 
1016 	for ( i=MAXKEYS-1; i>=0; i--) {
1017 		if ( !ctx->ltu_key[i].ltk_key )
1018 			continue;
1019 		if ( ctx->ltu_key[i].ltk_free )
1020 			ctx->ltu_key[i].ltk_free( ctx->ltu_key[i].ltk_key,
1021 			ctx->ltu_key[i].ltk_data );
1022 		ctx->ltu_key[i].ltk_key = NULL;
1023 	}
1024 }
1025 
ldap_pvt_thread_pool_tid(void * vctx)1026 ldap_pvt_thread_t ldap_pvt_thread_pool_tid( void *vctx )
1027 {
1028 	ldap_int_thread_userctx_t *ctx = vctx;
1029 
1030 	return ctx->ltu_id;
1031 }
1032 #endif /* LDAP_THREAD_HAVE_TPOOL */
1033