1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include <memory.h>
40 #include <toku_portability.h>
41 #include <stdio.h>
42 #include <errno.h>
43 #include <string.h>
44 
45 #include <toku_assert.h>
46 #include <toku_list.h>
47 #include <portability/toku_pthread.h>
48 
49 #include "threadpool.h"
50 
51 toku_instr_key *tpool_lock_mutex_key;
52 toku_instr_key *tp_thread_wait_key;
53 toku_instr_key *tp_pool_wait_free_key;
54 toku_instr_key *tp_internal_thread_key;
55 
56 struct toku_thread {
57     struct toku_thread_pool *pool;
58     toku_pthread_t tid;
59     void *(*f)(void *arg);
60     void *arg;
61     int doexit;
62     struct toku_list free_link;
63     struct toku_list all_link;
64     toku_cond_t wait;
65 };
66 
67 struct toku_thread_pool {
68     int max_threads;
69     int cur_threads;
70     struct toku_list free_threads;
71     struct toku_list all_threads;
72 
73     toku_mutex_t lock;
74     toku_cond_t wait_free;
75 
76     uint64_t gets, get_blocks;
77 };
78 
79 static void *toku_thread_run_internal(void *arg);
80 static void toku_thread_pool_lock(struct toku_thread_pool *pool);
81 static void toku_thread_pool_unlock(struct toku_thread_pool *pool);
82 
83 static int
toku_thread_create(struct toku_thread_pool * pool,struct toku_thread ** toku_thread_return)84 toku_thread_create(struct toku_thread_pool *pool, struct toku_thread **toku_thread_return) {
85     int r;
86     struct toku_thread *MALLOC(thread);
87     if (thread == nullptr) {
88         r = get_error_errno();
89     } else {
90         memset(thread, 0, sizeof *thread);
91         thread->pool = pool;
92         toku_cond_init(*tp_thread_wait_key, &thread->wait, nullptr);
93         r = toku_pthread_create(*tp_internal_thread_key,
94                                 &thread->tid,
95                                 nullptr,
96                                 toku_thread_run_internal,
97                                 thread);
98         if (r) {
99             toku_cond_destroy(&thread->wait);
100             toku_free(thread);
101             thread = nullptr;
102         }
103         *toku_thread_return = thread;
104     }
105     return r;
106 }
107 
108 void
toku_thread_run(struct toku_thread * thread,void * (* f)(void * arg),void * arg)109 toku_thread_run(struct toku_thread *thread, void *(*f)(void *arg), void *arg) {
110     toku_thread_pool_lock(thread->pool);
111     thread->f = f;
112     thread->arg = arg;
113     toku_cond_signal(&thread->wait);
114     toku_thread_pool_unlock(thread->pool);
115 }
116 
toku_thread_destroy(struct toku_thread * thread)117 static void toku_thread_destroy(struct toku_thread *thread) {
118     int r;
119     void *ret;
120     r = toku_pthread_join(thread->tid, &ret);
121     invariant(r == 0 && ret == thread);
122     struct toku_thread_pool *pool = thread->pool;
123     toku_thread_pool_lock(pool);
124     toku_list_remove(&thread->free_link);
125     toku_thread_pool_unlock(pool);
126     toku_cond_destroy(&thread->wait);
127     toku_free(thread);
128 }
129 
130 static void
toku_thread_ask_exit(struct toku_thread * thread)131 toku_thread_ask_exit(struct toku_thread *thread) {
132     thread->doexit = 1;
133     toku_cond_signal(&thread->wait);
134 }
135 
136 static void *
toku_thread_run_internal(void * arg)137 toku_thread_run_internal(void *arg) {
138     struct toku_thread *thread = (struct toku_thread *) arg;
139     struct toku_thread_pool *pool = thread->pool;
140     toku_thread_pool_lock(pool);
141     while (1) {
142         toku_cond_signal(&pool->wait_free);
143         void *(*thread_f)(void *); void *thread_arg; int doexit;
144         while (1) {
145             thread_f = thread->f; thread_arg = thread->arg; doexit = thread->doexit; // make copies of these variables to make helgrind happy
146             if (thread_f || doexit)
147                 break;
148             toku_cond_wait(&thread->wait, &pool->lock);
149         }
150         toku_thread_pool_unlock(pool);
151         if (thread_f)
152             (void) thread_f(thread_arg);
153         if (doexit)
154             break;
155         toku_thread_pool_lock(pool);
156         thread->f = nullptr;
157         toku_list_push(&pool->free_threads, &thread->free_link);
158     }
159     return toku_pthread_done(arg);
160 }
161 
toku_thread_pool_create(struct toku_thread_pool ** pool_return,int max_threads)162 int toku_thread_pool_create(struct toku_thread_pool **pool_return,
163                             int max_threads) {
164     int r;
165     struct toku_thread_pool *CALLOC(pool);
166     if (pool == nullptr) {
167         r = get_error_errno();
168     } else {
169         toku_mutex_init(*tpool_lock_mutex_key, &pool->lock, nullptr);
170         toku_list_init(&pool->free_threads);
171         toku_list_init(&pool->all_threads);
172         toku_cond_init(*tp_pool_wait_free_key, &pool->wait_free, nullptr);
173         pool->cur_threads = 0;
174         pool->max_threads = max_threads;
175         *pool_return = pool;
176         r = 0;
177     }
178     return r;
179 }
180 
181 static void
toku_thread_pool_lock(struct toku_thread_pool * pool)182 toku_thread_pool_lock(struct toku_thread_pool *pool) {
183     toku_mutex_lock(&pool->lock);
184 }
185 
186 static void
toku_thread_pool_unlock(struct toku_thread_pool * pool)187 toku_thread_pool_unlock(struct toku_thread_pool *pool) {
188     toku_mutex_unlock(&pool->lock);
189 }
190 
191 void
toku_thread_pool_destroy(struct toku_thread_pool ** poolptr)192 toku_thread_pool_destroy(struct toku_thread_pool **poolptr) {
193     struct toku_thread_pool *pool = *poolptr;
194     *poolptr = nullptr;
195 
196     // ask the threads to exit
197     toku_thread_pool_lock(pool);
198     struct toku_list *list;
199     for (list = pool->all_threads.next; list != &pool->all_threads; list = list->next) {
200         struct toku_thread *thread = toku_list_struct(list, struct toku_thread, all_link);
201         toku_thread_ask_exit(thread);
202     }
203     toku_thread_pool_unlock(pool);
204 
205     // wait for all of the threads to exit
206     while (!toku_list_empty(&pool->all_threads)) {
207         list = toku_list_pop_head(&pool->all_threads);
208         struct toku_thread *thread = toku_list_struct(list, struct toku_thread, all_link);
209         toku_thread_destroy(thread);
210         pool->cur_threads -= 1;
211     }
212 
213     invariant(pool->cur_threads == 0);
214 
215     // cleanup
216     toku_cond_destroy(&pool->wait_free);
217     toku_mutex_destroy(&pool->lock);
218 
219     toku_free(pool);
220 }
221 
222 static int
toku_thread_pool_add(struct toku_thread_pool * pool)223 toku_thread_pool_add(struct toku_thread_pool *pool) {
224     struct toku_thread *thread = nullptr;
225     int r = toku_thread_create(pool, &thread);
226     if (r == 0) {
227         pool->cur_threads += 1;
228         toku_list_push(&pool->all_threads, &thread->all_link);
229         toku_list_push(&pool->free_threads, &thread->free_link);
230         toku_cond_signal(&pool->wait_free);
231     }
232     return r;
233 }
234 
235 // get one thread from the free pool.
236 static int
toku_thread_pool_get_one(struct toku_thread_pool * pool,int dowait,struct toku_thread ** toku_thread_return)237 toku_thread_pool_get_one(struct toku_thread_pool *pool, int dowait, struct toku_thread **toku_thread_return) {
238     int r = 0;
239     toku_thread_pool_lock(pool);
240     pool->gets++;
241     while (1) {
242         if (!toku_list_empty(&pool->free_threads))
243             break;
244         if (pool->max_threads == 0 || pool->cur_threads < pool->max_threads)
245             (void) toku_thread_pool_add(pool);
246         if (toku_list_empty(&pool->free_threads) && !dowait) {
247             r = EWOULDBLOCK;
248             break;
249         }
250         pool->get_blocks++;
251         toku_cond_wait(&pool->wait_free, &pool->lock);
252     }
253     if (r == 0) {
254         struct toku_list *list = toku_list_pop_head(&pool->free_threads);
255         struct toku_thread *thread = toku_list_struct(list, struct toku_thread, free_link);
256         *toku_thread_return = thread;
257     } else
258         *toku_thread_return = nullptr;
259     toku_thread_pool_unlock(pool);
260     return r;
261 }
262 
263 int
toku_thread_pool_get(struct toku_thread_pool * pool,int dowait,int * nthreads,struct toku_thread ** toku_thread_return)264 toku_thread_pool_get(struct toku_thread_pool *pool, int dowait, int *nthreads, struct toku_thread **toku_thread_return) {
265     int r = 0;
266     int n = *nthreads;
267     int i;
268     for (i = 0; i < n; i++) {
269         r = toku_thread_pool_get_one(pool, dowait, &toku_thread_return[i]);
270         if (r != 0)
271             break;
272     }
273     *nthreads = i;
274     return r;
275 }
276 
277 int
toku_thread_pool_run(struct toku_thread_pool * pool,int dowait,int * nthreads,void * (* f)(void * arg),void * arg)278 toku_thread_pool_run(struct toku_thread_pool *pool, int dowait, int *nthreads, void *(*f)(void *arg), void *arg) {
279     int n = *nthreads;
280     struct toku_thread *tids[n];
281     int r = toku_thread_pool_get(pool, dowait, nthreads, tids);
282     if (r == 0 || r == EWOULDBLOCK) {
283         n = *nthreads;
284         for (int i = 0; i < n; i++)
285             toku_thread_run(tids[i], f, arg);
286     }
287     return r;
288 }
289 
290 void
toku_thread_pool_print(struct toku_thread_pool * pool,FILE * out)291 toku_thread_pool_print(struct toku_thread_pool *pool, FILE *out) {
292     fprintf(out, "%s:%d %p %llu %llu\n", __FILE__, __LINE__, pool, (long long unsigned) pool->gets, (long long unsigned) pool->get_blocks);
293 }
294 
295 int
toku_thread_pool_get_current_threads(struct toku_thread_pool * pool)296 toku_thread_pool_get_current_threads(struct toku_thread_pool *pool) {
297     return pool->cur_threads;
298 }
299