1 /*
2  * virthreadpool.c: a generic thread pool implementation
3  *
4  * Copyright (C) 2014 Red Hat, Inc.
5  * Copyright (C) 2010 Hu Tao
6  * Copyright (C) 2010 Daniel P. Berrange
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License as published by the Free Software Foundation; either
11  * version 2.1 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library.  If not, see
20  * <http://www.gnu.org/licenses/>.
21  */
22 
23 #include <config.h>
24 
25 #include "virthreadpool.h"
26 #include "viralloc.h"
27 #include "virthread.h"
28 #include "virerror.h"
29 
30 #define VIR_FROM_THIS VIR_FROM_NONE
31 
/* A single queued job; jobs are chained into a doubly linked list. */
typedef struct _virThreadPoolJob virThreadPoolJob;
struct _virThreadPoolJob {
    virThreadPoolJob *prev;     /* neighbour towards the head of the queue */
    virThreadPoolJob *next;     /* neighbour towards the tail of the queue */
    unsigned int priority;      /* non-zero marks the job as a priority job */

    void *data;                 /* payload handed to the pool's job function */
};
40 
41 typedef struct _virThreadPoolJobList virThreadPoolJobList;
42 struct _virThreadPoolJobList {
43     virThreadPoolJob *head;
44     virThreadPoolJob *tail;
45     virThreadPoolJob *firstPrio;
46 };
47 
48 
49 struct _virThreadPool {
50     bool quit;
51 
52     virThreadPoolJobFunc jobFunc;
53     const char *jobName;
54     void *jobOpaque;
55     virThreadPoolJobList jobList;
56     size_t jobQueueDepth;
57 
58     virIdentity *identity;
59 
60     virMutex mutex;
61     virCond cond;
62     virCond quit_cond;
63 
64     size_t maxWorkers;
65     size_t minWorkers;
66     size_t freeWorkers;
67     size_t nWorkers;
68     virThread *workers;
69 
70     size_t maxPrioWorkers;
71     size_t nPrioWorkers;
72     virThread *prioWorkers;
73     virCond prioCond;
74 };
75 
76 struct virThreadPoolWorkerData {
77     virThreadPool *pool;
78     virCond *cond;
79     bool priority;
80 };
81 
/* Tell whether a worker has to terminate: that is the case as soon as the
 * current number of workers @count exceeds what @limit allows.
 */
static inline bool virThreadPoolWorkerQuitHelper(size_t count, size_t limit)
{
    return limit < count;
}
89 
virThreadPoolWorker(void * opaque)90 static void virThreadPoolWorker(void *opaque)
91 {
92     struct virThreadPoolWorkerData *data = opaque;
93     virThreadPool *pool = data->pool;
94     virCond *cond = data->cond;
95     bool priority = data->priority;
96     size_t *curWorkers = priority ? &pool->nPrioWorkers : &pool->nWorkers;
97     size_t *maxLimit = priority ? &pool->maxPrioWorkers : &pool->maxWorkers;
98     virThreadPoolJob *job = NULL;
99 
100     VIR_FREE(data);
101 
102     virMutexLock(&pool->mutex);
103 
104     if (pool->identity)
105         virIdentitySetCurrent(pool->identity);
106 
107     while (1) {
108         /* In order to support async worker termination, we need ensure that
109          * both busy and free workers know if they need to terminated. Thus,
110          * busy workers need to check for this fact before they start waiting for
111          * another job (and before taking another one from the queue); and
112          * free workers need to check for this right after waking up.
113          */
114         if (virThreadPoolWorkerQuitHelper(*curWorkers, *maxLimit))
115             goto out;
116         while (!pool->quit &&
117                ((!priority && !pool->jobList.head) ||
118                 (priority && !pool->jobList.firstPrio))) {
119             if (!priority)
120                 pool->freeWorkers++;
121             if (virCondWait(cond, &pool->mutex) < 0) {
122                 if (!priority)
123                     pool->freeWorkers--;
124                 goto out;
125             }
126             if (!priority)
127                 pool->freeWorkers--;
128 
129             if (virThreadPoolWorkerQuitHelper(*curWorkers, *maxLimit))
130                 goto out;
131         }
132 
133         if (pool->quit)
134             break;
135 
136         if (priority) {
137             job = pool->jobList.firstPrio;
138         } else {
139             job = pool->jobList.head;
140         }
141 
142         if (job == pool->jobList.firstPrio) {
143             virThreadPoolJob *tmp = job->next;
144             while (tmp) {
145                 if (tmp->priority)
146                     break;
147                 tmp = tmp->next;
148             }
149             pool->jobList.firstPrio = tmp;
150         }
151 
152         if (job->prev)
153             job->prev->next = job->next;
154         else
155             pool->jobList.head = job->next;
156         if (job->next)
157             job->next->prev = job->prev;
158         else
159             pool->jobList.tail = job->prev;
160 
161         pool->jobQueueDepth--;
162 
163         virMutexUnlock(&pool->mutex);
164         (pool->jobFunc)(job->data, pool->jobOpaque);
165         VIR_FREE(job);
166         virMutexLock(&pool->mutex);
167     }
168 
169  out:
170     if (priority)
171         pool->nPrioWorkers--;
172     else
173         pool->nWorkers--;
174     if (pool->nWorkers == 0 && pool->nPrioWorkers == 0)
175         virCondSignal(&pool->quit_cond);
176     virMutexUnlock(&pool->mutex);
177 }
178 
179 static int
virThreadPoolExpand(virThreadPool * pool,size_t gain,bool priority)180 virThreadPoolExpand(virThreadPool *pool, size_t gain, bool priority)
181 {
182     virThread **workers = priority ? &pool->prioWorkers : &pool->workers;
183     size_t *curWorkers = priority ? &pool->nPrioWorkers : &pool->nWorkers;
184     size_t i = 0;
185     struct virThreadPoolWorkerData *data = NULL;
186 
187     VIR_EXPAND_N(*workers, *curWorkers, gain);
188 
189     for (i = 0; i < gain; i++) {
190         g_autofree char *name = NULL;
191 
192         data = g_new0(struct virThreadPoolWorkerData, 1);
193         data->pool = pool;
194         data->cond = priority ? &pool->prioCond : &pool->cond;
195         data->priority = priority;
196 
197         if (priority)
198             name = g_strdup_printf("prio-%s", pool->jobName);
199         else
200             name = g_strdup(pool->jobName);
201 
202         if (virThreadCreateFull(&(*workers)[i],
203                                 false,
204                                 virThreadPoolWorker,
205                                 name,
206                                 true,
207                                 data) < 0) {
208             VIR_FREE(data);
209             virReportSystemError(errno, "%s", _("Failed to create thread"));
210             goto error;
211         }
212     }
213 
214     return 0;
215 
216  error:
217     *curWorkers -= gain - i;
218     return -1;
219 }
220 
221 virThreadPool *
virThreadPoolNewFull(size_t minWorkers,size_t maxWorkers,size_t prioWorkers,virThreadPoolJobFunc func,const char * name,virIdentity * identity,void * opaque)222 virThreadPoolNewFull(size_t minWorkers,
223                      size_t maxWorkers,
224                      size_t prioWorkers,
225                      virThreadPoolJobFunc func,
226                      const char *name,
227                      virIdentity *identity,
228                      void *opaque)
229 {
230     virThreadPool *pool;
231 
232     if (minWorkers > maxWorkers)
233         minWorkers = maxWorkers;
234 
235     pool = g_new0(virThreadPool, 1);
236 
237     pool->jobList.tail = pool->jobList.head = NULL;
238 
239     pool->jobFunc = func;
240     pool->jobName = name;
241     pool->jobOpaque = opaque;
242 
243     if (identity)
244         pool->identity = g_object_ref(identity);
245 
246     if (virMutexInit(&pool->mutex) < 0)
247         goto error;
248     if (virCondInit(&pool->cond) < 0)
249         goto error;
250     if (virCondInit(&pool->prioCond) < 0)
251         goto error;
252     if (virCondInit(&pool->quit_cond) < 0)
253         goto error;
254 
255     pool->minWorkers = minWorkers;
256     pool->maxWorkers = maxWorkers;
257     pool->maxPrioWorkers = prioWorkers;
258 
259     if ((minWorkers > 0) && virThreadPoolExpand(pool, minWorkers, false) < 0)
260         goto error;
261 
262     if ((prioWorkers > 0) && virThreadPoolExpand(pool, prioWorkers, true) < 0)
263         goto error;
264 
265     return pool;
266 
267  error:
268     virThreadPoolFree(pool);
269     return NULL;
270 
271 }
272 
273 
274 static void
virThreadPoolStopLocked(virThreadPool * pool)275 virThreadPoolStopLocked(virThreadPool *pool)
276 {
277     if (pool->quit)
278         return;
279 
280     pool->quit = true;
281     if (pool->nWorkers > 0)
282         virCondBroadcast(&pool->cond);
283     if (pool->nPrioWorkers > 0)
284         virCondBroadcast(&pool->prioCond);
285 }
286 
287 
288 static void
virThreadPoolDrainLocked(virThreadPool * pool)289 virThreadPoolDrainLocked(virThreadPool *pool)
290 {
291     virThreadPoolJob *job;
292 
293     virThreadPoolStopLocked(pool);
294 
295     while (pool->nWorkers > 0 || pool->nPrioWorkers > 0)
296         ignore_value(virCondWait(&pool->quit_cond, &pool->mutex));
297 
298     while ((job = pool->jobList.head)) {
299         pool->jobList.head = pool->jobList.head->next;
300         VIR_FREE(job);
301     }
302 }
303 
/*
 * Drain @pool (stop it, wait for all workers, drop queued jobs) and release
 * everything it owns, including @pool itself. A NULL @pool is ignored.
 */
void virThreadPoolFree(virThreadPool *pool)
{
    if (pool == NULL)
        return;

    virMutexLock(&pool->mutex);
    virThreadPoolDrainLocked(pool);

    if (pool->identity != NULL)
        g_object_unref(pool->identity);

    g_free(pool->workers);
    virMutexUnlock(&pool->mutex);

    /* No worker is left at this point; tear down the primitives. */
    virMutexDestroy(&pool->mutex);
    virCondDestroy(&pool->quit_cond);
    virCondDestroy(&pool->cond);
    virCondDestroy(&pool->prioCond);

    g_free(pool->prioWorkers);
    g_free(pool);
}
324 
325 
/* Return the pool's minWorkers setting, read under the pool mutex. */
size_t virThreadPoolGetMinWorkers(virThreadPool *pool)
{
    size_t value;

    virMutexLock(&pool->mutex);
    value = pool->minWorkers;
    virMutexUnlock(&pool->mutex);

    return value;
}
336 
/* Return the pool's maxWorkers setting, read under the pool mutex. */
size_t virThreadPoolGetMaxWorkers(virThreadPool *pool)
{
    size_t value;

    virMutexLock(&pool->mutex);
    value = pool->maxWorkers;
    virMutexUnlock(&pool->mutex);

    return value;
}
347 
/* Return the current number of priority workers (nPrioWorkers), read under
 * the pool mutex.
 */
size_t virThreadPoolGetPriorityWorkers(virThreadPool *pool)
{
    size_t value;

    virMutexLock(&pool->mutex);
    value = pool->nPrioWorkers;
    virMutexUnlock(&pool->mutex);

    return value;
}
358 
/* Return the current number of ordinary workers (nWorkers), read under the
 * pool mutex.
 */
size_t virThreadPoolGetCurrentWorkers(virThreadPool *pool)
{
    size_t value;

    virMutexLock(&pool->mutex);
    value = pool->nWorkers;
    virMutexUnlock(&pool->mutex);

    return value;
}
369 
/* Return the number of ordinary workers currently waiting for a job
 * (freeWorkers), read under the pool mutex.
 */
size_t virThreadPoolGetFreeWorkers(virThreadPool *pool)
{
    size_t value;

    virMutexLock(&pool->mutex);
    value = pool->freeWorkers;
    virMutexUnlock(&pool->mutex);

    return value;
}
380 
/* Return the number of jobs waiting in the queue (jobQueueDepth), read under
 * the pool mutex.
 */
size_t virThreadPoolGetJobQueueDepth(virThreadPool *pool)
{
    size_t value;

    virMutexLock(&pool->mutex);
    value = pool->jobQueueDepth;
    virMutexUnlock(&pool->mutex);

    return value;
}
391 
392 /*
393  * @priority - job priority
394  * Return: 0 on success, -1 otherwise
395  */
virThreadPoolSendJob(virThreadPool * pool,unsigned int priority,void * jobData)396 int virThreadPoolSendJob(virThreadPool *pool,
397                          unsigned int priority,
398                          void *jobData)
399 {
400     virThreadPoolJob *job;
401 
402     virMutexLock(&pool->mutex);
403     if (pool->quit)
404         goto error;
405 
406     if (pool->freeWorkers - pool->jobQueueDepth <= 0 &&
407         pool->nWorkers < pool->maxWorkers &&
408         virThreadPoolExpand(pool, 1, false) < 0)
409         goto error;
410 
411     job = g_new0(virThreadPoolJob, 1);
412 
413     job->data = jobData;
414     job->priority = priority;
415 
416     job->prev = pool->jobList.tail;
417     if (pool->jobList.tail)
418         pool->jobList.tail->next = job;
419     pool->jobList.tail = job;
420 
421     if (!pool->jobList.head)
422         pool->jobList.head = job;
423 
424     if (priority && !pool->jobList.firstPrio)
425         pool->jobList.firstPrio = job;
426 
427     pool->jobQueueDepth++;
428 
429     virCondSignal(&pool->cond);
430     if (priority)
431         virCondSignal(&pool->prioCond);
432 
433     virMutexUnlock(&pool->mutex);
434     return 0;
435 
436  error:
437     virMutexUnlock(&pool->mutex);
438     return -1;
439 }
440 
441 int
virThreadPoolSetParameters(virThreadPool * pool,long long int minWorkers,long long int maxWorkers,long long int prioWorkers)442 virThreadPoolSetParameters(virThreadPool *pool,
443                            long long int minWorkers,
444                            long long int maxWorkers,
445                            long long int prioWorkers)
446 {
447     size_t max;
448     size_t min;
449 
450     virMutexLock(&pool->mutex);
451 
452     max = maxWorkers >= 0 ? maxWorkers : pool->maxWorkers;
453     min = minWorkers >= 0 ? minWorkers : pool->minWorkers;
454     if (min > max) {
455         virReportError(VIR_ERR_INVALID_ARG, "%s",
456                        _("minWorkers cannot be larger than maxWorkers"));
457         goto error;
458     }
459 
460     if ((maxWorkers == 0 && pool->maxWorkers > 0) ||
461         (maxWorkers > 0 && pool->maxWorkers == 0)) {
462         virReportError(VIR_ERR_INVALID_ARG, "%s",
463                        _("maxWorkers must not be switched from zero to non-zero"
464                          " and vice versa"));
465         goto error;
466     }
467 
468     if (minWorkers >= 0) {
469         if ((size_t) minWorkers > pool->nWorkers &&
470             virThreadPoolExpand(pool, minWorkers - pool->nWorkers,
471                                 false) < 0)
472             goto error;
473         pool->minWorkers = minWorkers;
474     }
475 
476     if (maxWorkers >= 0) {
477         pool->maxWorkers = maxWorkers;
478         virCondBroadcast(&pool->cond);
479     }
480 
481     if (prioWorkers >= 0) {
482         if (prioWorkers < pool->nPrioWorkers) {
483             virCondBroadcast(&pool->prioCond);
484         } else if ((size_t) prioWorkers > pool->nPrioWorkers &&
485                    virThreadPoolExpand(pool, prioWorkers - pool->nPrioWorkers,
486                                        true) < 0) {
487             goto error;
488         }
489         pool->maxPrioWorkers = prioWorkers;
490     }
491 
492     virMutexUnlock(&pool->mutex);
493     return 0;
494 
495  error:
496     virMutexUnlock(&pool->mutex);
497     return -1;
498 }
499 
500 void
virThreadPoolStop(virThreadPool * pool)501 virThreadPoolStop(virThreadPool *pool)
502 {
503     virMutexLock(&pool->mutex);
504     virThreadPoolStopLocked(pool);
505     virMutexUnlock(&pool->mutex);
506 }
507 
508 void
virThreadPoolDrain(virThreadPool * pool)509 virThreadPoolDrain(virThreadPool *pool)
510 {
511     virMutexLock(&pool->mutex);
512     virThreadPoolDrainLocked(pool);
513     virMutexUnlock(&pool->mutex);
514 }
515