1 /*
2 * TurboXSL XML+XSLT processing library
3 * Thread pool library
4 *
5 *
6 * (c) Egor Voznessenski, voznyak@mail.ru
7 *
8 * $Id$
9 *
10 **/
11 #include "threadpool.h"
12
13 #include "logger.h"
14 #include "unbounded_queue.h"
15
16 typedef struct threadpool_task_ {
17 void (*routine)(void *);
18 void *data;
19 } threadpool_task;
20
21 struct threadpool_ {
22 pthread_t *threads;
23 unsigned int num_of_threads;
24 pthread_mutex_t blocked_lock;
25 unsigned int num_of_blocked;
26 threadpool_task *stop_task;
27
28 unbounded_queue *queue;
29 };
30
worker_thr_routine(void * data)31 void *worker_thr_routine(void *data)
32 {
33 threadpool *pool = (threadpool *) data;
34 for (; ;)
35 {
36 threadpool_task *task = unbounded_queue_dequeue(pool->queue);
37 if (task == NULL || task == pool->stop_task) break;
38 task->routine(task->data);
39 }
40
41 return NULL;
42 }
43
threadpool_init(unsigned int num_of_threads)44 threadpool *threadpool_init(unsigned int num_of_threads)
45 {
46 debug("threadpool_init:: pool size %d", num_of_threads);
47
48 if (num_of_threads == 0) return NULL;
49
50 threadpool *pool = memory_allocator_new(sizeof(threadpool));
51 pool->num_of_threads = num_of_threads;
52 pool->stop_task = memory_allocator_new(sizeof(threadpool_task));
53
54 if (pthread_mutex_init(&(pool->blocked_lock), NULL))
55 {
56 error("shared_variable_create:: blocked lock");
57 return NULL;
58 }
59
60 pool->queue = unbounded_queue_create();
61 if (pool->queue == NULL)
62 {
63 error("threadpool_init:: queue");
64 return NULL;
65 }
66
67 pool->threads = memory_allocator_new(sizeof(pthread_t) * num_of_threads);
68 for (unsigned int i = 0; i < num_of_threads; i++)
69 {
70 if (pthread_create(&(pool->threads[i]), NULL, worker_thr_routine, pool))
71 {
72 error("threadpool_init:: thread");
73 threadpool_free(pool);
74 return NULL;
75 }
76 }
77 return pool;
78 }
79
threadpool_free(threadpool * pool)80 void threadpool_free(threadpool *pool)
81 {
82 if (!pool) return;
83
84 unbounded_queue_close(pool->queue);
85
86 for (unsigned int i = 0; i < pool->num_of_threads; i++)
87 {
88 unbounded_queue_enqueue(pool->queue, pool->stop_task);
89 }
90
91 for (unsigned int i = 0; i < pool->num_of_threads; i++)
92 {
93 pthread_join(pool->threads[i], NULL);
94 }
95
96 unbounded_queue_release(pool->queue);
97 pthread_mutex_destroy(&(pool->blocked_lock));
98 }
99
thread_pool_try_wait(threadpool * pool)100 int thread_pool_try_wait(threadpool *pool)
101 {
102 if (pthread_mutex_lock(&(pool->blocked_lock)))
103 {
104 error("thread_pool_try_wait:: lock");
105 return 0;
106 }
107
108 int result = 0;
109 if (pool->num_of_blocked < pool->num_of_threads)
110 {
111 pool->num_of_blocked += 1;
112 result = 1;
113 }
114
115 pthread_mutex_unlock(&(pool->blocked_lock));
116
117 return result;
118 }
119
thread_pool_finish_wait(threadpool * pool)120 void thread_pool_finish_wait(threadpool *pool)
121 {
122 if (pthread_mutex_lock(&(pool->blocked_lock)))
123 {
124 error("thread_pool_finish_wait:: lock");
125 return;
126 }
127
128 if (pool->num_of_blocked > 0) pool->num_of_blocked -= 1;
129
130 pthread_mutex_unlock(&(pool->blocked_lock));
131 }
132
threadpool_start(threadpool * pool,void (* routine)(void *),void * data)133 void threadpool_start(threadpool *pool, void (*routine)(void *), void *data)
134 {
135 if (!pool)
136 {
137 (*routine)(data);
138 return;
139 }
140
141 threadpool_task *task = memory_allocator_new(sizeof(threadpool_task));
142 task->routine = routine;
143 task->data = data;
144 unbounded_queue_enqueue(pool->queue, task);
145 }
146
threadpool_set_allocator(memory_allocator * allocator,threadpool * pool)147 void threadpool_set_allocator(memory_allocator *allocator, threadpool *pool)
148 {
149 if (!pool) return;
150
151 debug("threadpool_set_allocator:: setup");
152 for (unsigned int i = 0; i < pool->num_of_threads; i++)
153 {
154 memory_allocator_add_entry(allocator, pool->threads[i], 1000000);
155 }
156 }
157
threadpool_set_external_cache(external_cache * cache,threadpool * pool)158 void threadpool_set_external_cache(external_cache *cache, threadpool *pool)
159 {
160 if (!pool) return;
161
162 debug("threadpool_set_external_cache:: setup");
163 for (unsigned int i = 0; i < pool->num_of_threads; i++)
164 {
165 external_cache_add_client(cache, pool->threads[i]);
166 }
167 }
168