1 /*
2  *  TurboXSL XML+XSLT processing library
3  *  Thread pool library
4  *
5  *
6  *  (c) Egor Voznessenski, voznyak@mail.ru
7  *
8  *  $Id$
9  *
10 **/
11 #include "threadpool.h"
12 
13 #include "logger.h"
14 #include "unbounded_queue.h"
15 
/* A unit of work handed to a worker thread through the pool's queue. */
typedef struct threadpool_task_ {
    void (*routine)(void *); /* function the worker calls for this task */
    void *data;              /* argument passed unchanged to routine */
} threadpool_task;
20 
/* State of one thread pool instance. */
struct threadpool_ {
    pthread_t *threads;           /* array of num_of_threads worker thread handles */
    unsigned int num_of_threads;  /* number of entries in threads */
    pthread_mutex_t blocked_lock; /* locked around every access to num_of_blocked */
    unsigned int num_of_blocked;  /* raised by thread_pool_try_wait (never above
                                     num_of_threads), lowered by thread_pool_finish_wait */
    threadpool_task *stop_task;   /* sentinel: a worker leaves its loop when it
                                     dequeues this exact pointer */

    unbounded_queue *queue;       /* queue the workers dequeue tasks from */
};
30 
worker_thr_routine(void * data)31 void *worker_thr_routine(void *data)
32 {
33     threadpool *pool = (threadpool *) data;
34     for (; ;)
35     {
36         threadpool_task *task = unbounded_queue_dequeue(pool->queue);
37         if (task == NULL || task == pool->stop_task) break;
38         task->routine(task->data);
39     }
40 
41     return NULL;
42 }
43 
threadpool_init(unsigned int num_of_threads)44 threadpool *threadpool_init(unsigned int num_of_threads)
45 {
46     debug("threadpool_init:: pool size %d", num_of_threads);
47 
48     if (num_of_threads == 0) return NULL;
49 
50     threadpool *pool = memory_allocator_new(sizeof(threadpool));
51     pool->num_of_threads = num_of_threads;
52     pool->stop_task = memory_allocator_new(sizeof(threadpool_task));
53 
54     if (pthread_mutex_init(&(pool->blocked_lock), NULL))
55     {
56         error("shared_variable_create:: blocked lock");
57         return NULL;
58     }
59 
60     pool->queue = unbounded_queue_create();
61     if (pool->queue == NULL)
62     {
63         error("threadpool_init:: queue");
64         return NULL;
65     }
66 
67     pool->threads = memory_allocator_new(sizeof(pthread_t) * num_of_threads);
68     for (unsigned int i = 0; i < num_of_threads; i++)
69     {
70         if (pthread_create(&(pool->threads[i]), NULL, worker_thr_routine, pool))
71         {
72             error("threadpool_init:: thread");
73             threadpool_free(pool);
74             return NULL;
75         }
76     }
77     return pool;
78 }
79 
/*
 * Shuts a pool down: closes its queue, queues one stop_task sentinel per
 * worker, joins every worker, then releases the queue and destroys the
 * blocked_lock mutex. A NULL pool is ignored.
 *
 * This function does not itself release the memory obtained through
 * memory_allocator_new (pool, threads array, stop_task).
 */
void threadpool_free(threadpool *pool)
{
    if (pool == NULL)
    {
        return;
    }

    unbounded_queue *tasks = pool->queue;
    const unsigned int workers = pool->num_of_threads;
    unsigned int idx;

    unbounded_queue_close(tasks);

    /* All sentinels are queued before the first join: any worker may pick
     * up any sentinel, so joining earlier could wait on a thread that has
     * not been told to stop yet. */
    idx = 0;
    while (idx < workers)
    {
        unbounded_queue_enqueue(tasks, pool->stop_task);
        idx++;
    }

    idx = 0;
    while (idx < workers)
    {
        pthread_join(pool->threads[idx], NULL);
        idx++;
    }

    unbounded_queue_release(tasks);
    pthread_mutex_destroy(&(pool->blocked_lock));
}
99 
/*
 * Tries to register one more blocked participant on the pool.
 *
 * Under blocked_lock, increments num_of_blocked if it is still below
 * num_of_threads.
 *
 * Returns 1 when the counter was incremented, 0 when the limit was already
 * reached or the mutex could not be locked (the latter is also logged).
 */
int thread_pool_try_wait(threadpool *pool)
{
    pthread_mutex_t *guard = &(pool->blocked_lock);

    if (pthread_mutex_lock(guard) != 0)
    {
        error("thread_pool_try_wait:: lock");
        return 0;
    }

    int granted = (pool->num_of_blocked < pool->num_of_threads) ? 1 : 0;
    if (granted)
    {
        pool->num_of_blocked += 1;
    }

    pthread_mutex_unlock(guard);

    return granted;
}
119 
/*
 * Counterpart of thread_pool_try_wait: under blocked_lock, decrements
 * num_of_blocked unless it is already zero. If the mutex cannot be locked
 * the failure is logged and the counter is left untouched.
 */
void thread_pool_finish_wait(threadpool *pool)
{
    pthread_mutex_t *guard = &(pool->blocked_lock);

    if (pthread_mutex_lock(guard) != 0)
    {
        error("thread_pool_finish_wait:: lock");
        return;
    }

    /* The counter is unsigned; never let it wrap below zero. */
    if (pool->num_of_blocked != 0)
    {
        pool->num_of_blocked--;
    }

    pthread_mutex_unlock(guard);
}
132 
/*
 * Schedules routine(data) for execution.
 *
 * With a pool, a task record is allocated through memory_allocator_new and
 * appended to the pool's queue for a worker to pick up. Without a pool
 * (pool == NULL) the routine is simply called in the calling thread before
 * this function returns.
 */
void threadpool_start(threadpool *pool, void (*routine)(void *), void *data)
{
    if (pool != NULL)
    {
        threadpool_task *job = memory_allocator_new(sizeof(*job));
        job->data = data;
        job->routine = routine;
        unbounded_queue_enqueue(pool->queue, job);
    }
    else
    {
        /* No pool available: run synchronously. */
        routine(data);
    }
}
146 
/*
 * Calls memory_allocator_add_entry on allocator once for every worker
 * thread of the pool, in thread-array order. A NULL pool is ignored.
 */
void threadpool_set_allocator(memory_allocator *allocator, threadpool *pool)
{
    if (pool == NULL)
    {
        return;
    }

    debug("threadpool_set_allocator:: setup");

    pthread_t *worker = pool->threads;
    pthread_t *const last = worker + pool->num_of_threads;
    for (; worker != last; ++worker)
    {
        /* NOTE(review): the meaning of 1000000 is defined by
         * memory_allocator_add_entry, not visible here - confirm. */
        memory_allocator_add_entry(allocator, *worker, 1000000);
    }
}
157 
/*
 * Calls external_cache_add_client on cache once for every worker thread of
 * the pool, in thread-array order. A NULL pool is ignored.
 */
void threadpool_set_external_cache(external_cache *cache, threadpool *pool)
{
    if (pool == NULL)
    {
        return;
    }

    debug("threadpool_set_external_cache:: setup");

    unsigned int idx = 0;
    while (idx < pool->num_of_threads)
    {
        external_cache_add_client(cache, pool->threads[idx]);
        idx++;
    }
}
168