1 /*!
2  * \file        sccp_threadpool.c
3  * \brief       SCCP Threadpool Class
4  * \author      Diederik de Groot < ddegroot@users.sourceforge.net >
5  * \note        This program is free software and may be modified and distributed under the terms of the GNU Public License.
6  *              See the LICENSE file at the top of the source tree.
7  * \note        Based on the work of Johan Hanssen Seferidis
8  *              Library providing a threading pool where you can add work.
9  * \since       2009-01-16
10  *
11  */
12 
13 #include "config.h"
14 #include "common.h"
15 
16 SCCP_FILE_VERSION(__FILE__, "");
17 #include "sccp_threadpool.h"
18 #include <signal.h>
19 #undef pthread_create
20 #if defined(__GNUC__) && __GNUC__ > 3 && defined(HAVE_SYS_INFO_H)
21 #include <sys/sysinfo.h>											// to retrieve processor info
22 #endif
23 //#define SEMAPHORE_LOCKED	(0)
24 //#define SEMAPHORE_UNLOCKED	(1)
25 void sccp_threadpool_grow_locked(sccp_threadpool_t * tp_p, int amount);
26 void sccp_threadpool_shrink_locked(sccp_threadpool_t * tp_p, int amount);
27 void *sccp_threadpool_thread_do(void *p);
28 
29 typedef struct sccp_threadpool_thread sccp_threadpool_thread_t;
30 
31 struct sccp_threadpool_thread {
32 	pthread_t thread;
33 	sccp_threadpool_t *tp_p;
34 	SCCP_LIST_ENTRY (sccp_threadpool_thread_t) list;
35 	boolean_t die;
36 };
37 
38 /* The threadpool */
39 struct sccp_threadpool {
40 	SCCP_LIST_HEAD (, sccp_threadpool_job_t) jobs;
41 	SCCP_LIST_HEAD (, sccp_threadpool_thread_t) threads;
42 	pbx_cond_t work;
43 	pbx_cond_t exit;
44 	time_t last_size_check;											/*!< Time since last size check */
45 	time_t last_resize;											/*!< Time since last resize */
46 	int job_high_water_mark;										/*!< Highest number of jobs outstanding since last resize check */
47 	volatile int sccp_threadpool_shuttingdown;
48 };
49 
/*
 * Quick reference for the abbreviations used in this file:
 *
 * tp                   = threadpool
 * sccp_threadpool      = threadpool
 * sccp_threadpool_t    = threadpool type
 * tp_p                 = threadpool pointer
 * sem                  = semaphore
 * xN                   = x can be any string; the N suffix stands for "amount of" (e.g. threadsN)
 */
60 
61 /* Initialise thread pool */
sccp_threadpool_init(int threadsN)62 sccp_threadpool_t *sccp_threadpool_init(int threadsN)
63 {
64 	sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_2 "Starting Threadpool\n");
65 	sccp_threadpool_t * tp_p = NULL;
66 
67 #if defined(__GNUC__) && __GNUC__ > 3 && defined(HAVE_SYS_INFO_H)
68 	threadsN = get_nprocs_conf();										// get current number of active processors
69 #endif
70 	if (!threadsN || threadsN < THREADPOOL_MIN_SIZE) {
71 		threadsN = THREADPOOL_MIN_SIZE;
72 	}
73 	if (threadsN > THREADPOOL_MAX_SIZE) {
74 		threadsN = THREADPOOL_MAX_SIZE;
75 	}
76 	/* Make new thread pool */
77 	if (!(tp_p = (sccp_threadpool_t *) sccp_calloc(sizeof *tp_p, 1))) {
78 		pbx_log(LOG_ERROR, SS_Memory_Allocation_Error, "SCCP");
79 		return NULL;
80 	}
81 
82 	/* initialize the thread pool */
83 	SCCP_LIST_HEAD_INIT(&tp_p->threads);
84 
85 	/* Initialise the job queue */
86 	SCCP_LIST_HEAD_INIT(&tp_p->jobs);
87 	tp_p->last_size_check = time(0);
88 	tp_p->job_high_water_mark = 0;
89 	tp_p->last_resize = time(0);
90 	tp_p->sccp_threadpool_shuttingdown = 0;
91 
92 	/* Initialise Condition */
93 	pbx_cond_init(&(tp_p->work), NULL);
94 	pbx_cond_init(&(tp_p->exit), NULL);
95 
96 	/* Make threads in pool */
97 	SCCP_LIST_LOCK(&(tp_p->threads));
98 	sccp_threadpool_grow_locked(tp_p, threadsN);
99 	SCCP_LIST_UNLOCK(&(tp_p->threads));
100 
101 	sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "Threadpool Started\n");
102 	return tp_p;
103 }
104 
105 // sccp_threadpool_grow_locked needs to be called with locked &(tp_p->threads)->lock
sccp_threadpool_grow_locked(sccp_threadpool_t * tp_p,int amount)106 void sccp_threadpool_grow_locked(sccp_threadpool_t * tp_p, int amount)
107 {
108 	pthread_attr_t attr;
109 	sccp_threadpool_thread_t * tp_thread = NULL;
110 	int t = 0;
111 
112 	if (tp_p && !tp_p->sccp_threadpool_shuttingdown) {
113 		for (t = 0; t < amount; t++) {
114 			if (!(tp_thread = (sccp_threadpool_thread_t *) sccp_calloc(sizeof *tp_thread, 1))) {
115                 		pbx_log(LOG_ERROR, SS_Memory_Allocation_Error, "SCCP");
116 				return;
117 			}
118 			tp_thread->die = FALSE;
119 			tp_thread->tp_p = tp_p;
120 
121 			pthread_attr_init(&attr);
122 			pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
123 			pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
124 			pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
125 			SCCP_LIST_INSERT_HEAD(&(tp_p->threads), tp_thread, list);
126 			pbx_pthread_create(&(tp_thread->thread), &attr, sccp_threadpool_thread_do, (void *) tp_thread);
127 			sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "Created thread %d(%p) in pool \n", t, (void *) tp_thread->thread);
128 			pbx_cond_broadcast(&(tp_p->work));
129 		}
130 	}
131 }
132 
133 // sccp_threadpool_shrink_locked needs to be called with locked &(tp_p->threads)->lock
sccp_threadpool_shrink_locked(sccp_threadpool_t * tp_p,int amount)134 void sccp_threadpool_shrink_locked(sccp_threadpool_t * tp_p, int amount)
135 {
136 	sccp_threadpool_thread_t * tp_thread = NULL;
137 	int t = 0;
138 
139 	if (tp_p && !tp_p->sccp_threadpool_shuttingdown) {
140 		for (t = 0; t < amount; t++) {
141 			SCCP_LIST_TRAVERSE(&(tp_p->threads), tp_thread, list) {
142 				if (tp_thread->die == FALSE) {
143 					tp_thread->die = TRUE;
144 					break;
145 				}
146 			}
147 
148 			if (tp_thread) {
149 				// wake up all threads
150 				sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "Sending die signal to thread %p in pool \n", (void *) tp_thread->thread);
151 				pbx_cond_broadcast(&(tp_p->work));
152 			}
153 		}
154 	}
155 }
156 
157 /* check threadpool size (increase/decrease if necessary) */
sccp_threadpool_check_size(sccp_threadpool_t * tp_p)158 static void sccp_threadpool_check_size(sccp_threadpool_t * tp_p)
159 {
160 	if (tp_p && !tp_p->sccp_threadpool_shuttingdown) {
161 		sccp_log((DEBUGCAT_THPOOL)) (VERBOSE_PREFIX_3 "(sccp_threadpool_check_resize) in thread: %p\n", (void *) pthread_self());
162 		SCCP_LIST_LOCK(&(tp_p->threads));
163 		{
164 			if (SCCP_LIST_GETSIZE(&tp_p->jobs) > (SCCP_LIST_GETSIZE(&tp_p->threads) * 2) && SCCP_LIST_GETSIZE(&tp_p->threads) < THREADPOOL_MAX_SIZE) {	// increase
165 				sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "Add new thread to threadpool %p\n", tp_p);
166 				sccp_threadpool_grow_locked(tp_p, 1);
167 				tp_p->last_resize = time(0);
168 			} else if (((time(0) - tp_p->last_resize) > THREADPOOL_RESIZE_INTERVAL * 3) &&		// wait a little longer to decrease
169 				   (SCCP_LIST_GETSIZE(&tp_p->threads) > THREADPOOL_MIN_SIZE && SCCP_LIST_GETSIZE(&tp_p->jobs) < (SCCP_LIST_GETSIZE(&tp_p->threads) / 2))) {	// decrease
170 				sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "Remove thread %d from threadpool %p\n", SCCP_LIST_GETSIZE(&tp_p->threads) - 1, tp_p);
171 				// kill last thread only if it is not executed by itself
172 				sccp_threadpool_shrink_locked(tp_p, 1);
173 				tp_p->last_resize = time(0);
174 			}
175 			tp_p->last_size_check = time(0);
176 			tp_p->job_high_water_mark = SCCP_LIST_GETSIZE(&tp_p->jobs);
177 			sccp_log((DEBUGCAT_THPOOL)) (VERBOSE_PREFIX_3 "(sccp_threadpool_check_resize) Number of threads: %d, job_high_water_mark: %d\n", SCCP_LIST_GETSIZE(&tp_p->threads), tp_p->job_high_water_mark);
178 		}
179 		SCCP_LIST_UNLOCK(&(tp_p->threads));
180 	}
181 }
182 
sccp_threadpool_thread_end(void * p)183 static void sccp_threadpool_thread_end(void *p)
184 {
185 	sccp_threadpool_thread_t *tp_thread = (sccp_threadpool_thread_t *) p;
186 	sccp_threadpool_thread_t *res = NULL;
187 	sccp_threadpool_t *tp_p = tp_thread->tp_p;
188 
189 	SCCP_LIST_LOCK(&(tp_p->threads));
190 	res = SCCP_LIST_REMOVE(&(tp_p->threads), tp_thread, list);
191 	SCCP_LIST_UNLOCK(&(tp_p->threads));
192 
193 	pbx_cond_signal(&(tp_p->exit));
194 	if (res) {
195 		sccp_free(res);
196 	}
197 }
198 
199 /* What each individual thread is doing */
sccp_threadpool_thread_do(void * p)200 void *sccp_threadpool_thread_do(void *p)
201 {
202 	sccp_threadpool_thread_t *tp_thread = (sccp_threadpool_thread_t *) p;
203 	sccp_threadpool_t *tp_p = tp_thread->tp_p;
204 	void *thread = (void *) pthread_self();
205 
206 	pthread_cleanup_push(sccp_threadpool_thread_end, tp_thread);
207 
208 	int jobs = 0;
209 
210 	int threads = 0;
211 
212 	sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "Starting Threadpool JobQueue:%p\n", thread);
213 	while (1) {
214 		pthread_testcancel();
215 
216 		SCCP_LIST_LOCK(&(tp_p->threads));								/* LOCK */
217 		threads = SCCP_LIST_GETSIZE(&tp_p->threads);
218 		SCCP_LIST_UNLOCK(&(tp_p->threads));
219 
220 		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
221 
222 		SCCP_LIST_LOCK(&(tp_p->jobs));									/* LOCK */
223 		jobs = SCCP_LIST_GETSIZE(&tp_p->jobs);
224 		sccp_log((DEBUGCAT_THPOOL)) (VERBOSE_PREFIX_3 "(sccp_threadpool_thread_do) num_jobs: %d, thread: %p, num_threads: %d\n", jobs, thread, threads);
225 		while (SCCP_LIST_GETSIZE(&tp_p->jobs) == 0 && !tp_thread->die) {
226 			sccp_log((DEBUGCAT_THPOOL)) (VERBOSE_PREFIX_3 "(sccp_threadpool_thread_do) Thread %p Waiting for New Work Condition\n", thread);
227 			pbx_cond_wait(&(tp_p->work), &(tp_p->jobs.lock));
228 		}
229 		if (tp_thread->die && SCCP_LIST_GETSIZE(&tp_p->jobs) == 0) {
230 			sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "JobQueue Die. Exiting thread %p...\n", thread);
231 			SCCP_LIST_UNLOCK(&(tp_p->jobs));
232 			break;
233 		}
234 		sccp_log((DEBUGCAT_THPOOL)) (VERBOSE_PREFIX_3 "(sccp_threadpool_thread_do) Let's work. num_jobs: %d, thread: %p, num_threads: %d\n", jobs, thread, threads);
235 		{
236 			/* Read job from queue and execute it */
237 			void *(*func_buff) (void *arg) = NULL;
238 			void * arg_buff = NULL;
239 			sccp_threadpool_job_t * job = NULL;
240 
241 			if ((job = SCCP_LIST_REMOVE_HEAD(&(tp_p->jobs), list))) {
242 				func_buff = job->function;
243 				arg_buff = job->arg;
244 			}
245 			SCCP_LIST_UNLOCK(&(tp_p->jobs));
246 
247 			sccp_log((DEBUGCAT_THPOOL)) (VERBOSE_PREFIX_3 "(sccp_threadpool_thread_do) executing %p in thread: %p\n", job, thread);
248 			if (job) {
249 				func_buff(arg_buff);								/* run function */
250 				sccp_free(job);									/* DEALLOC job */
251 			}
252 			// check number of threads in threadpool
253 			if ((time(0) - tp_p->last_size_check) > THREADPOOL_RESIZE_INTERVAL) {
254 				sccp_threadpool_check_size(tp_p);						/* Check Resizing */
255 			}
256 		}
257 		pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
258 	}
259 	sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "JobQueue Exiting Thread...\n");
260 	pthread_cleanup_pop(1);
261 	return NULL;
262 }
263 
264 /* Add work to the thread pool */
sccp_threadpool_add_work(sccp_threadpool_t * tp_p,void * (* function_p)(void *),void * arg_p)265 int sccp_threadpool_add_work(sccp_threadpool_t * tp_p, void *(*function_p) (void *), void *arg_p)
266 {
267 	// prevent new work while shutting down
268 	if (!tp_p->sccp_threadpool_shuttingdown) {
269 		sccp_threadpool_job_t * newJob = NULL;
270 
271 		if (!(newJob = (sccp_threadpool_job_t *) sccp_calloc(sizeof *newJob, 1))) {
272         		pbx_log(LOG_ERROR, SS_Memory_Allocation_Error, "SCCP");
273 			exit(1);
274 		}
275 
276 		/* add function and argument */
277 		newJob->function = function_p;
278 		newJob->arg = arg_p;
279 
280 		/* add job to queue */
281 		sccp_threadpool_jobqueue_add(tp_p, newJob);
282 		return 1;
283 	}
284         pbx_log(LOG_ERROR, "sccp_threadpool_add_work(): Threadpool shutting down, denying new work\n");
285         return 0;
286 }
287 
288 /* Destroy the threadpool */
sccp_threadpool_destroy(sccp_threadpool_t * tp_p)289 boolean_t sccp_threadpool_destroy(sccp_threadpool_t * tp_p)
290 {
291 	if (!tp_p) {
292 		return FALSE;
293 	}
294 	sccp_threadpool_thread_t *tp_thread = NULL;
295 
296 	sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_2 "Destroying Threadpool %p with %d jobs\n", tp_p, SCCP_LIST_GETSIZE(&tp_p->jobs));
297 
298 	// After this point, no new jobs can be added
299 	SCCP_LIST_LOCK(&(tp_p->jobs));
300 	tp_p->sccp_threadpool_shuttingdown = 1;
301 	SCCP_LIST_UNLOCK(&(tp_p->jobs));
302 
303 	// shutdown is a kind of work too
304 	SCCP_LIST_LOCK(&(tp_p->threads));
305 	SCCP_LIST_TRAVERSE(&(tp_p->threads), tp_thread, list) {
306 		tp_thread->die = TRUE;
307 		pbx_cond_broadcast(&(tp_p->work));
308 	}
309 	SCCP_LIST_UNLOCK(&(tp_p->threads));
310 
311 	// wake up jobs untill jobqueue is empty, before shutting down, to make sure all jobs have been processed
312 	pbx_cond_broadcast(&(tp_p->work));
313 
314 	// wait for all threads to exit
315 	if (SCCP_LIST_GETSIZE(&tp_p->threads) != 0) {
316 		struct timespec ts;
317 		struct timeval tp;
318 		int counter = 0;
319 
320 		SCCP_LIST_LOCK(&(tp_p->threads));
321 		sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "Waiting for threadpool to wind down. please stand by...\n");
322 		while (SCCP_LIST_GETSIZE(&tp_p->threads) != 0 && counter++ < THREADPOOL_MAX_SIZE) {
323 			gettimeofday(&tp, NULL);
324 			ts.tv_sec = tp.tv_sec;
325 			ts.tv_nsec = tp.tv_usec * 1000;
326 			ts.tv_sec += 1;										// wait max 2 second
327 			pbx_cond_broadcast(&(tp_p->work));
328 			pbx_cond_timedwait(&tp_p->exit, &(tp_p->threads.lock), &ts);
329 		}
330 
331 		/* Make sure threads have finished (should never have to execute) */
332 		if (SCCP_LIST_GETSIZE(&tp_p->threads) != 0) {
333 			while ((tp_thread = SCCP_LIST_REMOVE_HEAD(&(tp_p->threads), list))) {
334 				pbx_log(LOG_ERROR, "Forcing Destroy of thread %p\n", tp_thread);
335 				pthread_cancel(tp_thread->thread);
336 				pthread_kill(tp_thread->thread, SIGURG);
337 				pthread_join(tp_thread->thread, NULL);
338 			}
339 		}
340 		SCCP_LIST_UNLOCK(&(tp_p->threads));
341 	}
342 
343 	/* Dealloc */
344 	pbx_cond_destroy(&(tp_p->work));									/* Remove Condition */
345 	pbx_cond_destroy(&(tp_p->exit));									/* Remove Condition */
346 	SCCP_LIST_HEAD_DESTROY(&(tp_p->jobs));
347 	SCCP_LIST_HEAD_DESTROY(&(tp_p->threads));
348 	sccp_free(tp_p);
349 	tp_p = NULL;												/* DEALLOC thread pool */
350 	sccp_log((DEBUGCAT_CORE)) (VERBOSE_PREFIX_3 "Threadpool Ended\n");
351 	return TRUE;
352 }
353 
sccp_threadpool_thread_count(sccp_threadpool_t * tp_p)354 int __PURE__ sccp_threadpool_thread_count(sccp_threadpool_t * tp_p)
355 {
356 	return SCCP_LIST_GETSIZE(&tp_p->threads);
357 }
358 
359 /* =================== JOB QUEUE OPERATIONS ===================== */
360 
361 /* Add job to queue */
sccp_threadpool_jobqueue_add(sccp_threadpool_t * tp_p,sccp_threadpool_job_t * newjob_p)362 void sccp_threadpool_jobqueue_add(sccp_threadpool_t * tp_p, sccp_threadpool_job_t * newjob_p)
363 {
364 	if (!tp_p || !newjob_p) {
365 		pbx_log(LOG_ERROR, "(sccp_threadpool_jobqueue_add) no tp_p or no work pointer\n");
366 		sccp_free(newjob_p);
367 		return;
368 	}
369 
370 	sccp_log((DEBUGCAT_THPOOL)) (VERBOSE_PREFIX_3 "(sccp_threadpool_jobqueue_add) tp_p: %p, jobCount: %d\n", tp_p, SCCP_LIST_GETSIZE(&tp_p->jobs));
371 	SCCP_LIST_LOCK(&(tp_p->jobs));
372 	if (tp_p->sccp_threadpool_shuttingdown) {
373 		pbx_log(LOG_ERROR, "(sccp_threadpool_jobqueue_add) shutting down. skipping work\n");
374 		SCCP_LIST_UNLOCK(&(tp_p->jobs));
375 		sccp_free(newjob_p);
376 		return;
377 	}
378 	SCCP_LIST_INSERT_TAIL(&(tp_p->jobs), newjob_p, list);
379 	SCCP_LIST_UNLOCK(&(tp_p->jobs));
380 
381 	SCCP_LIST_LOCK(&(tp_p->jobs));
382 	int jobs = (int) SCCP_LIST_GETSIZE(&tp_p->jobs);
383 	SCCP_LIST_UNLOCK(&(tp_p->jobs));
384 
385 	if (jobs > tp_p->job_high_water_mark) {
386 		tp_p->job_high_water_mark = jobs;
387 	}
388 	pbx_cond_signal(&(tp_p->work));
389 }
390 
/*!
 * \brief Number of jobs currently waiting in the pool's job queue
 *
 * The queue is read without taking its lock, so the value is a snapshot.
 *
 * \param tp_p pool to inspect (must not be NULL)
 * \return size of the pool's job queue
 */
int sccp_threadpool_jobqueue_count(sccp_threadpool_t * tp_p)
{
	const int queued = SCCP_LIST_GETSIZE(&tp_p->jobs);

	sccp_log((DEBUGCAT_THPOOL)) (VERBOSE_PREFIX_3 "(sccp_threadpool_jobqueue_count) tp_p: %p, jobCount: %d\n", tp_p, queued);
	return queued;
}
396 
397 
398 #if CS_TEST_FRAMEWORK
399 #include <asterisk/test.h>
400 #define NUM_WORK 50
401 #define test_category "/channels/chan_sccp/threadpool/"
sccp_cli_threadpool_test_thread(void * data)402 static void *sccp_cli_threadpool_test_thread(void *data)
403 {
404 	uint num_loops = rand() % 10000;
405 	for(uint loop = 0; loop < num_loops; loop++) {
406 		usleep(1);
407 	}
408 	return 0;
409 }
410 
AST_TEST_DEFINE(sccp_threadpool_create_destroy)411 AST_TEST_DEFINE(sccp_threadpool_create_destroy)
412 {
413 	switch(cmd) {
414 		case TEST_INIT:
415 			info->name = "create_destroy";
416 			info->category = test_category;
417 			info->summary = "chan-sccp-b threadpool creation/destruction";
418 			info->description = "chan-sccp-b threadpool creation/destruction";
419 			return AST_TEST_NOT_RUN;
420 	        case TEST_EXECUTE:
421 	        	break;
422 	}
423 	sccp_threadpool_t *test_threadpool = NULL;
424 
425 	pbx_test_status_update(test, "Destroy non-existing threadpool: %p\n", test_threadpool);
426 	sccp_threadpool_destroy(test_threadpool);
427 	pbx_test_validate(test, sccp_threadpool_destroy(test_threadpool) == FALSE);
428 
429 	pbx_test_status_update(test, "Create threadpool\n");
430 	test_threadpool = sccp_threadpool_init(THREADPOOL_MIN_SIZE);
431 	pbx_test_validate(test, NULL != test_threadpool);
432 
433 	int current_size = sccp_threadpool_thread_count(test_threadpool);
434 	pbx_test_status_update(test, "Grow threadpool by 3, current %d\n", current_size);
435 	SCCP_LIST_LOCK(&(test_threadpool->threads));
436 	sccp_threadpool_grow_locked(test_threadpool, 3);
437 	SCCP_LIST_UNLOCK(&(test_threadpool->threads));
438 	pbx_test_validate(test, sccp_threadpool_thread_count(test_threadpool) == current_size + 3);
439 
440 	current_size = sccp_threadpool_thread_count(test_threadpool);
441 	pbx_test_status_update(test, "Shrink threadpool by 2, current %d\n", current_size);
442 	SCCP_LIST_LOCK(&(test_threadpool->threads));
443 	sccp_threadpool_shrink_locked(test_threadpool, 2);
444 	SCCP_LIST_UNLOCK(&(test_threadpool->threads));
445 	sleep(1);
446 	pbx_test_validate(test, sccp_threadpool_thread_count(test_threadpool) == current_size - 2);
447 
448 	pbx_test_status_update(test, "Destroy threadpool: %p\n", test_threadpool);
449 	pbx_test_validate(test, sccp_threadpool_destroy(test_threadpool) == TRUE);
450 
451 	return AST_TEST_PASS;
452 }
453 
AST_TEST_DEFINE(sccp_threadpool_work)454 AST_TEST_DEFINE(sccp_threadpool_work)
455 {
456 	switch(cmd) {
457 		case TEST_INIT:
458 			info->name = "addwork";
459 			info->category = test_category;
460 			info->summary = "chan-sccp-b threadpool work";
461 			info->description = "chan-sccp-b threadpool work test";
462 			return AST_TEST_NOT_RUN;
463 	        case TEST_EXECUTE:
464 	        	break;
465 	}
466 	sccp_threadpool_t *test_threadpool = NULL;
467 
468 	pbx_test_status_update(test, "Create Test threadpool\n");
469 	test_threadpool = sccp_threadpool_init(THREADPOOL_MIN_SIZE);
470 	pbx_test_validate(test, NULL != test_threadpool);
471 
472 	int current_size = sccp_threadpool_thread_count(test_threadpool);
473 	pbx_test_status_update(test, "Grow threadpool by 3\n");
474 	SCCP_LIST_LOCK(&(test_threadpool->threads));
475 	sccp_threadpool_grow_locked(test_threadpool, 3);
476 	SCCP_LIST_UNLOCK(&(test_threadpool->threads));
477 	pbx_test_validate(test, sccp_threadpool_thread_count(test_threadpool) == current_size + 3);
478 
479 	if (test_threadpool) {
480 		pbx_test_status_update(test, "Adding work to Test threadpool\n");
481 		int loopcount = 0;
482 		for(uint work = 0; work < NUM_WORK; work++) {
483 			pbx_test_validate(test, sccp_threadpool_add_work(test_threadpool, sccp_cli_threadpool_test_thread, test) > 0);
484 		}
485 
486 		pbx_test_status_update(test, "Waiting for work to finishg in Test threadpool\n");
487 		while (sccp_threadpool_jobqueue_count(test_threadpool) > 0 && loopcount++ < 20) {
488 			pbx_test_status_update(test, "Job Queue: %d, Threads: %d\n", sccp_threadpool_jobqueue_count(test_threadpool), sccp_threadpool_thread_count(test_threadpool));
489 			sleep(1);
490 		}
491 		pbx_test_validate(test, sccp_threadpool_jobqueue_count(test_threadpool) == 0);
492 
493 		pbx_test_status_update(test, "Destroy Test threadpool\n");
494 		sccp_threadpool_destroy(test_threadpool);
495 	}
496 	return AST_TEST_PASS;
497 }
498 
sccp_register_tests(void)499 static void __attribute__((constructor)) sccp_register_tests(void)
500 {
501         AST_TEST_REGISTER(sccp_threadpool_create_destroy);
502         AST_TEST_REGISTER(sccp_threadpool_work);
503 }
504 
sccp_unregister_tests(void)505 static void __attribute__((destructor)) sccp_unregister_tests(void)
506 {
507         AST_TEST_UNREGISTER(sccp_threadpool_create_destroy);
508         AST_TEST_UNREGISTER(sccp_threadpool_work);
509 }
510 #endif
511 
512 // kate: indent-width 8; replace-tabs off; indent-mode cstyle; auto-insert-doxygen on; line-numbers on; tab-indents on; keep-extra-spaces off; auto-brackets off;
513