1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed
4 * with this work for additional information regarding copyright
5 * ownership. The ASF licenses this file to you under the Apache
6 * License, Version 2.0 (the "License"); you may not use this file
7 * except in compliance with the License. You may obtain a copy of
8 * the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 * implied. See the License for the specific language governing
16 * permissions and limitations under the License.
17 */
18
19 #include <assert.h>
20 #include "apr_thread_pool.h"
21 #include "apr_ring.h"
22 #include "apr_thread_cond.h"
23 #include "apr_portable.h"
24
25 #if APR_HAS_THREADS
26
/* Number of priority bands tracked by the per-pool task index. */
#define TASK_PRIORITY_SEGS 4
/*
 * Map a task's 8-bit priority onto its band index: 64 consecutive priority
 * values share one band, so the result lies in 0 .. TASK_PRIORITY_SEGS - 1.
 */
#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) >> 6)
29
30 typedef struct apr_thread_pool_task
31 {
32 APR_RING_ENTRY(apr_thread_pool_task) link;
33 apr_thread_start_t func;
34 void *param;
35 void *owner;
36 union
37 {
38 apr_byte_t priority;
39 apr_time_t time;
40 } dispatch;
41 } apr_thread_pool_task_t;
42
43 APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task);
44
45 struct apr_thread_list_elt
46 {
47 APR_RING_ENTRY(apr_thread_list_elt) link;
48 apr_thread_t *thd;
49 volatile void *current_owner;
50 volatile enum { TH_RUN, TH_STOP, TH_PROBATION } state;
51 };
52
53 APR_RING_HEAD(apr_thread_list, apr_thread_list_elt);
54
55 struct apr_thread_pool
56 {
57 apr_pool_t *pool;
58 volatile apr_size_t thd_max;
59 volatile apr_size_t idle_max;
60 volatile apr_interval_time_t idle_wait;
61 volatile apr_size_t thd_cnt;
62 volatile apr_size_t idle_cnt;
63 volatile apr_size_t task_cnt;
64 volatile apr_size_t scheduled_task_cnt;
65 volatile apr_size_t threshold;
66 volatile apr_size_t tasks_run;
67 volatile apr_size_t tasks_high;
68 volatile apr_size_t thd_high;
69 volatile apr_size_t thd_timed_out;
70 struct apr_thread_pool_tasks *tasks;
71 struct apr_thread_pool_tasks *scheduled_tasks;
72 struct apr_thread_list *busy_thds;
73 struct apr_thread_list *idle_thds;
74 apr_thread_mutex_t *lock;
75 apr_thread_cond_t *cond;
76 volatile int terminated;
77 struct apr_thread_pool_tasks *recycled_tasks;
78 struct apr_thread_list *recycled_thds;
79 apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
80 };
81
thread_pool_construct(apr_thread_pool_t * me,apr_size_t init_threads,apr_size_t max_threads)82 static apr_status_t thread_pool_construct(apr_thread_pool_t * me,
83 apr_size_t init_threads,
84 apr_size_t max_threads)
85 {
86 apr_status_t rv;
87 int i;
88
89 me->thd_max = max_threads;
90 me->idle_max = init_threads;
91 me->threshold = init_threads / 2;
92 rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED,
93 me->pool);
94 if (APR_SUCCESS != rv) {
95 return rv;
96 }
97 rv = apr_thread_cond_create(&me->cond, me->pool);
98 if (APR_SUCCESS != rv) {
99 apr_thread_mutex_destroy(me->lock);
100 return rv;
101 }
102 me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
103 if (!me->tasks) {
104 goto CATCH_ENOMEM;
105 }
106 APR_RING_INIT(me->tasks, apr_thread_pool_task, link);
107 me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks));
108 if (!me->scheduled_tasks) {
109 goto CATCH_ENOMEM;
110 }
111 APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link);
112 me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));
113 if (!me->recycled_tasks) {
114 goto CATCH_ENOMEM;
115 }
116 APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link);
117 me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds));
118 if (!me->busy_thds) {
119 goto CATCH_ENOMEM;
120 }
121 APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link);
122 me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds));
123 if (!me->idle_thds) {
124 goto CATCH_ENOMEM;
125 }
126 APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link);
127 me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds));
128 if (!me->recycled_thds) {
129 goto CATCH_ENOMEM;
130 }
131 APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link);
132 me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0;
133 me->tasks_run = me->tasks_high = me->thd_high = me->thd_timed_out = 0;
134 me->idle_wait = 0;
135 me->terminated = 0;
136 for (i = 0; i < TASK_PRIORITY_SEGS; i++) {
137 me->task_idx[i] = NULL;
138 }
139 goto FINAL_EXIT;
140 CATCH_ENOMEM:
141 rv = APR_ENOMEM;
142 apr_thread_mutex_destroy(me->lock);
143 apr_thread_cond_destroy(me->cond);
144 FINAL_EXIT:
145 return rv;
146 }
147
148 /*
149 * NOTE: This function is not thread safe by itself. Caller should hold the lock
150 */
pop_task(apr_thread_pool_t * me)151 static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me)
152 {
153 apr_thread_pool_task_t *task = NULL;
154 int seg;
155
156 /* check for scheduled tasks */
157 if (me->scheduled_task_cnt > 0) {
158 task = APR_RING_FIRST(me->scheduled_tasks);
159 assert(task != NULL);
160 assert(task !=
161 APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
162 link));
163 /* if it's time */
164 if (task->dispatch.time <= apr_time_now()) {
165 --me->scheduled_task_cnt;
166 APR_RING_REMOVE(task, link);
167 return task;
168 }
169 }
170 /* check for normal tasks if we're not returning a scheduled task */
171 if (me->task_cnt == 0) {
172 return NULL;
173 }
174
175 task = APR_RING_FIRST(me->tasks);
176 assert(task != NULL);
177 assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link));
178 --me->task_cnt;
179 seg = TASK_PRIORITY_SEG(task);
180 if (task == me->task_idx[seg]) {
181 me->task_idx[seg] = APR_RING_NEXT(task, link);
182 if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
183 apr_thread_pool_task, link)
184 || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
185 me->task_idx[seg] = NULL;
186 }
187 }
188 APR_RING_REMOVE(task, link);
189 return task;
190 }
191
waiting_time(apr_thread_pool_t * me)192 static apr_interval_time_t waiting_time(apr_thread_pool_t * me)
193 {
194 apr_thread_pool_task_t *task = NULL;
195
196 task = APR_RING_FIRST(me->scheduled_tasks);
197 assert(task != NULL);
198 assert(task !=
199 APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
200 link));
201 return task->dispatch.time - apr_time_now();
202 }
203
204 /*
205 * NOTE: This function is not thread safe by itself. Caller should hold the lock
206 */
elt_new(apr_thread_pool_t * me,apr_thread_t * t)207 static struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me,
208 apr_thread_t * t)
209 {
210 struct apr_thread_list_elt *elt;
211
212 if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) {
213 elt = apr_pcalloc(me->pool, sizeof(*elt));
214 if (NULL == elt) {
215 return NULL;
216 }
217 }
218 else {
219 elt = APR_RING_FIRST(me->recycled_thds);
220 APR_RING_REMOVE(elt, link);
221 }
222
223 APR_RING_ELEM_INIT(elt, link);
224 elt->thd = t;
225 elt->current_owner = NULL;
226 elt->state = TH_RUN;
227 return elt;
228 }
229
230 /*
231 * The worker thread function. Take a task from the queue and perform it if
232 * there is any. Otherwise, put itself into the idle thread list and waiting
233 * for signal to wake up.
234 * The thread terminate directly by detach and exit when it is asked to stop
235 * after finishing a task. Otherwise, the thread should be in idle thread list
236 * and should be joined.
237 */
thread_pool_func(apr_thread_t * t,void * param)238 static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
239 {
240 apr_thread_pool_t *me = param;
241 apr_thread_pool_task_t *task = NULL;
242 apr_interval_time_t wait;
243 struct apr_thread_list_elt *elt;
244
245 apr_thread_mutex_lock(me->lock);
246 elt = elt_new(me, t);
247 if (!elt) {
248 apr_thread_mutex_unlock(me->lock);
249 apr_thread_exit(t, APR_ENOMEM);
250 }
251
252 while (!me->terminated && elt->state != TH_STOP) {
253 /* Test if not new element, it is awakened from idle */
254 if (APR_RING_NEXT(elt, link) != elt) {
255 --me->idle_cnt;
256 APR_RING_REMOVE(elt, link);
257 }
258
259 APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link);
260 task = pop_task(me);
261 while (NULL != task && !me->terminated) {
262 ++me->tasks_run;
263 elt->current_owner = task->owner;
264 apr_thread_mutex_unlock(me->lock);
265 apr_thread_data_set(task, "apr_thread_pool_task", NULL, t);
266 task->func(t, task->param);
267 apr_thread_mutex_lock(me->lock);
268 APR_RING_INSERT_TAIL(me->recycled_tasks, task,
269 apr_thread_pool_task, link);
270 elt->current_owner = NULL;
271 if (TH_STOP == elt->state) {
272 break;
273 }
274 task = pop_task(me);
275 }
276 assert(NULL == elt->current_owner);
277 if (TH_STOP != elt->state)
278 APR_RING_REMOVE(elt, link);
279
280 /* Test if a busy thread been asked to stop, which is not joinable */
281 if ((me->idle_cnt >= me->idle_max
282 && !(me->scheduled_task_cnt && 0 >= me->idle_max)
283 && !me->idle_wait)
284 || me->terminated || elt->state != TH_RUN) {
285 --me->thd_cnt;
286 if ((TH_PROBATION == elt->state) && me->idle_wait)
287 ++me->thd_timed_out;
288 APR_RING_INSERT_TAIL(me->recycled_thds, elt,
289 apr_thread_list_elt, link);
290 apr_thread_mutex_unlock(me->lock);
291 apr_thread_detach(t);
292 apr_thread_exit(t, APR_SUCCESS);
293 return NULL; /* should not be here, safe net */
294 }
295
296 /* busy thread become idle */
297 ++me->idle_cnt;
298 APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link);
299
300 /*
301 * If there is a scheduled task, always scheduled to perform that task.
302 * Since there is no guarantee that current idle threads are scheduled
303 * for next scheduled task.
304 */
305 if (me->scheduled_task_cnt)
306 wait = waiting_time(me);
307 else if (me->idle_cnt > me->idle_max) {
308 wait = me->idle_wait;
309 elt->state = TH_PROBATION;
310 }
311 else
312 wait = -1;
313
314 if (wait >= 0) {
315 apr_thread_cond_timedwait(me->cond, me->lock, wait);
316 }
317 else {
318 apr_thread_cond_wait(me->cond, me->lock);
319 }
320 }
321
322 /* idle thread been asked to stop, will be joined */
323 --me->thd_cnt;
324 apr_thread_mutex_unlock(me->lock);
325 apr_thread_exit(t, APR_SUCCESS);
326 return NULL; /* should not be here, safe net */
327 }
328
thread_pool_cleanup(void * me)329 static apr_status_t thread_pool_cleanup(void *me)
330 {
331 apr_thread_pool_t *_myself = me;
332
333 _myself->terminated = 1;
334 apr_thread_pool_idle_max_set(_myself, 0);
335 while (_myself->thd_cnt) {
336 apr_sleep(20 * 1000); /* spin lock with 20 ms */
337 }
338 apr_thread_mutex_destroy(_myself->lock);
339 apr_thread_cond_destroy(_myself->cond);
340 return APR_SUCCESS;
341 }
342
apr_thread_pool_create(apr_thread_pool_t ** me,apr_size_t init_threads,apr_size_t max_threads,apr_pool_t * pool)343 APU_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
344 apr_size_t init_threads,
345 apr_size_t max_threads,
346 apr_pool_t * pool)
347 {
348 apr_thread_t *t;
349 apr_status_t rv = APR_SUCCESS;
350 apr_thread_pool_t *tp;
351
352 *me = NULL;
353 tp = apr_pcalloc(pool, sizeof(apr_thread_pool_t));
354
355 /*
356 * This pool will be used by different threads. As we cannot ensure that
357 * our caller won't use the pool without acquiring the mutex, we must
358 * create a new sub pool.
359 */
360 rv = apr_pool_create(&tp->pool, pool);
361 if (APR_SUCCESS != rv)
362 return rv;
363 rv = thread_pool_construct(tp, init_threads, max_threads);
364 if (APR_SUCCESS != rv)
365 return rv;
366 apr_pool_pre_cleanup_register(tp->pool, tp, thread_pool_cleanup);
367
368 while (init_threads) {
369 /* Grab the mutex as apr_thread_create() and thread_pool_func() will
370 * allocate from (*me)->pool. This is dangerous if there are multiple
371 * initial threads to create.
372 */
373 apr_thread_mutex_lock(tp->lock);
374 rv = apr_thread_create(&t, NULL, thread_pool_func, tp, tp->pool);
375 apr_thread_mutex_unlock(tp->lock);
376 if (APR_SUCCESS != rv) {
377 break;
378 }
379 tp->thd_cnt++;
380 if (tp->thd_cnt > tp->thd_high) {
381 tp->thd_high = tp->thd_cnt;
382 }
383 --init_threads;
384 }
385
386 if (rv == APR_SUCCESS) {
387 *me = tp;
388 }
389
390 return rv;
391 }
392
apr_thread_pool_destroy(apr_thread_pool_t * me)393 APU_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me)
394 {
395 apr_pool_destroy(me->pool);
396 return APR_SUCCESS;
397 }
398
399 /*
400 * NOTE: This function is not thread safe by itself. Caller should hold the lock
401 */
task_new(apr_thread_pool_t * me,apr_thread_start_t func,void * param,apr_byte_t priority,void * owner,apr_time_t time)402 static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me,
403 apr_thread_start_t func,
404 void *param, apr_byte_t priority,
405 void *owner, apr_time_t time)
406 {
407 apr_thread_pool_task_t *t;
408
409 if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) {
410 t = apr_pcalloc(me->pool, sizeof(*t));
411 if (NULL == t) {
412 return NULL;
413 }
414 }
415 else {
416 t = APR_RING_FIRST(me->recycled_tasks);
417 APR_RING_REMOVE(t, link);
418 }
419
420 APR_RING_ELEM_INIT(t, link);
421 t->func = func;
422 t->param = param;
423 t->owner = owner;
424 if (time > 0) {
425 t->dispatch.time = apr_time_now() + time;
426 }
427 else {
428 t->dispatch.priority = priority;
429 }
430 return t;
431 }
432
433 /*
434 * Test it the task is the only one within the priority segment.
435 * If it is not, return the first element with same or lower priority.
436 * Otherwise, add the task into the queue and return NULL.
437 *
438 * NOTE: This function is not thread safe by itself. Caller should hold the lock
439 */
add_if_empty(apr_thread_pool_t * me,apr_thread_pool_task_t * const t)440 static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me,
441 apr_thread_pool_task_t * const t)
442 {
443 int seg;
444 int next;
445 apr_thread_pool_task_t *t_next;
446
447 seg = TASK_PRIORITY_SEG(t);
448 if (me->task_idx[seg]) {
449 assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
450 me->task_idx[seg]);
451 t_next = me->task_idx[seg];
452 while (t_next->dispatch.priority > t->dispatch.priority) {
453 t_next = APR_RING_NEXT(t_next, link);
454 if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) ==
455 t_next) {
456 return t_next;
457 }
458 }
459 return t_next;
460 }
461
462 for (next = seg - 1; next >= 0; next--) {
463 if (me->task_idx[next]) {
464 APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);
465 break;
466 }
467 }
468 if (0 > next) {
469 APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link);
470 }
471 me->task_idx[seg] = t;
472 return NULL;
473 }
474
475 /*
476 * schedule a task to run in "time" microseconds. Find the spot in the ring where
477 * the time fits. Adjust the short_time so the thread wakes up when the time is reached.
478 */
schedule_task(apr_thread_pool_t * me,apr_thread_start_t func,void * param,void * owner,apr_interval_time_t time)479 static apr_status_t schedule_task(apr_thread_pool_t *me,
480 apr_thread_start_t func, void *param,
481 void *owner, apr_interval_time_t time)
482 {
483 apr_thread_pool_task_t *t;
484 apr_thread_pool_task_t *t_loc;
485 apr_thread_t *thd;
486 apr_status_t rv = APR_SUCCESS;
487 apr_thread_mutex_lock(me->lock);
488
489 t = task_new(me, func, param, 0, owner, time);
490 if (NULL == t) {
491 apr_thread_mutex_unlock(me->lock);
492 return APR_ENOMEM;
493 }
494 t_loc = APR_RING_FIRST(me->scheduled_tasks);
495 while (NULL != t_loc) {
496 /* if the time is less than the entry insert ahead of it */
497 if (t->dispatch.time < t_loc->dispatch.time) {
498 ++me->scheduled_task_cnt;
499 APR_RING_INSERT_BEFORE(t_loc, t, link);
500 break;
501 }
502 else {
503 t_loc = APR_RING_NEXT(t_loc, link);
504 if (t_loc ==
505 APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
506 link)) {
507 ++me->scheduled_task_cnt;
508 APR_RING_INSERT_TAIL(me->scheduled_tasks, t,
509 apr_thread_pool_task, link);
510 break;
511 }
512 }
513 }
514 /* there should be at least one thread for scheduled tasks */
515 if (0 == me->thd_cnt) {
516 rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
517 if (APR_SUCCESS == rv) {
518 ++me->thd_cnt;
519 if (me->thd_cnt > me->thd_high)
520 me->thd_high = me->thd_cnt;
521 }
522 }
523 apr_thread_cond_signal(me->cond);
524 apr_thread_mutex_unlock(me->lock);
525 return rv;
526 }
527
add_task(apr_thread_pool_t * me,apr_thread_start_t func,void * param,apr_byte_t priority,int push,void * owner)528 static apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func,
529 void *param, apr_byte_t priority, int push,
530 void *owner)
531 {
532 apr_thread_pool_task_t *t;
533 apr_thread_pool_task_t *t_loc;
534 apr_thread_t *thd;
535 apr_status_t rv = APR_SUCCESS;
536
537 apr_thread_mutex_lock(me->lock);
538
539 t = task_new(me, func, param, priority, owner, 0);
540 if (NULL == t) {
541 apr_thread_mutex_unlock(me->lock);
542 return APR_ENOMEM;
543 }
544
545 t_loc = add_if_empty(me, t);
546 if (NULL == t_loc) {
547 goto FINAL_EXIT;
548 }
549
550 if (push) {
551 while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
552 t_loc && t_loc->dispatch.priority >= t->dispatch.priority) {
553 t_loc = APR_RING_NEXT(t_loc, link);
554 }
555 }
556 APR_RING_INSERT_BEFORE(t_loc, t, link);
557 if (!push) {
558 if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) {
559 me->task_idx[TASK_PRIORITY_SEG(t)] = t;
560 }
561 }
562
563 FINAL_EXIT:
564 me->task_cnt++;
565 if (me->task_cnt > me->tasks_high)
566 me->tasks_high = me->task_cnt;
567 if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max &&
568 me->task_cnt > me->threshold)) {
569 rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
570 if (APR_SUCCESS == rv) {
571 ++me->thd_cnt;
572 if (me->thd_cnt > me->thd_high)
573 me->thd_high = me->thd_cnt;
574 }
575 }
576
577 apr_thread_cond_signal(me->cond);
578 apr_thread_mutex_unlock(me->lock);
579
580 return rv;
581 }
582
apr_thread_pool_push(apr_thread_pool_t * me,apr_thread_start_t func,void * param,apr_byte_t priority,void * owner)583 APU_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me,
584 apr_thread_start_t func,
585 void *param,
586 apr_byte_t priority,
587 void *owner)
588 {
589 return add_task(me, func, param, priority, 1, owner);
590 }
591
apr_thread_pool_schedule(apr_thread_pool_t * me,apr_thread_start_t func,void * param,apr_interval_time_t time,void * owner)592 APU_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t *me,
593 apr_thread_start_t func,
594 void *param,
595 apr_interval_time_t time,
596 void *owner)
597 {
598 return schedule_task(me, func, param, owner, time);
599 }
600
apr_thread_pool_top(apr_thread_pool_t * me,apr_thread_start_t func,void * param,apr_byte_t priority,void * owner)601 APU_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t *me,
602 apr_thread_start_t func,
603 void *param,
604 apr_byte_t priority,
605 void *owner)
606 {
607 return add_task(me, func, param, priority, 0, owner);
608 }
609
remove_scheduled_tasks(apr_thread_pool_t * me,void * owner)610 static apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me,
611 void *owner)
612 {
613 apr_thread_pool_task_t *t_loc;
614 apr_thread_pool_task_t *next;
615
616 t_loc = APR_RING_FIRST(me->scheduled_tasks);
617 while (t_loc !=
618 APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
619 link)) {
620 next = APR_RING_NEXT(t_loc, link);
621 /* if this is the owner remove it */
622 if (t_loc->owner == owner) {
623 --me->scheduled_task_cnt;
624 APR_RING_REMOVE(t_loc, link);
625 }
626 t_loc = next;
627 }
628 return APR_SUCCESS;
629 }
630
remove_tasks(apr_thread_pool_t * me,void * owner)631 static apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner)
632 {
633 apr_thread_pool_task_t *t_loc;
634 apr_thread_pool_task_t *next;
635 int seg;
636
637 t_loc = APR_RING_FIRST(me->tasks);
638 while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) {
639 next = APR_RING_NEXT(t_loc, link);
640 if (t_loc->owner == owner) {
641 --me->task_cnt;
642 seg = TASK_PRIORITY_SEG(t_loc);
643 if (t_loc == me->task_idx[seg]) {
644 me->task_idx[seg] = APR_RING_NEXT(t_loc, link);
645 if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
646 apr_thread_pool_task,
647 link)
648 || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
649 me->task_idx[seg] = NULL;
650 }
651 }
652 APR_RING_REMOVE(t_loc, link);
653 }
654 t_loc = next;
655 }
656 return APR_SUCCESS;
657 }
658
/*
 * Block until no busy thread is executing a task that belongs to owner.
 * Whenever such a thread is found the lock is released, the caller sleeps
 * for 200 ms and looks again; after the thread is done the scan restarts
 * from the beginning of the busy ring.
 *
 * In debug builds it is asserted that the calling thread is not itself the
 * worker being waited for.
 */
static void wait_on_busy_threads(apr_thread_pool_t *me, void *owner)
{
#ifndef NDEBUG
    apr_os_thread_t *os_thread;
#endif
    struct apr_thread_list_elt *cur;

    apr_thread_mutex_lock(me->lock);
    cur = APR_RING_FIRST(me->busy_thds);
    while (cur != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) {
        if (cur->current_owner == owner) {
#ifndef NDEBUG
            /* make sure the thread is not the one calling tasks_cancel */
            apr_os_thread_get(&os_thread, cur->thd);
#ifdef WIN32
            /* hack for apr win32 bug */
            assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread));
#else
            assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread));
#endif
#endif
            while (cur->current_owner == owner) {
                apr_thread_mutex_unlock(me->lock);
                apr_sleep(200 * 1000);
                apr_thread_mutex_lock(me->lock);
            }
            /* the ring may have changed while the lock was released */
            cur = APR_RING_FIRST(me->busy_thds);
        }
        else {
            cur = APR_RING_NEXT(cur, link);
        }
    }
    apr_thread_mutex_unlock(me->lock);
}
692
apr_thread_pool_tasks_cancel(apr_thread_pool_t * me,void * owner)693 APU_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t *me,
694 void *owner)
695 {
696 apr_status_t rv = APR_SUCCESS;
697
698 apr_thread_mutex_lock(me->lock);
699 if (me->task_cnt > 0) {
700 rv = remove_tasks(me, owner);
701 }
702 if (me->scheduled_task_cnt > 0) {
703 rv = remove_scheduled_tasks(me, owner);
704 }
705 apr_thread_mutex_unlock(me->lock);
706 wait_on_busy_threads(me, owner);
707
708 return rv;
709 }
710
apr_thread_pool_tasks_count(apr_thread_pool_t * me)711 APU_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t *me)
712 {
713 return me->task_cnt;
714 }
715
716 APU_DECLARE(apr_size_t)
apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t * me)717 apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me)
718 {
719 return me->scheduled_task_cnt;
720 }
721
apr_thread_pool_threads_count(apr_thread_pool_t * me)722 APU_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t *me)
723 {
724 return me->thd_cnt;
725 }
726
apr_thread_pool_busy_count(apr_thread_pool_t * me)727 APU_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t *me)
728 {
729 return me->thd_cnt - me->idle_cnt;
730 }
731
apr_thread_pool_idle_count(apr_thread_pool_t * me)732 APU_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t *me)
733 {
734 return me->idle_cnt;
735 }
736
737 APU_DECLARE(apr_size_t)
apr_thread_pool_tasks_run_count(apr_thread_pool_t * me)738 apr_thread_pool_tasks_run_count(apr_thread_pool_t * me)
739 {
740 return me->tasks_run;
741 }
742
743 APU_DECLARE(apr_size_t)
apr_thread_pool_tasks_high_count(apr_thread_pool_t * me)744 apr_thread_pool_tasks_high_count(apr_thread_pool_t * me)
745 {
746 return me->tasks_high;
747 }
748
749 APU_DECLARE(apr_size_t)
apr_thread_pool_threads_high_count(apr_thread_pool_t * me)750 apr_thread_pool_threads_high_count(apr_thread_pool_t * me)
751 {
752 return me->thd_high;
753 }
754
755 APU_DECLARE(apr_size_t)
apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me)756 apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me)
757 {
758 return me->thd_timed_out;
759 }
760
761
apr_thread_pool_idle_max_get(apr_thread_pool_t * me)762 APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t *me)
763 {
764 return me->idle_max;
765 }
766
767 APU_DECLARE(apr_interval_time_t)
apr_thread_pool_idle_wait_get(apr_thread_pool_t * me)768 apr_thread_pool_idle_wait_get(apr_thread_pool_t * me)
769 {
770 return me->idle_wait;
771 }
772
773 /*
774 * This function stop extra idle threads to the cnt.
775 * @return the number of threads stopped
776 * NOTE: There could be busy threads become idle during this function
777 */
trim_threads(apr_thread_pool_t * me,apr_size_t * cnt,int idle)778 static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t *me,
779 apr_size_t *cnt, int idle)
780 {
781 struct apr_thread_list *thds;
782 apr_size_t n, n_dbg, i;
783 struct apr_thread_list_elt *head, *tail, *elt;
784
785 apr_thread_mutex_lock(me->lock);
786 if (idle) {
787 thds = me->idle_thds;
788 n = me->idle_cnt;
789 }
790 else {
791 thds = me->busy_thds;
792 n = me->thd_cnt - me->idle_cnt;
793 }
794 if (n <= *cnt) {
795 apr_thread_mutex_unlock(me->lock);
796 *cnt = 0;
797 return NULL;
798 }
799 n -= *cnt;
800
801 head = APR_RING_FIRST(thds);
802 for (i = 0; i < *cnt; i++) {
803 head = APR_RING_NEXT(head, link);
804 }
805 tail = APR_RING_LAST(thds);
806 if (idle) {
807 APR_RING_UNSPLICE(head, tail, link);
808 me->idle_cnt = *cnt;
809 }
810
811 n_dbg = 0;
812 for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
813 elt->state = TH_STOP;
814 n_dbg++;
815 }
816 elt->state = TH_STOP;
817 n_dbg++;
818 assert(n == n_dbg);
819 *cnt = n;
820
821 apr_thread_mutex_unlock(me->lock);
822
823 APR_RING_PREV(head, link) = NULL;
824 APR_RING_NEXT(tail, link) = NULL;
825 return head;
826 }
827
/*
 * Reduce the idle threads to at most cnt: mark the surplus for stopping,
 * wake all waiting threads, join every stopped thread and hand their records
 * to the recycled ring.
 *
 * @param cnt number of idle threads to keep
 * @return the number of threads that were stopped
 */
static apr_size_t trim_idle_threads(apr_thread_pool_t *me, apr_size_t cnt)
{
    apr_size_t joined = 0;
    struct apr_thread_list_elt *cur, *first, *last;
    apr_status_t exit_status;

    first = trim_threads(me, &cnt, 1);

    /* wake the idle threads so the marked ones notice TH_STOP */
    apr_thread_mutex_lock(me->lock);
    apr_thread_cond_broadcast(me->cond);
    apr_thread_mutex_unlock(me->lock);

    if (first != NULL) {
        /* the chain returned by trim_threads() is NULL-terminated */
        for (cur = first; cur != NULL; cur = APR_RING_NEXT(cur, link)) {
            last = cur;
            apr_thread_join(&exit_status, cur->thd);
            ++joined;
        }
        apr_thread_mutex_lock(me->lock);
        APR_RING_SPLICE_TAIL(me->recycled_thds, first, last,
                             apr_thread_list_elt, link);
        apr_thread_mutex_unlock(me->lock);
    }
    assert(cnt == joined);

    return cnt;
}
857
858 /* don't join on busy threads for performance reasons, who knows how long will
859 * the task takes to perform
860 */
trim_busy_threads(apr_thread_pool_t * me,apr_size_t cnt)861 static apr_size_t trim_busy_threads(apr_thread_pool_t *me, apr_size_t cnt)
862 {
863 trim_threads(me, &cnt, 0);
864 return cnt;
865 }
866
apr_thread_pool_idle_max_set(apr_thread_pool_t * me,apr_size_t cnt)867 APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t *me,
868 apr_size_t cnt)
869 {
870 me->idle_max = cnt;
871 cnt = trim_idle_threads(me, cnt);
872 return cnt;
873 }
874
875 APU_DECLARE(apr_interval_time_t)
apr_thread_pool_idle_wait_set(apr_thread_pool_t * me,apr_interval_time_t timeout)876 apr_thread_pool_idle_wait_set(apr_thread_pool_t * me,
877 apr_interval_time_t timeout)
878 {
879 apr_interval_time_t oldtime;
880
881 oldtime = me->idle_wait;
882 me->idle_wait = timeout;
883
884 return oldtime;
885 }
886
apr_thread_pool_thread_max_get(apr_thread_pool_t * me)887 APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *me)
888 {
889 return me->thd_max;
890 }
891
892 /*
893 * This function stop extra working threads to the new limit.
894 * NOTE: There could be busy threads become idle during this function
895 */
apr_thread_pool_thread_max_set(apr_thread_pool_t * me,apr_size_t cnt)896 APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t *me,
897 apr_size_t cnt)
898 {
899 unsigned int n;
900
901 me->thd_max = cnt;
902 if (0 == cnt || me->thd_cnt <= cnt) {
903 return 0;
904 }
905
906 n = me->thd_cnt - cnt;
907 if (n >= me->idle_cnt) {
908 trim_busy_threads(me, n - me->idle_cnt);
909 trim_idle_threads(me, 0);
910 }
911 else {
912 trim_idle_threads(me, me->idle_cnt - n);
913 }
914 return n;
915 }
916
apr_thread_pool_threshold_get(apr_thread_pool_t * me)917 APU_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t *me)
918 {
919 return me->threshold;
920 }
921
apr_thread_pool_threshold_set(apr_thread_pool_t * me,apr_size_t val)922 APU_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t *me,
923 apr_size_t val)
924 {
925 apr_size_t ov;
926
927 ov = me->threshold;
928 me->threshold = val;
929 return ov;
930 }
931
apr_thread_pool_task_owner_get(apr_thread_t * thd,void ** owner)932 APU_DECLARE(apr_status_t) apr_thread_pool_task_owner_get(apr_thread_t *thd,
933 void **owner)
934 {
935 apr_status_t rv;
936 apr_thread_pool_task_t *task;
937 void *data;
938
939 rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd);
940 if (rv != APR_SUCCESS) {
941 return rv;
942 }
943
944 task = data;
945 if (!task) {
946 *owner = NULL;
947 return APR_BADARG;
948 }
949
950 *owner = task->owner;
951 return APR_SUCCESS;
952 }
953
954 #endif /* APR_HAS_THREADS */
955
956 /* vim: set ts=4 sw=4 et cin tw=80: */
957