
/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp);
static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data);
static void nxt_thread_pool_start(void *ctx);
static void nxt_thread_pool_loop(void *ctx);
static void nxt_thread_pool_wait(nxt_thread_pool_t *tp);

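/*
 * nxt_thread_pool_create() only allocates and fills in the pool descriptor;
 * no threads or semaphores are created here.  The pool is initialized lazily
 * by nxt_thread_pool_init() on the first nxt_thread_pool_post() call.
 */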
nxt_thread_pool_t *
nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout,
    nxt_thread_pool_init_t init, nxt_event_engine_t *engine,
    nxt_work_handler_t exit)
{
    nxt_thread_pool_t  *tp;

    tp = nxt_zalloc(sizeof(nxt_thread_pool_t));
    if (tp == NULL) {
        return NULL;
    }

    tp->max_threads = max_threads;
    tp->timeout = timeout;
    tp->engine = engine;
    tp->task.thread = engine->task.thread;
    tp->task.log = engine->task.log;
    tp->init = init;
    tp->exit = exit;

    return tp;
}

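/*
 * Adds a work item to the pool's locked work queue and wakes an idle worker
 * thread via the pool semaphore.  The pool itself is created on demand by
 * the nxt_thread_pool_init() call below.
 */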
nxt_int_t
nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_t *work)
{
    nxt_thread_log_debug("thread pool post");

    if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) {
        return NXT_ERROR;
    }

    nxt_locked_work_queue_add(&tp->work_queue, work);

    (void) nxt_sem_post(&tp->sem);

    return NXT_OK;
}

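/*
 * Lazily initializes the pool: under the work queue spin lock it creates
 * the semaphore and the first worker thread.  tp->ready is checked twice,
 * once without the lock as a fast path and once under the lock, so only
 * one caller performs the initialization.
 */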
static nxt_int_t
nxt_thread_pool_init(nxt_thread_pool_t *tp)
{
    nxt_int_t            ret;
    nxt_thread_link_t    *link;
    nxt_thread_handle_t  handle;

    if (nxt_fast_path(tp->ready)) {
        return NXT_OK;
    }

    if (tp->max_threads == 0) {
        /* The pool is being destroyed. */
        return NXT_ERROR;
    }

    nxt_thread_spin_lock(&tp->work_queue.lock);

    ret = NXT_OK;

    if (!tp->ready) {

        nxt_thread_log_debug("thread pool init");

        (void) nxt_atomic_fetch_add(&tp->threads, 1);

        if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) {

            link = nxt_zalloc(sizeof(nxt_thread_link_t));

            if (nxt_fast_path(link != NULL)) {
                link->start = nxt_thread_pool_start;
                link->work.data = tp;

                if (nxt_thread_create(&handle, link) == NXT_OK) {
                    tp->ready = 1;
                    goto done;
                }
            }

            nxt_sem_destroy(&tp->sem);
        }

        (void) nxt_atomic_fetch_add(&tp->threads, -1);

        ret = NXT_ERROR;
    }

done:

    nxt_thread_spin_unlock(&tp->work_queue.lock);

    return ret;
}

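/*
 * Entry point of the first worker thread.  It records its handle as the
 * pool's "main" thread (which never exits on an idle timeout, see
 * nxt_thread_pool_wait()) and then runs the common worker loop.
 */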
static void
nxt_thread_pool_start(void *ctx)
{
    nxt_thread_t       *thr;
    nxt_thread_pool_t  *tp;

    tp = ctx;
    thr = nxt_thread();

    tp->main = thr->handle;
    tp->task.thread = thr;

    nxt_thread_pool_loop(ctx);
}

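/*
 * The worker loop: run the optional per-thread init handler once, then
 * repeatedly wait for work, pop a work item from the locked queue, and
 * execute its handler.  The loop is left only via nxt_thread_exit() in
 * nxt_thread_pool_wait() or in nxt_thread_pool_exit().
 */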
static void
nxt_thread_pool_loop(void *ctx)
{
    void                *obj, *data;
    nxt_task_t          *task;
    nxt_thread_t        *thr;
    nxt_thread_pool_t   *tp;
    nxt_work_handler_t  handler;

    tp = ctx;
    thr = nxt_thread();

    if (tp->init != NULL) {
        tp->init();
    }

    for ( ;; ) {
        nxt_thread_pool_wait(tp);

        handler = nxt_locked_work_queue_pop(&tp->work_queue, &task, &obj,
                                            &data);

        if (nxt_fast_path(handler != NULL)) {
            task->thread = thr;

            nxt_log_debug(thr->log, "locked work queue");

            handler(task, obj, data);
        }

        thr->log = &nxt_main_log;
    }
}

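/*
 * Blocks the calling worker on the pool semaphore until work is posted.
 * On an idle timeout every thread except the pool's main thread exits.
 * After waking up, the last idle thread pre-spawns one more worker, up to
 * tp->max_threads, so that a subsequent work item does not have to wait
 * for a thread to become free.
 */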
static void
nxt_thread_pool_wait(nxt_thread_pool_t *tp)
{
    nxt_err_t            err;
    nxt_thread_t         *thr;
    nxt_atomic_uint_t    waiting, threads;
    nxt_thread_link_t    *link;
    nxt_thread_handle_t  handle;

    thr = nxt_thread();

    nxt_log_debug(thr->log, "thread pool wait");

    (void) nxt_atomic_fetch_add(&tp->waiting, 1);

    for ( ;; ) {
        err = nxt_sem_wait(&tp->sem, tp->timeout);

        if (err == 0) {
            waiting = nxt_atomic_fetch_add(&tp->waiting, -1);
            break;
        }

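        /*
         * A timed-out thread exits unless it is the pool's main thread,
         * which keeps waiting so that the pool always has at least one
         * worker while it is alive.
         */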
        if (err == NXT_ETIMEDOUT) {
            if (nxt_thread_handle_equal(thr->handle, tp->main)) {
                continue;
            }
        }

        (void) nxt_atomic_fetch_add(&tp->waiting, -1);
        (void) nxt_atomic_fetch_add(&tp->threads, -1);

        nxt_thread_exit(thr);
        nxt_unreachable();
    }

    nxt_log_debug(thr->log, "thread pool awake, waiting: %A", waiting);

    if (waiting > 1) {
        return;
    }

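    /*
     * This was the last idle thread: try to reserve a slot for one more
     * worker and start it, unless tp->max_threads has been reached.
     */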
    do {
        threads = tp->threads;

        if (threads >= tp->max_threads) {
            return;
        }

    } while (!nxt_atomic_cmp_set(&tp->threads, threads, threads + 1));

    link = nxt_zalloc(sizeof(nxt_thread_link_t));

    if (nxt_fast_path(link != NULL)) {
        link->start = nxt_thread_pool_loop;
        link->work.data = tp;

        if (nxt_thread_create(&handle, link) != NXT_OK) {
            (void) nxt_atomic_fetch_add(&tp->threads, -1);
        }
    }
}

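/*
 * Initiates pool shutdown.  If the pool has never started a thread, the
 * exit handler is queued on the caller's engine directly.  Otherwise
 * thread creation is disabled and an nxt_thread_pool_exit() work item is
 * posted to the pool, so the workers terminate one by one.
 */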
void
nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
{
    nxt_thread_t  *thr;

    thr = nxt_thread();

    nxt_log_debug(thr->log, "thread pool destroy: %A", tp->ready);

    if (!tp->ready) {
        nxt_work_queue_add(&thr->engine->fast_work_queue, tp->exit,
                           &tp->engine->task, tp, NULL);
        return;
    }

    if (tp->max_threads != 0) {
        /* Disable new thread creation; mark the pool as being destroyed. */
        tp->max_threads = 0;

        nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, NULL);

        nxt_thread_pool_post(tp, &tp->work);
    }
}

/*
 * A thread handle (pthread_t) is either a pointer or an integer, so it can
 * be passed as the work handler's "data" pointer argument.  To convert a
 * void pointer to pthread_t and vice versa, the source value should first
 * be cast to uintptr_t and then to the destination type.
 *
 * If the handle were a struct, it would have to be stored in the thread
 * pool, and the pool could only be freed by the exit procedure after the
 * last thread of the pool has exited.
 */
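
/*
 * Worker shutdown chain: each invocation joins the previously exited
 * thread whose handle is passed in "data", then either re-posts itself
 * with the current thread's handle (while other workers remain) or, in
 * the last thread, destroys the semaphore and posts tp->exit to the
 * pool's event engine.
 */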
static void
nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
{
    nxt_thread_t         *thread;
    nxt_thread_pool_t    *tp;
    nxt_atomic_uint_t    threads;
    nxt_thread_handle_t  handle;

    tp = obj;
    thread = task->thread;

    nxt_debug(task, "thread pool exit");

    if (data != NULL) {
        handle = (nxt_thread_handle_t) (uintptr_t) data;
        nxt_thread_wait(handle);
    }

    threads = nxt_atomic_fetch_add(&tp->threads, -1);

    nxt_debug(task, "thread pool threads: %A", threads);

    if (threads > 1) {
        nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp,
                     (void *) (uintptr_t) thread->handle);

        nxt_thread_pool_post(tp, &tp->work);

    } else {
        nxt_debug(task, "thread pool destroy");

        nxt_sem_destroy(&tp->sem);

        nxt_work_set(&tp->work, tp->exit, &tp->engine->task, tp,
                     (void *) (uintptr_t) thread->handle);

        nxt_event_engine_post(tp->engine, &tp->work);

        /* The "tp" memory should be freed by the tp->exit handler. */
    }

    nxt_thread_exit(thread);

    nxt_unreachable();
}