1 /* Support for thread pools ... like threadgroups, but lighter.
2 *
3 * 18/3/10
4 * - from threadgroup.c
5 * - distributed work allocation idea from Christian Blenia, thank you
6 * very much
7 * 21/3/10
8 * - progress feedback
9 * - only expose VipsThreadState
10 * 11/5/10
11 * - argh, stopping many threads could sometimes leave allocated work
12 * undone
13 * 17/7/10
14 * - set pool->error whenever we set thr->error, lets us catch allocate
15 * errors (thanks Tim)
16 * 25/7/14
17 * - limit nthr on tiny images
18 * 6/3/17
19 * - remove single-thread-first-request thing, new seq system makes it
20 * unnecessary
21 * 23/4/17
22 * - add ->stall
23 * - don't depend on image width when setting n_lines
24 * 27/2/19 jtorresfabra
25 * - free threadpool earlier
26 * 02/02/20 kleisauke
27 * - reuse threads by using GLib's threadpool
28 * - remove mutex lock for VipsThreadStartFn
29 */
30
31 /*
32
33 This file is part of VIPS.
34
35 VIPS is free software; you can redistribute it and/or modify
36 it under the terms of the GNU Lesser General Public License as published by
37 the Free Software Foundation; either version 2 of the License, or
38 (at your option) any later version.
39
40 This program is distributed in the hope that it will be useful,
41 but WITHOUT ANY WARRANTY; without even the implied warranty of
42 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
43 GNU Lesser General Public License for more details.
44
45 You should have received a copy of the GNU Lesser General Public License
46 along with this program; if not, write to the Free Software
47 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
48 02110-1301 USA
49
50 */
51
52 /*
53
54 These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk
55
56 */
57
58 /*
59 #define VIPS_DEBUG
60 #define VIPS_DEBUG_RED
61 */
62
63 #ifdef HAVE_CONFIG_H
64 #include <config.h>
65 #endif /*HAVE_CONFIG_H*/
66 #include <vips/intl.h>
67
68 #include <stdio.h>
69 #include <stdlib.h>
70 #ifdef HAVE_UNISTD_H
71 #include <unistd.h>
72 #endif /*HAVE_UNISTD_H*/
73 #include <errno.h>
74
75 #include <vips/vips.h>
76 #include <vips/internal.h>
77 #include <vips/thread.h>
78 #include <vips/debug.h>
79
80 #ifdef G_OS_WIN32
81 #include <windows.h>
82 #endif /*G_OS_WIN32*/
83
84 /**
85 * SECTION: threadpool
86 * @short_description: pools of worker threads
87 * @stability: Stable
88 * @see_also: <link linkend="libvips-generate">generate</link>
89 * @include: vips/vips.h
90 * @title: VipsThreadpool
91 *
 * A threadpool which allows reusing already started threads. Implementing
 * this can be tedious and error-prone, so we use the threadpool provided
 * by GLib. An added advantage is that the threads can be shared between
 * the different subsystems when they use GLib.
97 *
98 * The threadpool is created during vips_init() and is destroyed by
99 * vips_shutdown().
100 *
101 * vips_threadpool_run() loops a set of threads over an image. Threads take it
102 * in turns to allocate units of work (a unit might be a tile in an image),
103 * then run in parallel to process those units. An optional progress function
104 * can be used to give feedback.
105 */
106
/* Maximum number of concurrent threads we allow. It prevents huge values of
 * VIPS_CONCURRENCY killing the system.
 */
#define MAX_THREADS (1024)

/* Default tile geometry ... can be set by vips_init().
 */
int vips__tile_width = VIPS__TILE_WIDTH;
int vips__tile_height = VIPS__TILE_HEIGHT;
int vips__fatstrip_height = VIPS__FATSTRIP_HEIGHT;
int vips__thinstrip_height = VIPS__THINSTRIP_HEIGHT;

/* Default n threads ... 0 means get from environment.
 */
int vips__concurrency = 0;

/* Set this GPrivate to indicate that this thread is a worker inside
 * the vips threadpool. See vips_thread_isworker().
 */
static GPrivate *is_worker_key = NULL;

/* Set to stall threads for debugging ... enabled by the VIPS_STALL env var,
 * see vips__threadpool_init().
 */
static gboolean vips__stall = FALSE;

/* The thread pool we'll use. Built lazily on the first
 * vips__thread_execute(), see vips__thread_once_init().
 */
static GThreadPool *vips__pool = NULL;
135
136 /* Glib 2.32 revised the thread API. We need some compat functions.
137 */
138
139 GMutex *
vips_g_mutex_new(void)140 vips_g_mutex_new( void )
141 {
142 GMutex *mutex;
143
144 mutex = g_new( GMutex, 1 );
145 g_mutex_init( mutex );
146
147 return( mutex );
148 }
149
/* Clear and free a mutex made with vips_g_mutex_new(). The mutex must be
 * unlocked.
 */
void
vips_g_mutex_free( GMutex *mutex )
{
	g_mutex_clear( mutex );
	g_free( mutex );
}
156
157 GCond *
vips_g_cond_new(void)158 vips_g_cond_new( void )
159 {
160 GCond *cond;
161
162 cond = g_new( GCond, 1 );
163 g_cond_init( cond );
164
165 return( cond );
166 }
167
/* Clear and free a condition variable made with vips_g_cond_new(). No
 * threads may be waiting on it.
 */
void
vips_g_cond_free( GCond *cond )
{
	g_cond_clear( cond );
	g_free( cond );
}
174
175 /* TRUE if we are a vips worker thread. We sometimes manage resource allocation
176 * differently for vips workers since we can cheaply free stuff on thread
177 * termination.
178 */
179 gboolean
vips_thread_isworker(void)180 vips_thread_isworker( void )
181 {
182 return( g_private_get( is_worker_key ) != NULL );
183 }
184
/* Arguments for one vips_g_thread_new() call: allocated there, handed to
 * the new thread, and freed by vips_thread_run() once the user function has
 * returned.
 */
typedef struct {
	/* Name used to tag this thread's profile records.
	 */
	const char *domain;

	/* The user function to run in the new thread, with its argument.
	 */
	GThreadFunc func;
	gpointer data;
} VipsThreadInfo;
190
/* Trampoline run by threads made with vips_g_thread_new(): tag the thread
 * as a vips worker, attach profiling if enabled, run the user function,
 * then free per-thread resources before exit.
 */
static void *
vips_thread_run( gpointer data )
{
	VipsThreadInfo *info = (VipsThreadInfo *) data;

	void *result;

	/* Set this to something (anything) to tag this thread as a vips
	 * worker.
	 */
	g_private_set( is_worker_key, data );

	if( vips__thread_profile )
		vips__thread_profile_attach( info->domain );

	result = info->func( info->data );

	g_free( info );

	/* The thread is about to exit: free thread-private caches now.
	 */
	vips_thread_shutdown();

	return( result );
}
214
215 GThread *
vips_g_thread_new(const char * domain,GThreadFunc func,gpointer data)216 vips_g_thread_new( const char *domain, GThreadFunc func, gpointer data )
217 {
218 GThread *thread;
219 VipsThreadInfo *info;
220 GError *error = NULL;
221
222 info = g_new( VipsThreadInfo, 1 );
223 info->domain = domain;
224 info->func = func;
225 info->data = data;
226
227 thread = g_thread_try_new( domain, vips_thread_run, info, &error );
228
229 VIPS_DEBUG_MSG_RED( "vips_g_thread_new: g_thread_create( %s ) = %p\n",
230 domain, thread );
231
232 if( !thread ) {
233 if( error )
234 vips_g_error( &error );
235 else
236 vips_error( domain,
237 "%s", _( "unable to create thread" ) );
238 }
239
240 return( thread );
241 }
242
243 void *
vips_g_thread_join(GThread * thread)244 vips_g_thread_join( GThread *thread )
245 {
246 void *result;
247
248 result = g_thread_join( thread );
249
250 VIPS_DEBUG_MSG_RED( "vips_g_thread_join: g_thread_join( %p )\n",
251 thread );
252
253 return( result );
254 }
255
/* One work item queued on the shared GThreadPool: allocated in
 * vips__thread_execute(), freed by vips_thread_main_loop() after running.
 */
typedef struct {
	/* A name for this thread.
	 */
	const char *name;

	/* The function to execute within the #VipsThreadPool.
	 */
	GFunc func;

	/* User data that is handed over to func when it is called.
	 */
	gpointer data;
} VipsThreadExec;
269
/* The GThreadPool dispatch function: run one queued VipsThreadExec, then
 * reset thread-private state ready for whatever task this pooled thread is
 * given next.
 */
static void
vips_thread_main_loop( gpointer thread_data, gpointer pool_data )
{
	VipsThreadExec *exec = (VipsThreadExec *) thread_data;

	/* Set this to something (anything) to tag this thread as a vips
	 * worker. No need to call g_private_replace as there is no
	 * GDestroyNotify handler associated with a worker.
	 */
	g_private_set( is_worker_key, thread_data );

	if( vips__thread_profile )
		vips__thread_profile_attach( exec->name );

	exec->func( exec->data, pool_data );

	g_free( exec );

	/* Free all thread-private caches, since they probably won't be valid
	 * for the next task this thread is given.
	 */
	vips_thread_shutdown();
}
293
/* Guess how many CPUs we can use: g_get_num_processors() with newer GLib,
 * otherwise sysconf()/sysctl() on unix or the process affinity mask on
 * win32, falling back to 1.
 */
static int
get_num_processors( void )
{
#if GLIB_CHECK_VERSION( 2, 48, 1 )
	/* We could use g_get_num_processors when GLib >= 2.48.1, see:
	 * https://gitlab.gnome.org/GNOME/glib/commit/999711abc82ea3a698d05977f9f91c0b73957f7f
	 * https://gitlab.gnome.org/GNOME/glib/commit/2149b29468bb99af3c29d5de61f75aad735082dc
	 */
	return( g_get_num_processors() );
#else
	int nproc;

	nproc = 1;

#ifdef G_OS_UNIX

#if defined(HAVE_UNISTD_H) && defined(_SC_NPROCESSORS_ONLN)
	{
		/* POSIX style.
		 */
		int x;

		x = sysconf( _SC_NPROCESSORS_ONLN );
		if( x > 0 )
			nproc = x;
	}
#elif defined HW_NCPU
	{
		/* BSD style.
		 */
		int x;
		size_t len = sizeof(x);

		sysctl( (int[2]) {CTL_HW, HW_NCPU}, 2, &x, &len, NULL, 0 );
		if( x > 0 )
			nproc = x;
	}
#endif

	/* libgomp has some very complex code on Linux to count the number of
	 * processors available to the current process taking pthread affinity
	 * into account, but we don't attempt that here. Perhaps we should?
	 */

#endif /*G_OS_UNIX*/

#ifdef G_OS_WIN32
	{
		/* Count the CPUs currently available to this process.
		 */
		SYSTEM_INFO sysinfo;
		DWORD_PTR process_cpus;
		DWORD_PTR system_cpus;

		/* This *never* fails, use it as fallback
		 */
		GetNativeSystemInfo( &sysinfo );
		nproc = (int) sysinfo.dwNumberOfProcessors;

		if( GetProcessAffinityMask( GetCurrentProcess(),
			&process_cpus, &system_cpus ) ) {
			unsigned int af_count;

			/* Count the set bits in the affinity mask.
			 */
			for( af_count = 0; process_cpus != 0; process_cpus >>= 1 )
				if( process_cpus & 1 )
					af_count++;

			/* Prefer affinity-based result, if available
			 */
			if( af_count > 0 )
				nproc = af_count;
		}
	}
#endif /*G_OS_WIN32*/

	return( nproc );
#endif /*!GLIB_CHECK_VERSION( 2, 48, 1 )*/
}
372
/* The default concurrency, set by the environment variable VIPS_CONCURRENCY,
 * or if that is not set, the number of threads available on the host machine.
 * The result is always clipped to [1, MAX_THREADS].
 */
static int
vips__concurrency_get_default( void )
{
	const char *str;
	int nthr;
	int x;

	/* Tell the threads system how much concurrency we expect.
	 */
	if( vips__concurrency > 0 )
		nthr = vips__concurrency;
	else if( ((str = g_getenv( "VIPS_CONCURRENCY" ))
#if ENABLE_DEPRECATED
		|| (str = g_getenv( "IM_CONCURRENCY" ))
#endif
		) && (x = atoi( str )) > 0 )
		nthr = x;
	else
		nthr = get_num_processors();

	/* Clip to a sane range, warning if the env var asked for something
	 * wild.
	 */
	if( nthr < 1 ||
		nthr > MAX_THREADS ) {
		nthr = VIPS_CLIP( 1, nthr, MAX_THREADS );

		g_warning( _( "threads clipped to %d" ), nthr );
	}

	return( nthr );
}
405
406 /**
407 * vips_concurrency_set:
408 * @concurrency: number of threads to run
409 *
410 * Sets the number of worker threads that vips should use when running a
411 * #VipsThreadPool.
412 *
413 * The special value 0 means "default". In this case, the number of threads is
414 * set by the environment variable VIPS_CONCURRENCY, or if that is not set, the
415 * number of threads available on the host machine.
416 *
417 * See also: vips_concurrency_get().
418 */
419 void
vips_concurrency_set(int concurrency)420 vips_concurrency_set( int concurrency )
421 {
422 /* Tell the threads system how much concurrency we expect.
423 */
424 if( concurrency < 1 )
425 concurrency = vips__concurrency_get_default();
426 else if( concurrency > MAX_THREADS ) {
427 concurrency = MAX_THREADS;
428
429 g_warning( _( "threads clipped to %d" ), MAX_THREADS );
430 }
431
432 vips__concurrency = concurrency;
433 }
434
/**
 * vips_concurrency_get:
 *
 * Returns the number of worker threads that vips should use when running a
 * #VipsThreadPool.
 *
 * vips gets this value from these sources in turn:
 *
 * If vips_concurrency_set() has been called, this value is used. The special
 * value 0 means "default". You can also use the command-line argument
 * "--vips-concurrency" to set this value.
 *
 * If vips_concurrency_set() has not been called and no command-line argument
 * was used, vips uses the value of the environment variable VIPS_CONCURRENCY,
 *
 * If VIPS_CONCURRENCY has not been set, vips finds the number of hardware
 * threads that the host machine can run in parallel and uses that value.
 *
 * The final value is clipped to the range 1 - 1024.
 *
 * See also: vips_concurrency_set().
 *
 * Returns: number of worker threads to use.
 */
int
vips_concurrency_get( void )
{
	return( vips__concurrency );
}
464
/* The VipsThreadStartFn arg to vips_threadpool_run() is called once for each
 * thread to make one of these things to hold the thread state.
 */

/* VipsThreadState is a VipsObject subclass ... see vips_thread_state_new().
 */
G_DEFINE_TYPE( VipsThreadState, vips_thread_state, VIPS_TYPE_OBJECT );
470
/* Drop the resources the state holds: unref the read region. Dispose may
 * run several times; VIPS_UNREF NULLs the pointer so the unref happens once.
 */
static void
vips_thread_state_dispose( GObject *gobject )
{
	VipsThreadState *state = (VipsThreadState *) gobject;

	VIPS_DEBUG_MSG( "vips_thread_state_dispose:\n" );

	VIPS_UNREF( state->reg );

	G_OBJECT_CLASS( vips_thread_state_parent_class )->dispose( gobject );
}
482
/* Second construction phase: make the region this state will use to read
 * pixels from the image set by vips_thread_state_set().
 */
static int
vips_thread_state_build( VipsObject *object )
{
	VipsThreadState *state = (VipsThreadState *) object;

	if( !(state->reg = vips_region_new( state->im )) )
		return( -1 );

	return( VIPS_OBJECT_CLASS(
		vips_thread_state_parent_class )->build( object ) );
}
494
/* Class init: hook up dispose and build, and set the nickname/description
 * used in error messages and introspection.
 */
static void
vips_thread_state_class_init( VipsThreadStateClass *class )
{
	GObjectClass *gobject_class = G_OBJECT_CLASS( class );
	VipsObjectClass *object_class = VIPS_OBJECT_CLASS( class );

	gobject_class->dispose = vips_thread_state_dispose;

	object_class->build = vips_thread_state_build;
	object_class->nickname = "threadstate";
	object_class->description = _( "per-thread state for vipsthreadpool" );
}
507
/* Instance init: clear the fields ... the region is made later, in
 * vips_thread_state_build().
 */
static void
vips_thread_state_init( VipsThreadState *state )
{
	VIPS_DEBUG_MSG( "vips_thread_state_init:\n" );

	state->reg = NULL;
	state->stop = FALSE;
	state->stall = FALSE;
}
517
/* Passed to vips_object_new() by vips_thread_state_new() to set arguments
 * before build: @a is the image, @b is the user argument stored as state->a.
 */
void *
vips_thread_state_set( VipsObject *object, void *a, void *b )
{
	VipsThreadState *state = (VipsThreadState *) object;
	VipsImage *im = (VipsImage *) a;

	VIPS_DEBUG_MSG( "vips_thread_state_set: image %p\n", im );

	state->im = im;
	state->a = b;

	return( NULL );
}
531
/* Make a new VipsThreadState for image @im. @a is a user argument recorded
 * on the state (see vips_thread_state_set()). Returns NULL on error.
 */
VipsThreadState *
vips_thread_state_new( VipsImage *im, void *a )
{
	VIPS_DEBUG_MSG( "vips_thread_state_new: image %p\n", im );

	return( VIPS_THREAD_STATE( vips_object_new(
		VIPS_TYPE_THREAD_STATE, vips_thread_state_set, im, a ) ) );
}
540
/* A VipsTask is the state of one call to vips_threadpool_run(). It is
 * shared by all the workers of that call.
 */
typedef struct _VipsTask {
	/* All private.
	 */
	/*< private >*/
	VipsImage *im;		/* Image we are calculating */

	/* Start or reuse a thread, do a unit of work (runs in parallel)
	 * and allocate a unit of work (serial). Plus the mutex we use to
	 * serialize work allocation.
	 */
	VipsThreadStartFn start;
	VipsThreadpoolAllocateFn allocate;
	VipsThreadpoolWorkFn work;
	GMutex *allocate_lock;
	void *a;		/* User argument to start / allocate / etc. */

	/* The caller blocks here until all tasks finish.
	 */
	VipsSemaphore finish;

	/* Workers up this for every loop to make the main thread tick.
	 */
	VipsSemaphore tick;

	/* Set this to abort evaluation early with an error.
	 */
	gboolean error;

	/* Set by Allocate (via an arg) to indicate normal end of computation.
	 */
	gboolean stop;
} VipsTask;
575
/* Allocate some work (single-threaded), then do it (many-threaded).
 *
 * The very first workunit is also executed single-threaded. This gives
 * loaders a chance to seek to the correct spot, see vips_sequential().
 */
581 static void
vips_task_work_unit(VipsTask * task,VipsThreadState * state)582 vips_task_work_unit( VipsTask *task, VipsThreadState *state )
583 {
584 if( task->error )
585 return;
586
587 VIPS_GATE_START( "vips_task_work_unit: wait" );
588
589 g_mutex_lock( task->allocate_lock );
590
591 VIPS_GATE_STOP( "vips_task_work_unit: wait" );
592
593 /* Has another worker signaled stop while we've been waiting?
594 */
595 if( task->stop ) {
596 g_mutex_unlock( task->allocate_lock );
597 return;
598 }
599
600 if( task->allocate( state, task->a, &task->stop ) ) {
601 task->error = TRUE;
602 g_mutex_unlock( task->allocate_lock );
603 return;
604 }
605
606 /* Have we just signalled stop?
607 */
608 if( task->stop ) {
609 g_mutex_unlock( task->allocate_lock );
610 return;
611 }
612
613 g_mutex_unlock( task->allocate_lock );
614
615 if( state->stall &&
616 vips__stall ) {
617 /* Sleep for 0.5s. Handy for stressing the seq system. Stall
618 * is set by allocate funcs in various places.
619 */
620 g_usleep( 500000 );
621 state->stall = FALSE;
622 printf( "vips_task_work_unit: "
623 "stall done, releasing y = %d ...\n", state->y );
624 }
625
626 /* Process a work unit.
627 */
628 if( task->work( state, task->a ) )
629 task->error = TRUE;
630 }
631
/* What runs as a pipeline thread ... loop, waiting to be told to do stuff.
 */
static void
vips_task_run( gpointer data, gpointer user_data )
{
	VipsTask *task = (VipsTask *) data;
	VipsThreadState *state;

	VIPS_GATE_START( "vips_task_run: thread" );

	/* If start fails, flag the error ... the loop below still runs once
	 * (vips_task_work_unit() returns immediately on error), so we tick
	 * and signal finish as usual.
	 */
	if( !(state = task->start( task->im, task->a )) )
		task->error = TRUE;

	/* Process work units! Always tick, even if we are stopping, so the
	 * main thread will wake up for exit.
	 */
	for(;;) {
		VIPS_GATE_START( "vips_task_work_unit: u" );
		vips_task_work_unit( task, state );
		VIPS_GATE_STOP( "vips_task_work_unit: u" );
		vips_semaphore_up( &task->tick );

		if( task->stop ||
			task->error )
			break;
	}

	VIPS_FREEF( g_object_unref, state );

	/* We are exiting: tell the main thread.
	 */
	vips_semaphore_up( &task->finish );

	VIPS_GATE_STOP( "vips_task_run: thread" );
}
667
668 /* Called from vips_shutdown().
669 */
670 void
vips__threadpool_shutdown(void)671 vips__threadpool_shutdown( void )
672 {
673 /* We may come here without having inited.
674 */
675 if( vips__pool ) {
676 VIPS_DEBUG_MSG( "vips__threadpool_shutdown: (%p)\n",
677 vips__pool );
678
679 g_thread_pool_free( vips__pool, TRUE, TRUE );
680 vips__pool = NULL;
681 }
682 }
683
/* Make the task for one vips_threadpool_run() call, and pick the number of
 * workers to use for it. Returns NULL on allocation failure.
 */
static VipsTask *
vips_task_new( VipsImage *im, int *n_tasks )
{
	VipsTask *task;
	int tile_width;
	int tile_height;
	gint64 n_tiles;
	int n_lines;

	if( !(task = VIPS_NEW( NULL, VipsTask )) )
		return( NULL );
	task->im = im;
	task->allocate = NULL;
	task->work = NULL;
	task->allocate_lock = vips_g_mutex_new();
	vips_semaphore_init( &task->finish, 0, "finish" );
	vips_semaphore_init( &task->tick, 0, "tick" );
	task->error = FALSE;
	task->stop = FALSE;

	*n_tasks = vips_concurrency_get();

	/* If this is a tiny image, we won't need all n_tasks. Guess how
	 * many tiles we might need to cover the image and use that to limit
	 * the number of tasks we create.
	 */
	vips_get_tile_size( im, &tile_width, &tile_height, &n_lines );
	n_tiles = (1 + (gint64) im->Xsize / tile_width) *
		(1 + (gint64) im->Ysize / tile_height);
	n_tiles = VIPS_MAX( 1, n_tiles );
	*n_tasks = VIPS_MIN( *n_tasks, n_tiles );

	VIPS_DEBUG_MSG( "vips_task_new: \"%s\" (%p), with %d tasks\n",
		im->filename, task, *n_tasks );

	return( task );
}
721
722 static void
vips_task_free(VipsTask * task)723 vips_task_free( VipsTask *task )
724 {
725 VIPS_DEBUG_MSG( "vips_task_free: \"%s\" (%p)\n",
726 task->im->filename, task );
727
728 VIPS_FREEF( vips_g_mutex_free, task->allocate_lock );
729 vips_semaphore_destroy( &task->finish );
730 vips_semaphore_destroy( &task->tick );
731 VIPS_FREE( task );
732 }
733
/* Build the shared GThreadPool, exactly once. Run via VIPS_ONCE from
 * vips__thread_execute().
 */
static void *
vips__thread_once_init( void *data )
{
	/* We can have many more than vips__concurrency threads -- each active
	 * pipeline will make vips__concurrency more, see
	 * vips_threadpool_run().
	 */
	vips__pool = g_thread_pool_new( vips_thread_main_loop, NULL,
		-1, FALSE, NULL );

	return( NULL );
}
746
747 /**
748 * vips__thread_execute:
749 * @name: a name for the thread
750 * @func: a function to execute in the thread pool
751 * @data: an argument to supply to @func
752 *
753 * A newly created or reused thread will execute @func with with the
754 * argument data.
755 *
756 * See also: vips_concurrency_set().
757 *
758 * Returns: 0 on success, -1 on error.
759 */
760 int
vips__thread_execute(const char * name,GFunc func,gpointer data)761 vips__thread_execute( const char *name, GFunc func, gpointer data )
762 {
763 static GOnce once = G_ONCE_INIT;
764
765 VipsThreadExec *exec;
766 GError *error = NULL;
767 gboolean result;
768
769 VIPS_ONCE( &once, vips__thread_once_init, NULL );
770
771 exec = g_new( VipsThreadExec, 1 );
772 exec->name = name;
773 exec->func = func;
774 exec->data = data;
775
776 result = g_thread_pool_push( vips__pool, exec, &error );
777 if( error ) {
778 vips_g_error( &error );
779 return( -1 );
780 }
781
782 VIPS_DEBUG_MSG( "vips__thread_execute: %u threads in pool\n",
783 g_thread_pool_get_num_threads( vips__pool ) );
784
785 return( result ? 0 : -1 );
786 }
787
788 /**
789 * VipsThreadpoolStartFn:
790 * @a: client data
791 * @b: client data
792 * @c: client data
793 *
794 * This function is called once by each worker just before the first time work
795 * is allocated to it to build the per-thread state. Per-thread state is used
796 * by #VipsThreadpoolAllocate and #VipsThreadpoolWork to communicate.
797 *
798 * #VipsThreadState is a subclass of #VipsObject. Start functions can be
799 * executed concurrently.
800 *
801 * See also: vips_threadpool_run().
802 *
803 * Returns: a new #VipsThreadState object, or NULL on error
804 */
805
806 /**
807 * VipsThreadpoolAllocateFn:
808 * @state: per-thread state
809 * @a: client data
810 * @b: client data
811 * @c: client data
812 * @stop: set this to signal end of computation
813 *
814 * This function is called to allocate a new work unit for the thread. It is
815 * always single-threaded, so it can modify per-pool state (such as a
816 * counter).
817 *
 * @a is the value supplied to the call to
 * vips_threadpool_run().
820 *
821 * It should set @stop to %TRUE to indicate that no work could be allocated
822 * because the job is done.
823 *
824 * See also: vips_threadpool_run().
825 *
826 * Returns: 0 on success, or -1 on error
827 */
828
829 /**
830 * VipsThreadpoolWorkFn:
831 * @state: per-thread state
832 * @a: client data
833 * @b: client data
834 * @c: client data
835 *
836 * This function is called to process a work unit. Many copies of this can run
837 * at once, so it should not write to the per-pool state. It can write to
838 * per-thread state.
839 *
 * @a is the value supplied to the call to
 * vips_threadpool_run().
842 *
843 * See also: vips_threadpool_run().
844 *
845 * Returns: 0 on success, or -1 on error
846 */
847
848 /**
849 * VipsThreadpoolProgressFn:
850 * @a: client data
851 * @b: client data
852 * @c: client data
853 *
854 * This function is called by the main thread once for every work unit
855 * processed. It can be used to give the user progress feedback.
856 *
857 * See also: vips_threadpool_run().
858 *
859 * Returns: 0 on success, or -1 on error
860 */
861
862 /**
863 * vips_threadpool_run:
864 * @im: image to loop over
865 * @start: allocate per-thread state
866 * @allocate: allocate a work unit
867 * @work: process a work unit
868 * @progress: give progress feedback about a work unit, or %NULL
869 * @a: client data
870 *
871 * This function runs a set of threads over an image. It will use a newly
872 * created or reused thread within the #VipsThreadPool. Each thread first calls
873 * @start to create new per-thread state, then runs
874 * @allocate to set up a new work unit (perhaps the next tile in an image, for
875 * example), then @work to process that work unit. After each unit is
876 * processed, @progress is called, so that the operation can give
877 * progress feedback. @progress may be %NULL.
878 *
879 * The object returned by @start must be an instance of a subclass of
880 * #VipsThreadState. Use this to communicate between @allocate and @work.
881 *
882 * @allocate is always single-threaded (so it can write to the
883 * per-pool state), whereas @start and @work can be executed concurrently.
884 * @progress is always called by
885 * the main thread (ie. the thread which called vips_threadpool_run()).
886 *
887 * See also: vips_concurrency_set().
888 *
889 * Returns: 0 on success, or -1 on error.
890 */
891 int
vips_threadpool_run(VipsImage * im,VipsThreadStartFn start,VipsThreadpoolAllocateFn allocate,VipsThreadpoolWorkFn work,VipsThreadpoolProgressFn progress,void * a)892 vips_threadpool_run( VipsImage *im,
893 VipsThreadStartFn start,
894 VipsThreadpoolAllocateFn allocate,
895 VipsThreadpoolWorkFn work,
896 VipsThreadpoolProgressFn progress,
897 void *a )
898 {
899 VipsTask *task;
900 int n_tasks;
901 int i;
902 int result;
903
904 if( !(task = vips_task_new( im, &n_tasks )) )
905 return( -1 );
906
907 task->start = start;
908 task->allocate = allocate;
909 task->work = work;
910 task->a = a;
911
912 /* Create a set of workers for this pipeline.
913 */
914 for( i = 0; i < n_tasks; i++ )
915 if( vips__thread_execute( "worker", vips_task_run, task ) )
916 return( -1 );
917
918 for(;;) {
919 /* Wait for a tick from a worker.
920 */
921 vips_semaphore_down( &task->tick );
922
923 VIPS_DEBUG_MSG( "vips_threadpool_run: tick\n" );
924
925 if( task->stop ||
926 task->error )
927 break;
928
929 if( progress &&
930 progress( task->a ) )
931 task->error = TRUE;
932
933 if( task->stop ||
934 task->error )
935 break;
936 }
937
938 /* Wait for them all to hit finish.
939 */
940 vips_semaphore_downn( &task->finish, n_tasks );
941
942 /* Return 0 for success.
943 */
944 result = task->error ? -1 : 0;
945
946 vips_task_free( task );
947
948 vips_image_minimise_all( im );
949
950 return( result );
951 }
952
/* Create the vips threadpool. This is called during vips_init.
 */
void
vips__threadpool_init( void )
{
	/* A static GPrivate serves as the key used to tag worker threads.
	 */
	static GPrivate private = { 0 };

	is_worker_key = &private;

	/* Optionally stall workers ... handy for stress-testing, see
	 * vips_task_work_unit().
	 */
	if( g_getenv( "VIPS_STALL" ) )
		vips__stall = TRUE;

	if( vips__concurrency == 0 )
		vips__concurrency = vips__concurrency_get_default();

	/* The threadpool is built in the first vips__thread_execute()
	 * call, since we want thread creation to happen as late as possible.
	 *
	 * Many web platforms start up in a base environment, then fork() for
	 * each request. We must not make the threadpool before the fork.
	 */

	VIPS_DEBUG_MSG( "vips__threadpool_init: (%p)\n", vips__pool );
}
977
/**
 * vips_get_tile_size: (method)
 * @im: image to guess for
 * @tile_width: (out): return selected tile width
 * @tile_height: (out): return selected tile height
 * @n_lines: (out): return buffer height in scanlines
 *
 * Pick a tile size and a buffer height for this image and the current
 * value of vips_concurrency_get(). The buffer height
 * will always be a multiple of tile_height.
 *
 * The buffer height is the height of each buffer we fill in sink disc. Since
 * we have two buffers, the largest range of input locality is twice the output
 * buffer size, plus whatever margin we add for things like convolution.
 */
void
vips_get_tile_size( VipsImage *im,
	int *tile_width, int *tile_height, int *n_lines )
{
	const int nthr = vips_concurrency_get();
	const int typical_image_width = 1000;

	/* Compiler warnings.
	 */
	*tile_width = 1;
	*tile_height = 1;

	/* Pick a render geometry from the image's demand hint.
	 */
	switch( im->dhint ) {
	case VIPS_DEMAND_STYLE_SMALLTILE:
		*tile_width = vips__tile_width;
		*tile_height = vips__tile_height;
		break;

	case VIPS_DEMAND_STYLE_ANY:
	case VIPS_DEMAND_STYLE_FATSTRIP:
		*tile_width = im->Xsize;
		*tile_height = vips__fatstrip_height;
		break;

	case VIPS_DEMAND_STYLE_THINSTRIP:
		*tile_width = im->Xsize;
		*tile_height = vips__thinstrip_height;
		break;

	default:
		g_assert_not_reached();
	}

	/* We can't set n_lines for the current demand style: a later bit of
	 * the pipeline might see a different hint and we need to synchronise
	 * buffer sizes everywhere.
	 *
	 * We also can't depend on the current image size, since that might
	 * change down the pipeline too. Pick a typical image width.
	 *
	 * Pick the maximum buffer size we might possibly need, then round up
	 * to a multiple of tileheight.
	 */
	*n_lines = vips__tile_height *
		VIPS_ROUND_UP( vips__tile_width * nthr, typical_image_width ) /
		typical_image_width;
	*n_lines = VIPS_MAX( *n_lines, vips__fatstrip_height * nthr );
	*n_lines = VIPS_MAX( *n_lines, vips__thinstrip_height * nthr );
	*n_lines = VIPS_ROUND_UP( *n_lines, *tile_height );

	/* We make this assumption in several places.
	 */
	g_assert( *n_lines % *tile_height == 0 );

	VIPS_DEBUG_MSG( "vips_get_tile_size: %d by %d patches, "
		"groups of %d scanlines\n",
		*tile_width, *tile_height, *n_lines );
}
1053