1 /*!
2 * \file sccp_threadpool.c
3 * \brief SCCP Threadpool Class
4 * \author Diederik de Groot < ddegroot@users.sourceforge.net >
5 * \note This program is free software and may be modified and distributed under the terms of the GNU Public License.
6 * See the LICENSE file at the top of the source tree.
7 * \note Based on the work of Johan Hanssen Seferidis
8 * Library providing a threading pool where you can add work.
9 * \since 2009-01-16
10 *
11 */
12
13 #include "config.h"
14 #include "common.h"
15
16 SCCP_FILE_VERSION(__FILE__, "");
17 #include "sccp_threadpool.h"
18 #include <signal.h>
19 #undef pthread_create
20 #if defined(__GNUC__) && __GNUC__ > 3 && defined(HAVE_SYS_INFO_H)
21 #include <sys/sysinfo.h> // to retrieve processor info
22 #endif
23 //#define SEMAPHORE_LOCKED (0)
24 //#define SEMAPHORE_UNLOCKED (1)
25 void sccp_threadpool_grow_locked(sccp_threadpool_t * tp_p, int amount);
26 void sccp_threadpool_shrink_locked(sccp_threadpool_t * tp_p, int amount);
27 void *sccp_threadpool_thread_do(void *p);
28
29 typedef struct sccp_threadpool_thread sccp_threadpool_thread_t;
30
31 struct sccp_threadpool_thread {
32 pthread_t thread;
33 sccp_threadpool_t *tp_p;
34 SCCP_LIST_ENTRY (sccp_threadpool_thread_t) list;
35 boolean_t die;
36 };
37
38 /* The threadpool */
39 struct sccp_threadpool {
40 SCCP_LIST_HEAD (, sccp_threadpool_job_t) jobs;
41 SCCP_LIST_HEAD (, sccp_threadpool_thread_t) threads;
42 pbx_cond_t work;
43 pbx_cond_t exit;
44 time_t last_size_check; /*!< Time since last size check */
45 time_t last_resize; /*!< Time since last resize */
46 int job_high_water_mark; /*!< Highest number of jobs outstanding since last resize check */
47 volatile int sccp_threadpool_shuttingdown;
48 };
49
50 /*
51 * Fast reminders:
52 *
53 * tp = threadpool
54 * sccp_threadpool = threadpool
55 * sccp_threadpool_t = threadpool type
56 * tp_p = threadpool pointer
57 * sem = semaphore
58 * xN = x can be any string. N stands for amount
59 * */
60
61 /* Initialise thread pool */
sccp_threadpool_init(int threadsN)62 sccp_threadpool_t *sccp_threadpool_init(int threadsN)
63 {
64 sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_2 "Starting Threadpool\n");
65 sccp_threadpool_t * tp_p = NULL;
66
67 #if defined(__GNUC__) && __GNUC__ > 3 && defined(HAVE_SYS_INFO_H)
68 threadsN = get_nprocs_conf(); // get current number of active processors
69 #endif
70 if (!threadsN || threadsN < THREADPOOL_MIN_SIZE) {
71 threadsN = THREADPOOL_MIN_SIZE;
72 }
73 if (threadsN > THREADPOOL_MAX_SIZE) {
74 threadsN = THREADPOOL_MAX_SIZE;
75 }
76 /* Make new thread pool */
77 if (!(tp_p = (sccp_threadpool_t *) sccp_calloc(sizeof *tp_p, 1))) {
78 pbx_log(LOG_ERROR, SS_Memory_Allocation_Error, "SCCP");
79 return NULL;
80 }
81
82 /* initialize the thread pool */
83 SCCP_LIST_HEAD_INIT(&tp_p->threads);
84
85 /* Initialise the job queue */
86 SCCP_LIST_HEAD_INIT(&tp_p->jobs);
87 tp_p->last_size_check = time(0);
88 tp_p->job_high_water_mark = 0;
89 tp_p->last_resize = time(0);
90 tp_p->sccp_threadpool_shuttingdown = 0;
91
92 /* Initialise Condition */
93 pbx_cond_init(&(tp_p->work), NULL);
94 pbx_cond_init(&(tp_p->exit), NULL);
95
96 /* Make threads in pool */
97 SCCP_LIST_LOCK(&(tp_p->threads));
98 sccp_threadpool_grow_locked(tp_p, threadsN);
99 SCCP_LIST_UNLOCK(&(tp_p->threads));
100
101 sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "Threadpool Started\n");
102 return tp_p;
103 }
104
105 // sccp_threadpool_grow_locked needs to be called with locked &(tp_p->threads)->lock
sccp_threadpool_grow_locked(sccp_threadpool_t * tp_p,int amount)106 void sccp_threadpool_grow_locked(sccp_threadpool_t * tp_p, int amount)
107 {
108 pthread_attr_t attr;
109 sccp_threadpool_thread_t * tp_thread = NULL;
110 int t = 0;
111
112 if (tp_p && !tp_p->sccp_threadpool_shuttingdown) {
113 for (t = 0; t < amount; t++) {
114 if (!(tp_thread = (sccp_threadpool_thread_t *) sccp_calloc(sizeof *tp_thread, 1))) {
115 pbx_log(LOG_ERROR, SS_Memory_Allocation_Error, "SCCP");
116 return;
117 }
118 tp_thread->die = FALSE;
119 tp_thread->tp_p = tp_p;
120
121 pthread_attr_init(&attr);
122 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
123 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
124 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
125 SCCP_LIST_INSERT_HEAD(&(tp_p->threads), tp_thread, list);
126 pbx_pthread_create(&(tp_thread->thread), &attr, sccp_threadpool_thread_do, (void *) tp_thread);
127 sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "Created thread %d(%p) in pool \n", t, (void *) tp_thread->thread);
128 pbx_cond_broadcast(&(tp_p->work));
129 }
130 }
131 }
132
133 // sccp_threadpool_shrink_locked needs to be called with locked &(tp_p->threads)->lock
sccp_threadpool_shrink_locked(sccp_threadpool_t * tp_p,int amount)134 void sccp_threadpool_shrink_locked(sccp_threadpool_t * tp_p, int amount)
135 {
136 sccp_threadpool_thread_t * tp_thread = NULL;
137 int t = 0;
138
139 if (tp_p && !tp_p->sccp_threadpool_shuttingdown) {
140 for (t = 0; t < amount; t++) {
141 SCCP_LIST_TRAVERSE(&(tp_p->threads), tp_thread, list) {
142 if (tp_thread->die == FALSE) {
143 tp_thread->die = TRUE;
144 break;
145 }
146 }
147
148 if (tp_thread) {
149 // wake up all threads
150 sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "Sending die signal to thread %p in pool \n", (void *) tp_thread->thread);
151 pbx_cond_broadcast(&(tp_p->work));
152 }
153 }
154 }
155 }
156
157 /* check threadpool size (increase/decrease if necessary) */
sccp_threadpool_check_size(sccp_threadpool_t * tp_p)158 static void sccp_threadpool_check_size(sccp_threadpool_t * tp_p)
159 {
160 if (tp_p && !tp_p->sccp_threadpool_shuttingdown) {
161 sccp_log((DEBUGCAT_THPOOL)) (VERBOSE_PREFIX_3 "(sccp_threadpool_check_resize) in thread: %p\n", (void *) pthread_self());
162 SCCP_LIST_LOCK(&(tp_p->threads));
163 {
164 if (SCCP_LIST_GETSIZE(&tp_p->jobs) > (SCCP_LIST_GETSIZE(&tp_p->threads) * 2) && SCCP_LIST_GETSIZE(&tp_p->threads) < THREADPOOL_MAX_SIZE) { // increase
165 sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "Add new thread to threadpool %p\n", tp_p);
166 sccp_threadpool_grow_locked(tp_p, 1);
167 tp_p->last_resize = time(0);
168 } else if (((time(0) - tp_p->last_resize) > THREADPOOL_RESIZE_INTERVAL * 3) && // wait a little longer to decrease
169 (SCCP_LIST_GETSIZE(&tp_p->threads) > THREADPOOL_MIN_SIZE && SCCP_LIST_GETSIZE(&tp_p->jobs) < (SCCP_LIST_GETSIZE(&tp_p->threads) / 2))) { // decrease
170 sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "Remove thread %d from threadpool %p\n", SCCP_LIST_GETSIZE(&tp_p->threads) - 1, tp_p);
171 // kill last thread only if it is not executed by itself
172 sccp_threadpool_shrink_locked(tp_p, 1);
173 tp_p->last_resize = time(0);
174 }
175 tp_p->last_size_check = time(0);
176 tp_p->job_high_water_mark = SCCP_LIST_GETSIZE(&tp_p->jobs);
177 sccp_log((DEBUGCAT_THPOOL)) (VERBOSE_PREFIX_3 "(sccp_threadpool_check_resize) Number of threads: %d, job_high_water_mark: %d\n", SCCP_LIST_GETSIZE(&tp_p->threads), tp_p->job_high_water_mark);
178 }
179 SCCP_LIST_UNLOCK(&(tp_p->threads));
180 }
181 }
182
sccp_threadpool_thread_end(void * p)183 static void sccp_threadpool_thread_end(void *p)
184 {
185 sccp_threadpool_thread_t *tp_thread = (sccp_threadpool_thread_t *) p;
186 sccp_threadpool_thread_t *res = NULL;
187 sccp_threadpool_t *tp_p = tp_thread->tp_p;
188
189 SCCP_LIST_LOCK(&(tp_p->threads));
190 res = SCCP_LIST_REMOVE(&(tp_p->threads), tp_thread, list);
191 SCCP_LIST_UNLOCK(&(tp_p->threads));
192
193 pbx_cond_signal(&(tp_p->exit));
194 if (res) {
195 sccp_free(res);
196 }
197 }
198
199 /* What each individual thread is doing */
sccp_threadpool_thread_do(void * p)200 void *sccp_threadpool_thread_do(void *p)
201 {
202 sccp_threadpool_thread_t *tp_thread = (sccp_threadpool_thread_t *) p;
203 sccp_threadpool_t *tp_p = tp_thread->tp_p;
204 void *thread = (void *) pthread_self();
205
206 pthread_cleanup_push(sccp_threadpool_thread_end, tp_thread);
207
208 int jobs = 0;
209
210 int threads = 0;
211
212 sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "Starting Threadpool JobQueue:%p\n", thread);
213 while (1) {
214 pthread_testcancel();
215
216 SCCP_LIST_LOCK(&(tp_p->threads)); /* LOCK */
217 threads = SCCP_LIST_GETSIZE(&tp_p->threads);
218 SCCP_LIST_UNLOCK(&(tp_p->threads));
219
220 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
221
222 SCCP_LIST_LOCK(&(tp_p->jobs)); /* LOCK */
223 jobs = SCCP_LIST_GETSIZE(&tp_p->jobs);
224 sccp_log((DEBUGCAT_THPOOL)) (VERBOSE_PREFIX_3 "(sccp_threadpool_thread_do) num_jobs: %d, thread: %p, num_threads: %d\n", jobs, thread, threads);
225 while (SCCP_LIST_GETSIZE(&tp_p->jobs) == 0 && !tp_thread->die) {
226 sccp_log((DEBUGCAT_THPOOL)) (VERBOSE_PREFIX_3 "(sccp_threadpool_thread_do) Thread %p Waiting for New Work Condition\n", thread);
227 pbx_cond_wait(&(tp_p->work), &(tp_p->jobs.lock));
228 }
229 if (tp_thread->die && SCCP_LIST_GETSIZE(&tp_p->jobs) == 0) {
230 sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "JobQueue Die. Exiting thread %p...\n", thread);
231 SCCP_LIST_UNLOCK(&(tp_p->jobs));
232 break;
233 }
234 sccp_log((DEBUGCAT_THPOOL)) (VERBOSE_PREFIX_3 "(sccp_threadpool_thread_do) Let's work. num_jobs: %d, thread: %p, num_threads: %d\n", jobs, thread, threads);
235 {
236 /* Read job from queue and execute it */
237 void *(*func_buff) (void *arg) = NULL;
238 void * arg_buff = NULL;
239 sccp_threadpool_job_t * job = NULL;
240
241 if ((job = SCCP_LIST_REMOVE_HEAD(&(tp_p->jobs), list))) {
242 func_buff = job->function;
243 arg_buff = job->arg;
244 }
245 SCCP_LIST_UNLOCK(&(tp_p->jobs));
246
247 sccp_log((DEBUGCAT_THPOOL)) (VERBOSE_PREFIX_3 "(sccp_threadpool_thread_do) executing %p in thread: %p\n", job, thread);
248 if (job) {
249 func_buff(arg_buff); /* run function */
250 sccp_free(job); /* DEALLOC job */
251 }
252 // check number of threads in threadpool
253 if ((time(0) - tp_p->last_size_check) > THREADPOOL_RESIZE_INTERVAL) {
254 sccp_threadpool_check_size(tp_p); /* Check Resizing */
255 }
256 }
257 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
258 }
259 sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "JobQueue Exiting Thread...\n");
260 pthread_cleanup_pop(1);
261 return NULL;
262 }
263
264 /* Add work to the thread pool */
sccp_threadpool_add_work(sccp_threadpool_t * tp_p,void * (* function_p)(void *),void * arg_p)265 int sccp_threadpool_add_work(sccp_threadpool_t * tp_p, void *(*function_p) (void *), void *arg_p)
266 {
267 // prevent new work while shutting down
268 if (!tp_p->sccp_threadpool_shuttingdown) {
269 sccp_threadpool_job_t * newJob = NULL;
270
271 if (!(newJob = (sccp_threadpool_job_t *) sccp_calloc(sizeof *newJob, 1))) {
272 pbx_log(LOG_ERROR, SS_Memory_Allocation_Error, "SCCP");
273 exit(1);
274 }
275
276 /* add function and argument */
277 newJob->function = function_p;
278 newJob->arg = arg_p;
279
280 /* add job to queue */
281 sccp_threadpool_jobqueue_add(tp_p, newJob);
282 return 1;
283 }
284 pbx_log(LOG_ERROR, "sccp_threadpool_add_work(): Threadpool shutting down, denying new work\n");
285 return 0;
286 }
287
288 /* Destroy the threadpool */
sccp_threadpool_destroy(sccp_threadpool_t * tp_p)289 boolean_t sccp_threadpool_destroy(sccp_threadpool_t * tp_p)
290 {
291 if (!tp_p) {
292 return FALSE;
293 }
294 sccp_threadpool_thread_t *tp_thread = NULL;
295
296 sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_2 "Destroying Threadpool %p with %d jobs\n", tp_p, SCCP_LIST_GETSIZE(&tp_p->jobs));
297
298 // After this point, no new jobs can be added
299 SCCP_LIST_LOCK(&(tp_p->jobs));
300 tp_p->sccp_threadpool_shuttingdown = 1;
301 SCCP_LIST_UNLOCK(&(tp_p->jobs));
302
303 // shutdown is a kind of work too
304 SCCP_LIST_LOCK(&(tp_p->threads));
305 SCCP_LIST_TRAVERSE(&(tp_p->threads), tp_thread, list) {
306 tp_thread->die = TRUE;
307 pbx_cond_broadcast(&(tp_p->work));
308 }
309 SCCP_LIST_UNLOCK(&(tp_p->threads));
310
311 // wake up jobs untill jobqueue is empty, before shutting down, to make sure all jobs have been processed
312 pbx_cond_broadcast(&(tp_p->work));
313
314 // wait for all threads to exit
315 if (SCCP_LIST_GETSIZE(&tp_p->threads) != 0) {
316 struct timespec ts;
317 struct timeval tp;
318 int counter = 0;
319
320 SCCP_LIST_LOCK(&(tp_p->threads));
321 sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "Waiting for threadpool to wind down. please stand by...\n");
322 while (SCCP_LIST_GETSIZE(&tp_p->threads) != 0 && counter++ < THREADPOOL_MAX_SIZE) {
323 gettimeofday(&tp, NULL);
324 ts.tv_sec = tp.tv_sec;
325 ts.tv_nsec = tp.tv_usec * 1000;
326 ts.tv_sec += 1; // wait max 2 second
327 pbx_cond_broadcast(&(tp_p->work));
328 pbx_cond_timedwait(&tp_p->exit, &(tp_p->threads.lock), &ts);
329 }
330
331 /* Make sure threads have finished (should never have to execute) */
332 if (SCCP_LIST_GETSIZE(&tp_p->threads) != 0) {
333 while ((tp_thread = SCCP_LIST_REMOVE_HEAD(&(tp_p->threads), list))) {
334 pbx_log(LOG_ERROR, "Forcing Destroy of thread %p\n", tp_thread);
335 pthread_cancel(tp_thread->thread);
336 pthread_kill(tp_thread->thread, SIGURG);
337 pthread_join(tp_thread->thread, NULL);
338 }
339 }
340 SCCP_LIST_UNLOCK(&(tp_p->threads));
341 }
342
343 /* Dealloc */
344 pbx_cond_destroy(&(tp_p->work)); /* Remove Condition */
345 pbx_cond_destroy(&(tp_p->exit)); /* Remove Condition */
346 SCCP_LIST_HEAD_DESTROY(&(tp_p->jobs));
347 SCCP_LIST_HEAD_DESTROY(&(tp_p->threads));
348 sccp_free(tp_p);
349 tp_p = NULL; /* DEALLOC thread pool */
350 sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "Threadpool Ended\n");
351 return TRUE;
352 }
353
sccp_threadpool_thread_count(sccp_threadpool_t * tp_p)354 int __PURE__ sccp_threadpool_thread_count(sccp_threadpool_t * tp_p)
355 {
356 return SCCP_LIST_GETSIZE(&tp_p->threads);
357 }
358
359 /* =================== JOB QUEUE OPERATIONS ===================== */
360
361 /* Add job to queue */
sccp_threadpool_jobqueue_add(sccp_threadpool_t * tp_p,sccp_threadpool_job_t * newjob_p)362 void sccp_threadpool_jobqueue_add(sccp_threadpool_t * tp_p, sccp_threadpool_job_t * newjob_p)
363 {
364 if (!tp_p || !newjob_p) {
365 pbx_log(LOG_ERROR, "(sccp_threadpool_jobqueue_add) no tp_p or no work pointer\n");
366 sccp_free(newjob_p);
367 return;
368 }
369
370 sccp_log((DEBUGCAT_THPOOL)) (VERBOSE_PREFIX_3 "(sccp_threadpool_jobqueue_add) tp_p: %p, jobCount: %d\n", tp_p, SCCP_LIST_GETSIZE(&tp_p->jobs));
371 SCCP_LIST_LOCK(&(tp_p->jobs));
372 if (tp_p->sccp_threadpool_shuttingdown) {
373 pbx_log(LOG_ERROR, "(sccp_threadpool_jobqueue_add) shutting down. skipping work\n");
374 SCCP_LIST_UNLOCK(&(tp_p->jobs));
375 sccp_free(newjob_p);
376 return;
377 }
378 SCCP_LIST_INSERT_TAIL(&(tp_p->jobs), newjob_p, list);
379 SCCP_LIST_UNLOCK(&(tp_p->jobs));
380
381 SCCP_LIST_LOCK(&(tp_p->jobs));
382 int jobs = (int) SCCP_LIST_GETSIZE(&tp_p->jobs);
383 SCCP_LIST_UNLOCK(&(tp_p->jobs));
384
385 if (jobs > tp_p->job_high_water_mark) {
386 tp_p->job_high_water_mark = jobs;
387 }
388 pbx_cond_signal(&(tp_p->work));
389 }
390
/*!
 * \brief Number of jobs currently waiting in the pool's job queue
 *
 * Logs the queue length (DEBUGCAT_THPOOL) and returns it. The size is read
 * without taking the queue lock.
 *
 * \param tp_p threadpool (must not be NULL)
 * \return current size of the job queue
 */
int sccp_threadpool_jobqueue_count(sccp_threadpool_t * tp_p)
{
	sccp_log((DEBUGCAT_THPOOL)) (VERBOSE_PREFIX_3 "(sccp_threadpool_jobqueue_count) tp_p: %p, jobCount: %d\n", tp_p, SCCP_LIST_GETSIZE(&tp_p->jobs));

	return SCCP_LIST_GETSIZE(&tp_p->jobs);
}
396
397
398 #if CS_TEST_FRAMEWORK
399 #include <asterisk/test.h>
400 #define NUM_WORK 50
401 #define test_category "/channels/chan_sccp/threadpool/"
sccp_cli_threadpool_test_thread(void * data)402 static void *sccp_cli_threadpool_test_thread(void *data)
403 {
404 uint num_loops = rand() % 10000;
405 for(uint loop = 0; loop < num_loops; loop++) {
406 usleep(1);
407 }
408 return 0;
409 }
410
AST_TEST_DEFINE(sccp_threadpool_create_destroy)411 AST_TEST_DEFINE(sccp_threadpool_create_destroy)
412 {
413 switch(cmd) {
414 case TEST_INIT:
415 info->name = "create_destroy";
416 info->category = test_category;
417 info->summary = "chan-sccp-b threadpool creation/destruction";
418 info->description = "chan-sccp-b threadpool creation/destruction";
419 return AST_TEST_NOT_RUN;
420 case TEST_EXECUTE:
421 break;
422 }
423 sccp_threadpool_t *test_threadpool = NULL;
424
425 pbx_test_status_update(test, "Destroy non-existing threadpool: %p\n", test_threadpool);
426 sccp_threadpool_destroy(test_threadpool);
427 pbx_test_validate(test, sccp_threadpool_destroy(test_threadpool) == FALSE);
428
429 pbx_test_status_update(test, "Create threadpool\n");
430 test_threadpool = sccp_threadpool_init(THREADPOOL_MIN_SIZE);
431 pbx_test_validate(test, NULL != test_threadpool);
432
433 int current_size = sccp_threadpool_thread_count(test_threadpool);
434 pbx_test_status_update(test, "Grow threadpool by 3, current %d\n", current_size);
435 SCCP_LIST_LOCK(&(test_threadpool->threads));
436 sccp_threadpool_grow_locked(test_threadpool, 3);
437 SCCP_LIST_UNLOCK(&(test_threadpool->threads));
438 pbx_test_validate(test, sccp_threadpool_thread_count(test_threadpool) == current_size + 3);
439
440 current_size = sccp_threadpool_thread_count(test_threadpool);
441 pbx_test_status_update(test, "Shrink threadpool by 2, current %d\n", current_size);
442 SCCP_LIST_LOCK(&(test_threadpool->threads));
443 sccp_threadpool_shrink_locked(test_threadpool, 2);
444 SCCP_LIST_UNLOCK(&(test_threadpool->threads));
445 sleep(1);
446 pbx_test_validate(test, sccp_threadpool_thread_count(test_threadpool) == current_size - 2);
447
448 pbx_test_status_update(test, "Destroy threadpool: %p\n", test_threadpool);
449 pbx_test_validate(test, sccp_threadpool_destroy(test_threadpool) == TRUE);
450
451 return AST_TEST_PASS;
452 }
453
AST_TEST_DEFINE(sccp_threadpool_work)454 AST_TEST_DEFINE(sccp_threadpool_work)
455 {
456 switch(cmd) {
457 case TEST_INIT:
458 info->name = "addwork";
459 info->category = test_category;
460 info->summary = "chan-sccp-b threadpool work";
461 info->description = "chan-sccp-b threadpool work test";
462 return AST_TEST_NOT_RUN;
463 case TEST_EXECUTE:
464 break;
465 }
466 sccp_threadpool_t *test_threadpool = NULL;
467
468 pbx_test_status_update(test, "Create Test threadpool\n");
469 test_threadpool = sccp_threadpool_init(THREADPOOL_MIN_SIZE);
470 pbx_test_validate(test, NULL != test_threadpool);
471
472 int current_size = sccp_threadpool_thread_count(test_threadpool);
473 pbx_test_status_update(test, "Grow threadpool by 3\n");
474 SCCP_LIST_LOCK(&(test_threadpool->threads));
475 sccp_threadpool_grow_locked(test_threadpool, 3);
476 SCCP_LIST_UNLOCK(&(test_threadpool->threads));
477 pbx_test_validate(test, sccp_threadpool_thread_count(test_threadpool) == current_size + 3);
478
479 if (test_threadpool) {
480 pbx_test_status_update(test, "Adding work to Test threadpool\n");
481 int loopcount = 0;
482 for(uint work = 0; work < NUM_WORK; work++) {
483 pbx_test_validate(test, sccp_threadpool_add_work(test_threadpool, sccp_cli_threadpool_test_thread, test) > 0);
484 }
485
486 pbx_test_status_update(test, "Waiting for work to finishg in Test threadpool\n");
487 while (sccp_threadpool_jobqueue_count(test_threadpool) > 0 && loopcount++ < 20) {
488 pbx_test_status_update(test, "Job Queue: %d, Threads: %d\n", sccp_threadpool_jobqueue_count(test_threadpool), sccp_threadpool_thread_count(test_threadpool));
489 sleep(1);
490 }
491 pbx_test_validate(test, sccp_threadpool_jobqueue_count(test_threadpool) == 0);
492
493 pbx_test_status_update(test, "Destroy Test threadpool\n");
494 sccp_threadpool_destroy(test_threadpool);
495 }
496 return AST_TEST_PASS;
497 }
498
sccp_register_tests(void)499 static void __attribute__((constructor)) sccp_register_tests(void)
500 {
501 AST_TEST_REGISTER(sccp_threadpool_create_destroy);
502 AST_TEST_REGISTER(sccp_threadpool_work);
503 }
504
sccp_unregister_tests(void)505 static void __attribute__((destructor)) sccp_unregister_tests(void)
506 {
507 AST_TEST_UNREGISTER(sccp_threadpool_create_destroy);
508 AST_TEST_UNREGISTER(sccp_threadpool_work);
509 }
510 #endif
511
512 // kate: indent-width 8; replace-tabs off; indent-mode cstyle; auto-insert-doxygen on; line-numbers on; tab-indents on; keep-extra-spaces off; auto-brackets off;
513