1 /*
2 * virthreadpool.c: a generic thread pool implementation
3 *
4 * Copyright (C) 2014 Red Hat, Inc.
5 * Copyright (C) 2010 Hu Tao
6 * Copyright (C) 2010 Daniel P. Berrange
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License as published by the Free Software Foundation; either
11 * version 2.1 of the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library. If not, see
20 * <http://www.gnu.org/licenses/>.
21 */
22
23 #include <config.h>
24
25 #include "virthreadpool.h"
26 #include "viralloc.h"
27 #include "virthread.h"
28 #include "virerror.h"
29
30 #define VIR_FROM_THIS VIR_FROM_NONE
31
/* A single queued unit of work, linked into the pool's job list. */
typedef struct _virThreadPoolJob virThreadPoolJob;
struct _virThreadPoolJob {
    virThreadPoolJob *prev; /* previous job in the list, NULL for the head */
    virThreadPoolJob *next; /* next job in the list, NULL for the tail */
    unsigned int priority; /* non-zero marks a job priority workers may take */

    void *data; /* caller payload, passed as first argument to the pool's jobFunc */
};
40
/* Doubly linked queue of pending jobs; jobs are appended at the tail. */
typedef struct _virThreadPoolJobList virThreadPoolJobList;
struct _virThreadPoolJobList {
    virThreadPoolJob *head; /* oldest pending job; ordinary workers dequeue here */
    virThreadPoolJob *tail; /* most recently queued job */
    virThreadPoolJob *firstPrio; /* first pending job with non-zero priority, or NULL */
};
47
48
/* State of one thread pool. The functions in this file lock @mutex around
 * their accesses to the remaining fields.
 */
struct _virThreadPool {
    bool quit; /* set once the pool is stopped; workers exit, new jobs are refused */

    virThreadPoolJobFunc jobFunc; /* callback run by a worker for every job */
    const char *jobName; /* used to build worker thread names; pointer is stored, not copied */
    void *jobOpaque; /* second argument passed to jobFunc */
    virThreadPoolJobList jobList; /* pending jobs */
    size_t jobQueueDepth; /* number of jobs currently in jobList */

    virIdentity *identity; /* if non-NULL, made the current identity in each worker */

    virMutex mutex;
    virCond cond; /* ordinary workers wait here for jobs */
    virCond quit_cond; /* signalled by the last worker to exit */

    size_t maxWorkers; /* upper limit on ordinary workers */
    size_t minWorkers; /* number of ordinary workers started up front */
    size_t freeWorkers; /* ordinary workers currently waiting on cond */
    size_t nWorkers; /* current number of ordinary workers */
    virThread *workers; /* thread handles of ordinary workers */

    size_t maxPrioWorkers; /* upper limit on priority workers */
    size_t nPrioWorkers; /* current number of priority workers */
    virThread *prioWorkers; /* thread handles of priority workers */
    virCond prioCond; /* priority workers wait here for priority jobs */
};
75
/* Start-up arguments for one worker thread. Allocated per thread by
 * virThreadPoolExpand() and freed by the worker after copying the fields out.
 */
struct virThreadPoolWorkerData {
    virThreadPool *pool; /* pool the worker serves */
    virCond *cond; /* condition the worker waits on: pool->cond or pool->prioCond */
    bool priority; /* true for a priority worker */
};
81
/* Report whether a worker has to terminate because the current number of
 * workers @count exceeds what @limit allows.
 */
static inline bool virThreadPoolWorkerQuitHelper(size_t count, size_t limit)
{
    return limit < count;
}
89
virThreadPoolWorker(void * opaque)90 static void virThreadPoolWorker(void *opaque)
91 {
92 struct virThreadPoolWorkerData *data = opaque;
93 virThreadPool *pool = data->pool;
94 virCond *cond = data->cond;
95 bool priority = data->priority;
96 size_t *curWorkers = priority ? &pool->nPrioWorkers : &pool->nWorkers;
97 size_t *maxLimit = priority ? &pool->maxPrioWorkers : &pool->maxWorkers;
98 virThreadPoolJob *job = NULL;
99
100 VIR_FREE(data);
101
102 virMutexLock(&pool->mutex);
103
104 if (pool->identity)
105 virIdentitySetCurrent(pool->identity);
106
107 while (1) {
108 /* In order to support async worker termination, we need ensure that
109 * both busy and free workers know if they need to terminated. Thus,
110 * busy workers need to check for this fact before they start waiting for
111 * another job (and before taking another one from the queue); and
112 * free workers need to check for this right after waking up.
113 */
114 if (virThreadPoolWorkerQuitHelper(*curWorkers, *maxLimit))
115 goto out;
116 while (!pool->quit &&
117 ((!priority && !pool->jobList.head) ||
118 (priority && !pool->jobList.firstPrio))) {
119 if (!priority)
120 pool->freeWorkers++;
121 if (virCondWait(cond, &pool->mutex) < 0) {
122 if (!priority)
123 pool->freeWorkers--;
124 goto out;
125 }
126 if (!priority)
127 pool->freeWorkers--;
128
129 if (virThreadPoolWorkerQuitHelper(*curWorkers, *maxLimit))
130 goto out;
131 }
132
133 if (pool->quit)
134 break;
135
136 if (priority) {
137 job = pool->jobList.firstPrio;
138 } else {
139 job = pool->jobList.head;
140 }
141
142 if (job == pool->jobList.firstPrio) {
143 virThreadPoolJob *tmp = job->next;
144 while (tmp) {
145 if (tmp->priority)
146 break;
147 tmp = tmp->next;
148 }
149 pool->jobList.firstPrio = tmp;
150 }
151
152 if (job->prev)
153 job->prev->next = job->next;
154 else
155 pool->jobList.head = job->next;
156 if (job->next)
157 job->next->prev = job->prev;
158 else
159 pool->jobList.tail = job->prev;
160
161 pool->jobQueueDepth--;
162
163 virMutexUnlock(&pool->mutex);
164 (pool->jobFunc)(job->data, pool->jobOpaque);
165 VIR_FREE(job);
166 virMutexLock(&pool->mutex);
167 }
168
169 out:
170 if (priority)
171 pool->nPrioWorkers--;
172 else
173 pool->nWorkers--;
174 if (pool->nWorkers == 0 && pool->nPrioWorkers == 0)
175 virCondSignal(&pool->quit_cond);
176 virMutexUnlock(&pool->mutex);
177 }
178
179 static int
virThreadPoolExpand(virThreadPool * pool,size_t gain,bool priority)180 virThreadPoolExpand(virThreadPool *pool, size_t gain, bool priority)
181 {
182 virThread **workers = priority ? &pool->prioWorkers : &pool->workers;
183 size_t *curWorkers = priority ? &pool->nPrioWorkers : &pool->nWorkers;
184 size_t i = 0;
185 struct virThreadPoolWorkerData *data = NULL;
186
187 VIR_EXPAND_N(*workers, *curWorkers, gain);
188
189 for (i = 0; i < gain; i++) {
190 g_autofree char *name = NULL;
191
192 data = g_new0(struct virThreadPoolWorkerData, 1);
193 data->pool = pool;
194 data->cond = priority ? &pool->prioCond : &pool->cond;
195 data->priority = priority;
196
197 if (priority)
198 name = g_strdup_printf("prio-%s", pool->jobName);
199 else
200 name = g_strdup(pool->jobName);
201
202 if (virThreadCreateFull(&(*workers)[i],
203 false,
204 virThreadPoolWorker,
205 name,
206 true,
207 data) < 0) {
208 VIR_FREE(data);
209 virReportSystemError(errno, "%s", _("Failed to create thread"));
210 goto error;
211 }
212 }
213
214 return 0;
215
216 error:
217 *curWorkers -= gain - i;
218 return -1;
219 }
220
221 virThreadPool *
virThreadPoolNewFull(size_t minWorkers,size_t maxWorkers,size_t prioWorkers,virThreadPoolJobFunc func,const char * name,virIdentity * identity,void * opaque)222 virThreadPoolNewFull(size_t minWorkers,
223 size_t maxWorkers,
224 size_t prioWorkers,
225 virThreadPoolJobFunc func,
226 const char *name,
227 virIdentity *identity,
228 void *opaque)
229 {
230 virThreadPool *pool;
231
232 if (minWorkers > maxWorkers)
233 minWorkers = maxWorkers;
234
235 pool = g_new0(virThreadPool, 1);
236
237 pool->jobList.tail = pool->jobList.head = NULL;
238
239 pool->jobFunc = func;
240 pool->jobName = name;
241 pool->jobOpaque = opaque;
242
243 if (identity)
244 pool->identity = g_object_ref(identity);
245
246 if (virMutexInit(&pool->mutex) < 0)
247 goto error;
248 if (virCondInit(&pool->cond) < 0)
249 goto error;
250 if (virCondInit(&pool->prioCond) < 0)
251 goto error;
252 if (virCondInit(&pool->quit_cond) < 0)
253 goto error;
254
255 pool->minWorkers = minWorkers;
256 pool->maxWorkers = maxWorkers;
257 pool->maxPrioWorkers = prioWorkers;
258
259 if ((minWorkers > 0) && virThreadPoolExpand(pool, minWorkers, false) < 0)
260 goto error;
261
262 if ((prioWorkers > 0) && virThreadPoolExpand(pool, prioWorkers, true) < 0)
263 goto error;
264
265 return pool;
266
267 error:
268 virThreadPoolFree(pool);
269 return NULL;
270
271 }
272
273
274 static void
virThreadPoolStopLocked(virThreadPool * pool)275 virThreadPoolStopLocked(virThreadPool *pool)
276 {
277 if (pool->quit)
278 return;
279
280 pool->quit = true;
281 if (pool->nWorkers > 0)
282 virCondBroadcast(&pool->cond);
283 if (pool->nPrioWorkers > 0)
284 virCondBroadcast(&pool->prioCond);
285 }
286
287
288 static void
virThreadPoolDrainLocked(virThreadPool * pool)289 virThreadPoolDrainLocked(virThreadPool *pool)
290 {
291 virThreadPoolJob *job;
292
293 virThreadPoolStopLocked(pool);
294
295 while (pool->nWorkers > 0 || pool->nPrioWorkers > 0)
296 ignore_value(virCondWait(&pool->quit_cond, &pool->mutex));
297
298 while ((job = pool->jobList.head)) {
299 pool->jobList.head = pool->jobList.head->next;
300 VIR_FREE(job);
301 }
302 }
303
/*
 * Destroy a thread pool: stop it, wait for all workers to exit, drop the
 * jobs still queued and release everything the pool owns.
 *
 * @pool: pool to destroy; NULL is accepted and ignored
 */
void virThreadPoolFree(virThreadPool *pool)
{
    if (pool == NULL)
        return;

    virMutexLock(&pool->mutex);
    virThreadPoolDrainLocked(pool);

    if (pool->identity)
        g_object_unref(pool->identity);

    g_free(pool->workers);
    virMutexUnlock(&pool->mutex);

    /* No worker is left at this point; tear down the primitives. */
    virMutexDestroy(&pool->mutex);
    virCondDestroy(&pool->cond);
    virCondDestroy(&pool->prioCond);
    virCondDestroy(&pool->quit_cond);

    g_free(pool->prioWorkers);
    g_free(pool);
}
324
325
/* Return the pool's configured minimum number of ordinary workers,
 * read under the pool lock.
 */
size_t virThreadPoolGetMinWorkers(virThreadPool *pool)
{
    size_t minWorkers;

    virMutexLock(&pool->mutex);
    minWorkers = pool->minWorkers;
    virMutexUnlock(&pool->mutex);

    return minWorkers;
}
336
/* Return the pool's configured maximum number of ordinary workers,
 * read under the pool lock.
 */
size_t virThreadPoolGetMaxWorkers(virThreadPool *pool)
{
    size_t maxWorkers;

    virMutexLock(&pool->mutex);
    maxWorkers = pool->maxWorkers;
    virMutexUnlock(&pool->mutex);

    return maxWorkers;
}
347
/* Return the current number of priority workers (nPrioWorkers),
 * read under the pool lock.
 */
size_t virThreadPoolGetPriorityWorkers(virThreadPool *pool)
{
    size_t prioWorkers;

    virMutexLock(&pool->mutex);
    prioWorkers = pool->nPrioWorkers;
    virMutexUnlock(&pool->mutex);

    return prioWorkers;
}
358
/* Return the current number of ordinary workers (nWorkers),
 * read under the pool lock.
 */
size_t virThreadPoolGetCurrentWorkers(virThreadPool *pool)
{
    size_t curWorkers;

    virMutexLock(&pool->mutex);
    curWorkers = pool->nWorkers;
    virMutexUnlock(&pool->mutex);

    return curWorkers;
}
369
/* Return the number of ordinary workers currently waiting for a job,
 * read under the pool lock.
 */
size_t virThreadPoolGetFreeWorkers(virThreadPool *pool)
{
    size_t idleWorkers;

    virMutexLock(&pool->mutex);
    idleWorkers = pool->freeWorkers;
    virMutexUnlock(&pool->mutex);

    return idleWorkers;
}
380
/* Return the number of jobs currently queued in the pool,
 * read under the pool lock.
 */
size_t virThreadPoolGetJobQueueDepth(virThreadPool *pool)
{
    size_t depth;

    virMutexLock(&pool->mutex);
    depth = pool->jobQueueDepth;
    virMutexUnlock(&pool->mutex);

    return depth;
}
391
392 /*
393 * @priority - job priority
394 * Return: 0 on success, -1 otherwise
395 */
virThreadPoolSendJob(virThreadPool * pool,unsigned int priority,void * jobData)396 int virThreadPoolSendJob(virThreadPool *pool,
397 unsigned int priority,
398 void *jobData)
399 {
400 virThreadPoolJob *job;
401
402 virMutexLock(&pool->mutex);
403 if (pool->quit)
404 goto error;
405
406 if (pool->freeWorkers - pool->jobQueueDepth <= 0 &&
407 pool->nWorkers < pool->maxWorkers &&
408 virThreadPoolExpand(pool, 1, false) < 0)
409 goto error;
410
411 job = g_new0(virThreadPoolJob, 1);
412
413 job->data = jobData;
414 job->priority = priority;
415
416 job->prev = pool->jobList.tail;
417 if (pool->jobList.tail)
418 pool->jobList.tail->next = job;
419 pool->jobList.tail = job;
420
421 if (!pool->jobList.head)
422 pool->jobList.head = job;
423
424 if (priority && !pool->jobList.firstPrio)
425 pool->jobList.firstPrio = job;
426
427 pool->jobQueueDepth++;
428
429 virCondSignal(&pool->cond);
430 if (priority)
431 virCondSignal(&pool->prioCond);
432
433 virMutexUnlock(&pool->mutex);
434 return 0;
435
436 error:
437 virMutexUnlock(&pool->mutex);
438 return -1;
439 }
440
441 int
virThreadPoolSetParameters(virThreadPool * pool,long long int minWorkers,long long int maxWorkers,long long int prioWorkers)442 virThreadPoolSetParameters(virThreadPool *pool,
443 long long int minWorkers,
444 long long int maxWorkers,
445 long long int prioWorkers)
446 {
447 size_t max;
448 size_t min;
449
450 virMutexLock(&pool->mutex);
451
452 max = maxWorkers >= 0 ? maxWorkers : pool->maxWorkers;
453 min = minWorkers >= 0 ? minWorkers : pool->minWorkers;
454 if (min > max) {
455 virReportError(VIR_ERR_INVALID_ARG, "%s",
456 _("minWorkers cannot be larger than maxWorkers"));
457 goto error;
458 }
459
460 if ((maxWorkers == 0 && pool->maxWorkers > 0) ||
461 (maxWorkers > 0 && pool->maxWorkers == 0)) {
462 virReportError(VIR_ERR_INVALID_ARG, "%s",
463 _("maxWorkers must not be switched from zero to non-zero"
464 " and vice versa"));
465 goto error;
466 }
467
468 if (minWorkers >= 0) {
469 if ((size_t) minWorkers > pool->nWorkers &&
470 virThreadPoolExpand(pool, minWorkers - pool->nWorkers,
471 false) < 0)
472 goto error;
473 pool->minWorkers = minWorkers;
474 }
475
476 if (maxWorkers >= 0) {
477 pool->maxWorkers = maxWorkers;
478 virCondBroadcast(&pool->cond);
479 }
480
481 if (prioWorkers >= 0) {
482 if (prioWorkers < pool->nPrioWorkers) {
483 virCondBroadcast(&pool->prioCond);
484 } else if ((size_t) prioWorkers > pool->nPrioWorkers &&
485 virThreadPoolExpand(pool, prioWorkers - pool->nPrioWorkers,
486 true) < 0) {
487 goto error;
488 }
489 pool->maxPrioWorkers = prioWorkers;
490 }
491
492 virMutexUnlock(&pool->mutex);
493 return 0;
494
495 error:
496 virMutexUnlock(&pool->mutex);
497 return -1;
498 }
499
/*
 * Mark the pool as quitting and wake its waiting workers, without waiting
 * for them to exit. Takes pool->mutex around the call to
 * virThreadPoolStopLocked().
 */
void
virThreadPoolStop(virThreadPool *pool)
{
    virMutexLock(&pool->mutex);
    virThreadPoolStopLocked(pool);
    virMutexUnlock(&pool->mutex);
}
507
/*
 * Stop the pool, block until all of its workers have exited and free the
 * jobs still queued. Takes pool->mutex around the call to
 * virThreadPoolDrainLocked().
 */
void
virThreadPoolDrain(virThreadPool *pool)
{
    virMutexLock(&pool->mutex);
    virThreadPoolDrainLocked(pool);
    virMutexUnlock(&pool->mutex);
}
515