Lines Matching refs:pool

25 static void do_spawn_thread(ThreadPool *pool);
37 ThreadPool *pool; member
78 ThreadPool *pool = opaque; in worker_thread() local
80 qemu_mutex_lock(&pool->lock); in worker_thread()
81 pool->pending_threads--; in worker_thread()
82 do_spawn_thread(pool); in worker_thread()
84 while (!pool->stopping) { in worker_thread()
89 pool->idle_threads++; in worker_thread()
90 qemu_mutex_unlock(&pool->lock); in worker_thread()
91 ret = qemu_sem_timedwait(&pool->sem, 10000); in worker_thread()
92 qemu_mutex_lock(&pool->lock); in worker_thread()
93 pool->idle_threads--; in worker_thread()
94 } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list)); in worker_thread()
95 if (ret == -1 || pool->stopping) { in worker_thread()
99 req = QTAILQ_FIRST(&pool->request_list); in worker_thread()
100 QTAILQ_REMOVE(&pool->request_list, req, reqs); in worker_thread()
102 qemu_mutex_unlock(&pool->lock); in worker_thread()
111 qemu_mutex_lock(&pool->lock); in worker_thread()
113 qemu_bh_schedule(pool->completion_bh); in worker_thread()
116 pool->cur_threads--; in worker_thread()
117 qemu_cond_signal(&pool->worker_stopped); in worker_thread()
118 qemu_mutex_unlock(&pool->lock); in worker_thread()
122 static void do_spawn_thread(ThreadPool *pool) in do_spawn_thread() argument
127 if (!pool->new_threads) { in do_spawn_thread()
131 pool->new_threads--; in do_spawn_thread()
132 pool->pending_threads++; in do_spawn_thread()
134 qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED); in do_spawn_thread()
139 ThreadPool *pool = opaque; in spawn_thread_bh_fn() local
141 qemu_mutex_lock(&pool->lock); in spawn_thread_bh_fn()
142 do_spawn_thread(pool); in spawn_thread_bh_fn()
143 qemu_mutex_unlock(&pool->lock); in spawn_thread_bh_fn()
146 static void spawn_thread(ThreadPool *pool) in spawn_thread() argument
148 pool->cur_threads++; in spawn_thread()
149 pool->new_threads++; in spawn_thread()
157 if (!pool->pending_threads) { in spawn_thread()
158 qemu_bh_schedule(pool->new_thread_bh); in spawn_thread()
164 ThreadPool *pool = opaque; in thread_pool_completion_bh() local
167 aio_context_acquire(pool->ctx); in thread_pool_completion_bh()
169 QLIST_FOREACH_SAFE(elem, &pool->head, all, next) { in thread_pool_completion_bh()
174 trace_thread_pool_complete(pool, elem, elem->common.opaque, in thread_pool_completion_bh()
185 qemu_bh_schedule(pool->completion_bh); in thread_pool_completion_bh()
187 aio_context_release(pool->ctx); in thread_pool_completion_bh()
189 aio_context_acquire(pool->ctx); in thread_pool_completion_bh()
195 qemu_bh_cancel(pool->completion_bh); in thread_pool_completion_bh()
203 aio_context_release(pool->ctx); in thread_pool_completion_bh()
209 ThreadPool *pool = elem->pool; in thread_pool_cancel() local
213 qemu_mutex_lock(&pool->lock); in thread_pool_cancel()
220 qemu_sem_timedwait(&pool->sem, 0) == 0) { in thread_pool_cancel()
221 QTAILQ_REMOVE(&pool->request_list, elem, reqs); in thread_pool_cancel()
222 qemu_bh_schedule(pool->completion_bh); in thread_pool_cancel()
228 qemu_mutex_unlock(&pool->lock); in thread_pool_cancel()
234 ThreadPool *pool = elem->pool; in thread_pool_get_aio_context() local
235 return pool->ctx; in thread_pool_get_aio_context()
244 BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool, in thread_pool_submit_aio() argument
254 req->pool = pool; in thread_pool_submit_aio()
256 QLIST_INSERT_HEAD(&pool->head, req, all); in thread_pool_submit_aio()
258 trace_thread_pool_submit(pool, req, arg); in thread_pool_submit_aio()
260 qemu_mutex_lock(&pool->lock); in thread_pool_submit_aio()
261 if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) { in thread_pool_submit_aio()
262 spawn_thread(pool); in thread_pool_submit_aio()
264 QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs); in thread_pool_submit_aio()
265 qemu_mutex_unlock(&pool->lock); in thread_pool_submit_aio()
266 qemu_sem_post(&pool->sem); in thread_pool_submit_aio()
283 int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, in thread_pool_submit_co() argument
288 thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc); in thread_pool_submit_co()
293 void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg) in thread_pool_submit() argument
295 thread_pool_submit_aio(pool, func, arg, NULL, NULL); in thread_pool_submit()
298 static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) in thread_pool_init_one() argument
304 memset(pool, 0, sizeof(*pool)); in thread_pool_init_one()
305 pool->ctx = ctx; in thread_pool_init_one()
306 pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool); in thread_pool_init_one()
307 qemu_mutex_init(&pool->lock); in thread_pool_init_one()
308 qemu_cond_init(&pool->worker_stopped); in thread_pool_init_one()
309 qemu_sem_init(&pool->sem, 0); in thread_pool_init_one()
310 pool->max_threads = 64; in thread_pool_init_one()
311 pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool); in thread_pool_init_one()
313 QLIST_INIT(&pool->head); in thread_pool_init_one()
314 QTAILQ_INIT(&pool->request_list); in thread_pool_init_one()
319 ThreadPool *pool = g_new(ThreadPool, 1); in thread_pool_new() local
320 thread_pool_init_one(pool, ctx); in thread_pool_new()
321 return pool; in thread_pool_new()
324 void thread_pool_free(ThreadPool *pool) in thread_pool_free() argument
326 if (!pool) { in thread_pool_free()
330 assert(QLIST_EMPTY(&pool->head)); in thread_pool_free()
332 qemu_mutex_lock(&pool->lock); in thread_pool_free()
335 qemu_bh_delete(pool->new_thread_bh); in thread_pool_free()
336 pool->cur_threads -= pool->new_threads; in thread_pool_free()
337 pool->new_threads = 0; in thread_pool_free()
340 pool->stopping = true; in thread_pool_free()
341 while (pool->cur_threads > 0) { in thread_pool_free()
342 qemu_sem_post(&pool->sem); in thread_pool_free()
343 qemu_cond_wait(&pool->worker_stopped, &pool->lock); in thread_pool_free()
346 qemu_mutex_unlock(&pool->lock); in thread_pool_free()
348 qemu_bh_delete(pool->completion_bh); in thread_pool_free()
349 qemu_sem_destroy(&pool->sem); in thread_pool_free()
350 qemu_cond_destroy(&pool->worker_stopped); in thread_pool_free()
351 qemu_mutex_destroy(&pool->lock); in thread_pool_free()
352 g_free(pool); in thread_pool_free()