1 /* Support for thread pools ... like threadgroups, but lighter.
2  *
3  * 18/3/10
4  * 	- from threadgroup.c
5  * 	- distributed work allocation idea from Christian Blenia, thank you
6  * 	  very much
7  * 21/3/10
8  * 	- progress feedback
9  * 	- only expose VipsThreadState
10  * 11/5/10
11  * 	- argh, stopping many threads could sometimes leave allocated work
12  * 	  undone
13  * 17/7/10
14  * 	- set pool->error whenever we set thr->error, lets us catch allocate
15  * 	  errors (thanks Tim)
16  * 25/7/14
17  * 	- limit nthr on tiny images
18  * 6/3/17
19  * 	- remove single-thread-first-request thing, new seq system makes it
20  * 	  unnecessary
21  * 23/4/17
22  * 	- add ->stall
23  * 	- don't depend on image width when setting n_lines
24  * 27/2/19 jtorresfabra
25  * 	- free threadpool earlier
26  * 02/02/20 kleisauke
27  *	- reuse threads by using GLib's threadpool
28  * 	- remove mutex lock for VipsThreadStartFn
29  */
30 
31 /*
32 
33     This file is part of VIPS.
34 
35     VIPS is free software; you can redistribute it and/or modify
36     it under the terms of the GNU Lesser General Public License as published by
37     the Free Software Foundation; either version 2 of the License, or
38     (at your option) any later version.
39 
40     This program is distributed in the hope that it will be useful,
41     but WITHOUT ANY WARRANTY; without even the implied warranty of
42     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
43     GNU Lesser General Public License for more details.
44 
45     You should have received a copy of the GNU Lesser General Public License
46     along with this program; if not, write to the Free Software
47     Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
48     02110-1301  USA
49 
50  */
51 
52 /*
53 
54     These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk
55 
56  */
57 
58 /*
59 #define VIPS_DEBUG
60 #define VIPS_DEBUG_RED
61  */
62 
63 #ifdef HAVE_CONFIG_H
64 #include <config.h>
65 #endif /*HAVE_CONFIG_H*/
66 #include <vips/intl.h>
67 
68 #include <stdio.h>
69 #include <stdlib.h>
70 #ifdef HAVE_UNISTD_H
71 #include <unistd.h>
72 #endif /*HAVE_UNISTD_H*/
73 #include <errno.h>
74 
75 #include <vips/vips.h>
76 #include <vips/internal.h>
77 #include <vips/thread.h>
78 #include <vips/debug.h>
79 
80 #ifdef G_OS_WIN32
81 #include <windows.h>
82 #endif /*G_OS_WIN32*/
83 
84 /**
85  * SECTION: threadpool
86  * @short_description: pools of worker threads
87  * @stability: Stable
88  * @see_also: <link linkend="libvips-generate">generate</link>
89  * @include: vips/vips.h
90  * @title: VipsThreadpool
91  *
 * A threadpool which allows reusing already started threads. Implementing
 * this can be tedious and error-prone, so we use the GLib-provided
 * threadpool for convenience. An added advantage is that the threads can
 * be shared between the different subsystems when they use GLib.
97  *
98  * The threadpool is created during vips_init() and is destroyed by
99  * vips_shutdown().
100  *
101  * vips_threadpool_run() loops a set of threads over an image. Threads take it
102  * in turns to allocate units of work (a unit might be a tile in an image),
103  * then run in parallel to process those units. An optional progress function
104  * can be used to give feedback.
105  */
106 
107 /* Maximum number of concurrent threads we allow. It prevents huge values of
108  * VIPS_CONCURRENCY killing the system.
109  */
110 #define MAX_THREADS (1024)
111 
112 /* Default tile geometry ... can be set by vips_init().
113  */
114 int vips__tile_width = VIPS__TILE_WIDTH;
115 int vips__tile_height = VIPS__TILE_HEIGHT;
116 int vips__fatstrip_height = VIPS__FATSTRIP_HEIGHT;
117 int vips__thinstrip_height = VIPS__THINSTRIP_HEIGHT;
118 
119 /* Default n threads ... 0 means get from environment.
120  */
121 int vips__concurrency = 0;
122 
123 /* Set this GPrivate to indicate that this thread is a worker inside
124  * the vips threadpool.
125  */
126 static GPrivate *is_worker_key = NULL;
127 
128 /* Set to stall threads for debugging.
129  */
130 static gboolean vips__stall = FALSE;
131 
132 /* The thread pool we'll use.
133  */
134 static GThreadPool *vips__pool = NULL;
135 
136 /* Glib 2.32 revised the thread API. We need some compat functions.
137  */
138 
139 GMutex *
vips_g_mutex_new(void)140 vips_g_mutex_new( void )
141 {
142 	GMutex *mutex;
143 
144 	mutex = g_new( GMutex, 1 );
145 	g_mutex_init( mutex );
146 
147 	return( mutex );
148 }
149 
150 void
vips_g_mutex_free(GMutex * mutex)151 vips_g_mutex_free( GMutex *mutex )
152 {
153 	g_mutex_clear( mutex );
154 	g_free( mutex );
155 }
156 
157 GCond *
vips_g_cond_new(void)158 vips_g_cond_new( void )
159 {
160 	GCond *cond;
161 
162 	cond = g_new( GCond, 1 );
163 	g_cond_init( cond );
164 
165 	return( cond );
166 }
167 
168 void
vips_g_cond_free(GCond * cond)169 vips_g_cond_free( GCond *cond )
170 {
171 	g_cond_clear( cond );
172 	g_free( cond );
173 }
174 
/* TRUE if we are a vips worker thread. We sometimes manage resource allocation
 * differently for vips workers since we can cheaply free stuff on thread
 * termination.
 */
gboolean
vips_thread_isworker( void )
{
	/* Workers tag themselves by setting is_worker_key to a non-NULL
	 * value, see vips_thread_run() and vips_thread_main_loop().
	 */
	return( g_private_get( is_worker_key ) != NULL );
}
184 
/* Argument bundle for vips_g_thread_new(): the profiling domain, the
 * function the new thread should run, and its argument. Freed by
 * vips_thread_run() once the payload has executed.
 */
typedef struct {
	const char *domain;
	GThreadFunc func;
	gpointer data;
} VipsThreadInfo;
190 
/* Trampoline run by threads made with vips_g_thread_new(): tag the thread as
 * a vips worker, attach profiling, run the payload, then free the
 * VipsThreadInfo and any thread-private vips caches.
 */
static void *
vips_thread_run( gpointer data )
{
	VipsThreadInfo *info = (VipsThreadInfo *) data;

	void *result;

	/* Set this to something (anything) to tag this thread as a vips
	 * worker.
	 */
	g_private_set( is_worker_key, data );

	if( vips__thread_profile )
		vips__thread_profile_attach( info->domain );

	result = info->func( info->data );

	g_free( info );

	/* Release thread-private caches before the thread exits.
	 */
	vips_thread_shutdown();

	return( result );
}
214 
215 GThread *
vips_g_thread_new(const char * domain,GThreadFunc func,gpointer data)216 vips_g_thread_new( const char *domain, GThreadFunc func, gpointer data )
217 {
218 	GThread *thread;
219 	VipsThreadInfo *info;
220 	GError *error = NULL;
221 
222 	info = g_new( VipsThreadInfo, 1 );
223 	info->domain = domain;
224 	info->func = func;
225 	info->data = data;
226 
227 	thread = g_thread_try_new( domain, vips_thread_run, info, &error );
228 
229 	VIPS_DEBUG_MSG_RED( "vips_g_thread_new: g_thread_create( %s ) = %p\n",
230 		domain, thread );
231 
232 	if( !thread ) {
233 		if( error )
234 			vips_g_error( &error );
235 		else
236 			vips_error( domain,
237 				"%s", _( "unable to create thread" ) );
238 	}
239 
240 	return( thread );
241 }
242 
243 void *
vips_g_thread_join(GThread * thread)244 vips_g_thread_join( GThread *thread )
245 {
246 	void *result;
247 
248 	result = g_thread_join( thread );
249 
250 	VIPS_DEBUG_MSG_RED( "vips_g_thread_join: g_thread_join( %p )\n",
251 		thread );
252 
253 	return( result );
254 }
255 
/* A work item for the GThreadPool: vips_thread_main_loop() runs func( data )
 * on a pool thread, then frees this struct.
 */
typedef struct {
	/* A name for this work item ... used as the profiling domain.
	 */
	const char *name;

	/* The function to execute within the #VipsThreadPool.
	 */
	GFunc func;

	/* User data that is handed over to func when it is called.
	 */
	gpointer data;
} VipsThreadExec;
269 
/* The function the GThreadPool runs for every pushed work item: tag the
 * thread as a vips worker, attach profiling, run the payload, then drop
 * thread-private caches, since the pool may hand this thread an unrelated
 * task next.
 */
static void
vips_thread_main_loop( gpointer thread_data, gpointer pool_data )
{
	VipsThreadExec *exec = (VipsThreadExec *) thread_data;

	/* Set this to something (anything) to tag this thread as a vips
	 * worker. No need to call g_private_replace as there is no
	 * GDestroyNotify handler associated with a worker.
	 */
	g_private_set( is_worker_key, thread_data );

	if( vips__thread_profile )
		vips__thread_profile_attach( exec->name );

	exec->func( exec->data, pool_data );

	g_free( exec );

	/* Free all thread-private caches, since they probably won't be valid
	 * for the next task this thread is given.
	 */
	vips_thread_shutdown();
}
293 
294 static int
get_num_processors(void)295 get_num_processors( void )
296 {
297 #if GLIB_CHECK_VERSION( 2, 48, 1 )
298 	/* We could use g_get_num_processors when GLib >= 2.48.1, see:
299 	 * https://gitlab.gnome.org/GNOME/glib/commit/999711abc82ea3a698d05977f9f91c0b73957f7f
300 	 * https://gitlab.gnome.org/GNOME/glib/commit/2149b29468bb99af3c29d5de61f75aad735082dc
301 	 */
302 	return( g_get_num_processors() );
303 #else
304 	int nproc;
305 
306 	nproc = 1;
307 
308 #ifdef G_OS_UNIX
309 
310 #if defined(HAVE_UNISTD_H) && defined(_SC_NPROCESSORS_ONLN)
311 {
312 	/* POSIX style.
313 	 */
314 	int x;
315 
316 	x = sysconf( _SC_NPROCESSORS_ONLN );
317 	if( x > 0 )
318 		nproc = x;
319 }
320 #elif defined HW_NCPU
321 {
322 	/* BSD style.
323 	 */
324 	int x;
325 	size_t len = sizeof(x);
326 
327 	sysctl( (int[2]) {CTL_HW, HW_NCPU}, 2, &x, &len, NULL, 0 );
328 	if( x > 0 )
329 		nproc = x;
330 }
331 #endif
332 
333 	/* libgomp has some very complex code on Linux to count the number of
334 	 * processors available to the current process taking pthread affinity
335 	 * into account, but we don't attempt that here. Perhaps we should?
336 	 */
337 
338 #endif /*G_OS_UNIX*/
339 
340 #ifdef G_OS_WIN32
341 {
342 	/* Count the CPUs currently available to this process.
343 	 */
344 	SYSTEM_INFO sysinfo;
345 	DWORD_PTR process_cpus;
346 	DWORD_PTR system_cpus;
347 
348 	/* This *never* fails, use it as fallback
349 	 */
350 	GetNativeSystemInfo( &sysinfo );
351 	nproc = (int) sysinfo.dwNumberOfProcessors;
352 
353 	if( GetProcessAffinityMask( GetCurrentProcess(),
354 		&process_cpus, &system_cpus ) ) {
355 		unsigned int af_count;
356 
357 		for( af_count = 0; process_cpus != 0; process_cpus >>= 1 )
358 			if( process_cpus & 1 )
359 				af_count++;
360 
361 		/* Prefer affinity-based result, if available
362 		 */
363 		if( af_count > 0 )
364 			nproc = af_count;
365 	}
366 }
367 #endif /*G_OS_WIN32*/
368 
369 	return( nproc );
370 #endif /*!GLIB_CHECK_VERSION( 2, 48, 1 )*/
371 }
372 
/* The default concurrency, set by the environment variable VIPS_CONCURRENCY,
 * or if that is not set, the number of threads available on the host machine.
 * Always returns a value in [1, MAX_THREADS].
 */
static int
vips__concurrency_get_default( void )
{
	const char *str;
	int nthr;
	int x;

	/* Tell the threads system how much concurrency we expect.
	 */
	if( vips__concurrency > 0 )
		nthr = vips__concurrency;
	else if( ((str = g_getenv( "VIPS_CONCURRENCY" ))
#if ENABLE_DEPRECATED
		|| (str = g_getenv( "IM_CONCURRENCY" ))
#endif
	) && (x = atoi( str )) > 0 )
		nthr = x;
	else
		nthr = get_num_processors();

	/* Clip to a sane range and warn: the user asked for something
	 * unreasonable (via environment or vips__concurrency).
	 */
	if( nthr < 1 ||
		nthr > MAX_THREADS ) {
		nthr = VIPS_CLIP( 1, nthr, MAX_THREADS );

		g_warning( _( "threads clipped to %d" ), nthr );
	}

	return( nthr );
}
405 
406 /**
407  * vips_concurrency_set:
408  * @concurrency: number of threads to run
409  *
410  * Sets the number of worker threads that vips should use when running a
411  * #VipsThreadPool.
412  *
413  * The special value 0 means "default". In this case, the number of threads is
414  * set by the environment variable VIPS_CONCURRENCY, or if that is not set, the
415  * number of threads available on the host machine.
416  *
417  * See also: vips_concurrency_get().
418  */
419 void
vips_concurrency_set(int concurrency)420 vips_concurrency_set( int concurrency )
421 {
422 	/* Tell the threads system how much concurrency we expect.
423 	 */
424 	if( concurrency < 1 )
425 		concurrency = vips__concurrency_get_default();
426 	else if( concurrency > MAX_THREADS ) {
427 		concurrency = MAX_THREADS;
428 
429 		g_warning( _( "threads clipped to %d" ), MAX_THREADS );
430 	}
431 
432 	vips__concurrency = concurrency;
433 }
434 
435 /**
436  * vips_concurrency_get:
437  *
438  * Returns the number of worker threads that vips should use when running a
439  * #VipsThreadPool.
440  *
 * vips gets this value from these sources in turn:
442  *
443  * If vips_concurrency_set() has been called, this value is used. The special
444  * value 0 means "default". You can also use the command-line argument
445  * "--vips-concurrency" to set this value.
446  *
447  * If vips_concurrency_set() has not been called and no command-line argument
448  * was used, vips uses the value of the environment variable VIPS_CONCURRENCY,
449  *
450  * If VIPS_CONCURRENCY has not been set, vips finds the number of hardware
451  * threads that the host machine can run in parallel and uses that value.
452  *
453  * The final value is clipped to the range 1 - 1024.
454  *
 * See also: vips_concurrency_set().
456  *
457  * Returns: number of worker threads to use.
458  */
int
vips_concurrency_get( void )
{
	/* vips__threadpool_init() replaces a 0 (unset) value with the
	 * default, see vips__concurrency_get_default().
	 */
	return( vips__concurrency );
}
464 
465 /* The VipsThreadStartFn arg to vips_threadpool_run() is called once for each
466  * thread to make one of these things to hold the thread state.
467  */
468 
469 G_DEFINE_TYPE( VipsThreadState, vips_thread_state, VIPS_TYPE_OBJECT );
470 
471 static void
vips_thread_state_dispose(GObject * gobject)472 vips_thread_state_dispose( GObject *gobject )
473 {
474 	VipsThreadState *state = (VipsThreadState *) gobject;
475 
476 	VIPS_DEBUG_MSG( "vips_thread_state_dispose:\n" );
477 
478 	VIPS_UNREF( state->reg );
479 
480 	G_OBJECT_CLASS( vips_thread_state_parent_class )->dispose( gobject );
481 }
482 
483 static int
vips_thread_state_build(VipsObject * object)484 vips_thread_state_build( VipsObject *object )
485 {
486 	VipsThreadState *state = (VipsThreadState *) object;
487 
488 	if( !(state->reg = vips_region_new( state->im )) )
489 		return( -1 );
490 
491 	return( VIPS_OBJECT_CLASS(
492 		vips_thread_state_parent_class )->build( object ) );
493 }
494 
495 static void
vips_thread_state_class_init(VipsThreadStateClass * class)496 vips_thread_state_class_init( VipsThreadStateClass *class )
497 {
498 	GObjectClass *gobject_class = G_OBJECT_CLASS( class );
499 	VipsObjectClass *object_class = VIPS_OBJECT_CLASS( class );
500 
501 	gobject_class->dispose = vips_thread_state_dispose;
502 
503 	object_class->build = vips_thread_state_build;
504 	object_class->nickname = "threadstate";
505 	object_class->description = _( "per-thread state for vipsthreadpool" );
506 }
507 
508 static void
vips_thread_state_init(VipsThreadState * state)509 vips_thread_state_init( VipsThreadState *state )
510 {
511 	VIPS_DEBUG_MSG( "vips_thread_state_init:\n" );
512 
513 	state->reg = NULL;
514 	state->stop = FALSE;
515 	state->stall = FALSE;
516 }
517 
518 void *
vips_thread_state_set(VipsObject * object,void * a,void * b)519 vips_thread_state_set( VipsObject *object, void *a, void *b )
520 {
521 	VipsThreadState *state = (VipsThreadState *) object;
522 	VipsImage *im = (VipsImage *) a;
523 
524 	VIPS_DEBUG_MSG( "vips_thread_state_set: image %p\n", im );
525 
526 	state->im = im;
527 	state->a = b;
528 
529 	return( NULL );
530 }
531 
532 VipsThreadState *
vips_thread_state_new(VipsImage * im,void * a)533 vips_thread_state_new( VipsImage *im, void *a )
534 {
535 	VIPS_DEBUG_MSG( "vips_thread_state_new: image %p\n", im );
536 
537 	return( VIPS_THREAD_STATE( vips_object_new(
538 		VIPS_TYPE_THREAD_STATE, vips_thread_state_set, im, a ) ) );
539 }
540 
/* A VipsTask is the state of one call to vips_threadpool_run(). It is shared
 * by all the workers of that call; error and stop are written by workers and
 * read by the main thread.
 */
typedef struct _VipsTask {
	/* All private.
	 */
	/*< private >*/
	VipsImage *im;		/* Image we are calculating */

	/* Start or reuse a thread, do a unit of work (runs in parallel)
	 * and allocate a unit of work (serial). Plus the mutex we use to
	 * serialize work allocation.
	 */
	VipsThreadStartFn start;
	VipsThreadpoolAllocateFn allocate;
	VipsThreadpoolWorkFn work;
	GMutex *allocate_lock;
        void *a; 		/* User argument to start / allocate / etc. */

	/* The caller blocks here until all tasks finish.
	 */
	VipsSemaphore finish;

	/* Workers up this for every loop to make the main thread tick.
	 */
	VipsSemaphore tick;

	/* Set this to abort evaluation early with an error.
	 */
	gboolean error;

	/* Set by Allocate (via an arg) to indicate normal end of computation.
	 */
	gboolean stop;
} VipsTask;
575 
/* Allocate some work (single-threaded), then do it (many-threaded).
 *
 * The very first workunit is also executed single-threaded. This gives
 * loaders a chance to seek to the correct spot, see vips_sequential().
 */
static void
vips_task_work_unit( VipsTask *task, VipsThreadState *state )
{
	/* Checked outside the lock: workers only ever set error to TRUE, so
	 * a stale read just costs one extra allocate round.
	 */
	if( task->error )
		return;

	VIPS_GATE_START( "vips_task_work_unit: wait" );

	g_mutex_lock( task->allocate_lock );

	VIPS_GATE_STOP( "vips_task_work_unit: wait" );

	/* Has another worker signaled stop while we've been waiting?
	 */
	if( task->stop ) {
		g_mutex_unlock( task->allocate_lock );
		return;
	}

	/* Allocate under the lock: allocate funcs may write per-pool state.
	 */
	if( task->allocate( state, task->a, &task->stop ) ) {
		task->error = TRUE;
		g_mutex_unlock( task->allocate_lock );
		return;
	}

	/* Have we just signalled stop?
	 */
	if( task->stop ) {
		g_mutex_unlock( task->allocate_lock );
		return;
	}

	g_mutex_unlock( task->allocate_lock );

	if( state->stall &&
		vips__stall ) {
		/* Sleep for 0.5s. Handy for stressing the seq system. Stall
		 * is set by allocate funcs in various places.
		 */
		g_usleep( 500000 );
		state->stall = FALSE;
		printf( "vips_task_work_unit: "
			"stall done, releasing y = %d ...\n", state->y );
	}

	/* Process a work unit.
	 */
	if( task->work( state, task->a ) )
		task->error = TRUE;
}
631 
/* What runs as a pipeline thread ... loop, waiting to be told to do stuff.
 */
static void
vips_task_run( gpointer data, gpointer user_data )
{
	VipsTask *task = (VipsTask *) data;
	VipsThreadState *state;

	VIPS_GATE_START( "vips_task_run: thread" );

	/* Build per-thread state. On failure state is NULL, but that is safe:
	 * vips_task_work_unit() returns immediately once error is set.
	 */
	if( !(state = task->start( task->im, task->a )) )
		task->error = TRUE;

	/* Process work units! Always tick, even if we are stopping, so the
	 * main thread will wake up for exit.
	 */
	for(;;) {
		VIPS_GATE_START( "vips_task_work_unit: u" );
		vips_task_work_unit( task, state );
		VIPS_GATE_STOP( "vips_task_work_unit: u" );
		vips_semaphore_up( &task->tick );

		if( task->stop ||
			task->error )
			break;
	}

	/* VIPS_FREEF is NULL-safe, so a failed start is fine here.
	 */
	VIPS_FREEF( g_object_unref, state );

	/* We are exiting: tell the main thread.
	 */
	vips_semaphore_up( &task->finish );

	VIPS_GATE_STOP( "vips_task_run: thread" );
}
667 
668 /* Called from vips_shutdown().
669  */
670 void
vips__threadpool_shutdown(void)671 vips__threadpool_shutdown( void )
672 {
673 	/* We may come here without having inited.
674 	 */
675 	if( vips__pool ) {
676 		VIPS_DEBUG_MSG( "vips__threadpool_shutdown: (%p)\n",
677 			vips__pool );
678 
679 		g_thread_pool_free( vips__pool, TRUE, TRUE );
680 		vips__pool = NULL;
681 	}
682 }
683 
/* Build the shared state for one vips_threadpool_run() call, and pick how
 * many workers to start for it, returned via *n_tasks.
 */
static VipsTask *
vips_task_new( VipsImage *im, int *n_tasks )
{
	VipsTask *task;
	int tile_width;
	int tile_height;
	gint64 n_tiles;
	int n_lines;

	if( !(task = VIPS_NEW( NULL, VipsTask )) )
		return( NULL );
	task->im = im;
	task->allocate = NULL;
	task->work = NULL;
	task->allocate_lock = vips_g_mutex_new();
	vips_semaphore_init( &task->finish, 0, "finish" );
	vips_semaphore_init( &task->tick, 0, "tick" );
	task->error = FALSE;
	task->stop = FALSE;

	*n_tasks = vips_concurrency_get();

	/* If this is a tiny image, we won't need all n_tasks. Guess how
	 * many tiles we might need to cover the image and use that to limit
	 * the number of tasks we create.
	 *
	 * n_tiles is gint64 so the product can't overflow on huge images.
	 */
	vips_get_tile_size( im, &tile_width, &tile_height, &n_lines );
	n_tiles = (1 + (gint64) im->Xsize / tile_width) *
		(1 + (gint64) im->Ysize / tile_height);
	n_tiles = VIPS_MAX( 1, n_tiles );
	*n_tasks = VIPS_MIN( *n_tasks, n_tiles );

	VIPS_DEBUG_MSG( "vips_task_new: \"%s\" (%p), with %d tasks\n",
		im->filename, task, *n_tasks );

	return( task );
}
721 
722 static void
vips_task_free(VipsTask * task)723 vips_task_free( VipsTask *task )
724 {
725 	VIPS_DEBUG_MSG( "vips_task_free: \"%s\" (%p)\n",
726 		task->im->filename, task );
727 
728 	VIPS_FREEF( vips_g_mutex_free, task->allocate_lock );
729 	vips_semaphore_destroy( &task->finish );
730 	vips_semaphore_destroy( &task->tick );
731 	VIPS_FREE( task );
732 }
733 
734 static void *
vips__thread_once_init(void * data)735 vips__thread_once_init( void *data )
736 {
737 	/* We can have many more than vips__concurrency threads -- each active
738 	 * pipeline will make vips__concurrency more, see
739 	 * vips_threadpool_run().
740 	 */
741 	vips__pool = g_thread_pool_new( vips_thread_main_loop, NULL,
742 		-1, FALSE, NULL );
743 
744 	return( NULL );
745 }
746 
747 /**
748  * vips__thread_execute:
749  * @name: a name for the thread
750  * @func: a function to execute in the thread pool
751  * @data: an argument to supply to @func
752  *
 * A newly created or reused thread will execute @func with the
 * argument @data.
755  *
756  * See also: vips_concurrency_set().
757  *
758  * Returns: 0 on success, -1 on error.
759  */
760 int
vips__thread_execute(const char * name,GFunc func,gpointer data)761 vips__thread_execute( const char *name, GFunc func, gpointer data )
762 {
763 	static GOnce once = G_ONCE_INIT;
764 
765 	VipsThreadExec *exec;
766 	GError *error = NULL;
767 	gboolean result;
768 
769 	VIPS_ONCE( &once, vips__thread_once_init, NULL );
770 
771 	exec = g_new( VipsThreadExec, 1 );
772 	exec->name = name;
773 	exec->func = func;
774 	exec->data = data;
775 
776 	result = g_thread_pool_push( vips__pool, exec, &error );
777 	if( error ) {
778 		vips_g_error( &error );
779 		return( -1 );
780 	}
781 
782 	VIPS_DEBUG_MSG( "vips__thread_execute: %u threads in pool\n",
783 		g_thread_pool_get_num_threads( vips__pool ) );
784 
785 	return( result ? 0 : -1 );
786 }
787 
788 /**
 * VipsThreadStartFn:
790  * @a: client data
791  * @b: client data
792  * @c: client data
793  *
 * This function is called once by each worker just before the first time work
 * is allocated to it to build the per-thread state. Per-thread state is used
 * by #VipsThreadpoolAllocateFn and #VipsThreadpoolWorkFn to communicate.
797  *
798  * #VipsThreadState is a subclass of #VipsObject. Start functions can be
799  * executed concurrently.
800  *
801  * See also: vips_threadpool_run().
802  *
803  * Returns: a new #VipsThreadState object, or NULL on error
804  */
805 
806 /**
807  * VipsThreadpoolAllocateFn:
808  * @state: per-thread state
809  * @a: client data
810  * @b: client data
811  * @c: client data
812  * @stop: set this to signal end of computation
813  *
814  * This function is called to allocate a new work unit for the thread. It is
815  * always single-threaded, so it can modify per-pool state (such as a
816  * counter).
817  *
818  * @a, @b, @c are the values supplied to the call to
819  * vips_threadpool_run().
820  *
821  * It should set @stop to %TRUE to indicate that no work could be allocated
822  * because the job is done.
823  *
824  * See also: vips_threadpool_run().
825  *
826  * Returns: 0 on success, or -1 on error
827  */
828 
829 /**
830  * VipsThreadpoolWorkFn:
831  * @state: per-thread state
832  * @a: client data
833  * @b: client data
834  * @c: client data
835  *
836  * This function is called to process a work unit. Many copies of this can run
837  * at once, so it should not write to the per-pool state. It can write to
838  * per-thread state.
839  *
840  * @a, @b, @c are the values supplied to the call to
841  * vips_threadpool_run().
842  *
843  * See also: vips_threadpool_run().
844  *
845  * Returns: 0 on success, or -1 on error
846  */
847 
848 /**
849  * VipsThreadpoolProgressFn:
850  * @a: client data
851  * @b: client data
852  * @c: client data
853  *
854  * This function is called by the main thread once for every work unit
855  * processed. It can be used to give the user progress feedback.
856  *
857  * See also: vips_threadpool_run().
858  *
859  * Returns: 0 on success, or -1 on error
860  */
861 
862 /**
863  * vips_threadpool_run:
864  * @im: image to loop over
865  * @start: allocate per-thread state
866  * @allocate: allocate a work unit
867  * @work: process a work unit
868  * @progress: give progress feedback about a work unit, or %NULL
869  * @a: client data
870  *
871  * This function runs a set of threads over an image. It will use a newly
872  * created or reused thread within the #VipsThreadPool. Each thread first calls
873  * @start to create new per-thread state, then runs
874  * @allocate to set up a new work unit (perhaps the next tile in an image, for
875  * example), then @work to process that work unit. After each unit is
876  * processed, @progress is called, so that the operation can give
877  * progress feedback. @progress may be %NULL.
878  *
879  * The object returned by @start must be an instance of a subclass of
880  * #VipsThreadState. Use this to communicate between @allocate and @work.
881  *
882  * @allocate is always single-threaded (so it can write to the
883  * per-pool state), whereas @start and @work can be executed concurrently.
884  * @progress is always called by
885  * the main thread (ie. the thread which called vips_threadpool_run()).
886  *
887  * See also: vips_concurrency_set().
888  *
889  * Returns: 0 on success, or -1 on error.
890  */
891 int
vips_threadpool_run(VipsImage * im,VipsThreadStartFn start,VipsThreadpoolAllocateFn allocate,VipsThreadpoolWorkFn work,VipsThreadpoolProgressFn progress,void * a)892 vips_threadpool_run( VipsImage *im,
893 	VipsThreadStartFn start,
894 	VipsThreadpoolAllocateFn allocate,
895 	VipsThreadpoolWorkFn work,
896 	VipsThreadpoolProgressFn progress,
897 	void *a )
898 {
899 	VipsTask *task;
900 	int n_tasks;
901 	int i;
902 	int result;
903 
904 	if( !(task = vips_task_new( im, &n_tasks )) )
905 		return( -1 );
906 
907 	task->start = start;
908 	task->allocate = allocate;
909 	task->work = work;
910 	task->a = a;
911 
912 	/* Create a set of workers for this pipeline.
913 	 */
914 	for( i = 0; i < n_tasks; i++ )
915 		if( vips__thread_execute( "worker", vips_task_run, task ) )
916 			return( -1 );
917 
918 	for(;;) {
919 		/* Wait for a tick from a worker.
920 		 */
921 		vips_semaphore_down( &task->tick );
922 
923 		VIPS_DEBUG_MSG( "vips_threadpool_run: tick\n" );
924 
925 		if( task->stop ||
926 			task->error )
927 			break;
928 
929 		if( progress &&
930 			progress( task->a ) )
931 			task->error = TRUE;
932 
933 		if( task->stop ||
934 			task->error )
935 			break;
936 	}
937 
938 	/* Wait for them all to hit finish.
939 	 */
940 	vips_semaphore_downn( &task->finish, n_tasks );
941 
942 	/* Return 0 for success.
943 	 */
944 	result = task->error ? -1 : 0;
945 
946 	vips_task_free( task );
947 
948 	vips_image_minimise_all( im );
949 
950 	return( result );
951 }
952 
/* Create the vips threadpool. This is called during vips_init.
 */
void
vips__threadpool_init( void )
{
	static GPrivate private = { 0 };

	/* Workers use this key to tag themselves, see vips_thread_isworker().
	 */
	is_worker_key = &private;

	/* Debug aid: make allocate funcs stall, see vips_task_work_unit().
	 */
	if( g_getenv( "VIPS_STALL" ) )
		vips__stall = TRUE;

	if( vips__concurrency == 0 )
		vips__concurrency = vips__concurrency_get_default();

	/* The threadpool is built in the first vips__thread_execute()
	 * call, since we want thread creation to happen as late as possible.
	 *
	 * Many web platforms start up in a base environment, then fork() for
	 * each request. We must not make the threadpool before the fork.
	 */

	VIPS_DEBUG_MSG( "vips__threadpool_init: (%p)\n", vips__pool );
}
977 
978 /**
979  * vips_get_tile_size: (method)
980  * @im: image to guess for
981  * @tile_width: (out): return selected tile width
982  * @tile_height: (out): return selected tile height
983  * @n_lines: (out): return buffer height in scanlines
984  *
985  * Pick a tile size and a buffer height for this image and the current
986  * value of vips_concurrency_get(). The buffer height
987  * will always be a multiple of tile_height.
988  *
989  * The buffer height is the height of each buffer we fill in sink disc. Since
990  * we have two buffers, the largest range of input locality is twice the output
991  * buffer size, plus whatever margin we add for things like convolution.
992  */
void
vips_get_tile_size( VipsImage *im,
	int *tile_width, int *tile_height, int *n_lines )
{
	const int nthr = vips_concurrency_get();
	const int typical_image_width = 1000;

	/* Compiler warnings.
	 */
	*tile_width = 1;
	*tile_height = 1;

	/* Pick a render geometry from the image's demand hint.
	 */
	switch( im->dhint ) {
	case VIPS_DEMAND_STYLE_SMALLTILE:
		*tile_width = vips__tile_width;
		*tile_height = vips__tile_height;
		break;

	case VIPS_DEMAND_STYLE_ANY:
	case VIPS_DEMAND_STYLE_FATSTRIP:
		*tile_width = im->Xsize;
		*tile_height = vips__fatstrip_height;
		break;

	case VIPS_DEMAND_STYLE_THINSTRIP:
		*tile_width = im->Xsize;
		*tile_height = vips__thinstrip_height;
		break;

	default:
		g_assert_not_reached();
	}

	/* We can't set n_lines for the current demand style: a later bit of
	 * the pipeline might see a different hint and we need to synchronise
	 * buffer sizes everywhere.
	 *
	 * We also can't depend on the current image size, since that might
	 * change down the pipeline too. Pick a typical image width.
	 *
	 * Pick the maximum buffer size we might possibly need, then round up
	 * to a multiple of tileheight.
	 */
	*n_lines = vips__tile_height *
		VIPS_ROUND_UP( vips__tile_width * nthr, typical_image_width ) /
			typical_image_width;
	*n_lines = VIPS_MAX( *n_lines, vips__fatstrip_height * nthr );
	*n_lines = VIPS_MAX( *n_lines, vips__thinstrip_height * nthr );
	*n_lines = VIPS_ROUND_UP( *n_lines, *tile_height );

	/* We make this assumption in several places.
	 */
	g_assert( *n_lines % *tile_height == 0 );

	VIPS_DEBUG_MSG( "vips_get_tile_size: %d by %d patches, "
		"groups of %d scanlines\n",
		*tile_width, *tile_height, *n_lines );
}
1053