1 /* A sink that's not attached to anything, eg. find image average,
2 *
3 * 28/3/10
4 * - from im_iterate(), reworked for threadpool
5 */
6
7 /*
8
9 This file is part of VIPS.
10
11 VIPS is free software; you can redistribute it and/or modify
12 it under the terms of the GNU Lesser General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU Lesser General Public License for more details.
20
21 You should have received a copy of the GNU Lesser General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 02110-1301 USA
25
26 */
27
28 /*
29
30 These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk
31
32 */
33
34 /*
35 #define VIPS_DEBUG
36 */
37
38 #ifdef HAVE_CONFIG_H
39 #include <config.h>
40 #endif /*HAVE_CONFIG_H*/
41 #include <vips/intl.h>
42
43 #include <stdio.h>
44 #include <stdlib.h>
45
46 #include <vips/vips.h>
47 #include <vips/thread.h>
48 #include <vips/internal.h>
49 #include <vips/debug.h>
50
51 #include "sink.h"
52
53 /* A part of the image we are scanning.
54 *
55 * We can't let any threads fall too far behind as that would mess up seq
56 * image sources. Keep track of two areas moving down the image, and stall if
57 * the previous area still has active threads.
58 */
59 typedef struct _SinkArea {
60 struct _Sink *sink;
61
62 VipsRect rect; /* Part of image this area covers */
63 VipsSemaphore n_thread; /* Number of threads scanning this area */
64 } SinkArea;
65
66 /* Per-call state.
67 */
68 typedef struct _Sink {
69 SinkBase sink_base;
70
71 /* We need a temp "p" image between the source image and us to
72 * make sure we can't damage the original.
73 */
74 VipsImage *t;
75
76 /* Mutex for serialising calls to VipsStartFn and VipsStopFn.
77 */
78 GMutex *sslock;
79
80 /* Call params.
81 */
82 VipsStartFn start_fn;
83 VipsGenerateFn generate_fn;
84 VipsStopFn stop_fn;
85 void *a;
86 void *b;
87
88 /* We are current scanning area, we'll delay starting a new
89 * area if old_area (the previous position) hasn't completed.
90 */
91 SinkArea *area;
92 SinkArea *old_area;
93
94 } Sink;
95
96 /* Our per-thread state.
97 */
98 typedef struct _SinkThreadState {
99 VipsThreadState parent_object;
100
101 /* Sequence value for this thread.
102 */
103 void *seq;
104
105 /* The region we walk over sink.t copy. We can't use
106 * parent_object.reg, it's defined on the outer image.
107 */
108 VipsRegion *reg;
109
110 /* The area we were allocated from.
111 */
112 SinkArea *area;
113
114 } SinkThreadState;
115
116 typedef struct _SinkThreadStateClass {
117 VipsThreadStateClass parent_class;
118
119 } SinkThreadStateClass;
120
121 G_DEFINE_TYPE( SinkThreadState, sink_thread_state, VIPS_TYPE_THREAD_STATE );
122
123 static void
sink_area_free(SinkArea * area)124 sink_area_free( SinkArea *area )
125 {
126 vips_semaphore_destroy( &area->n_thread );
127 g_free( area );
128 }
129
130 static SinkArea *
sink_area_new(Sink * sink)131 sink_area_new( Sink *sink )
132 {
133 SinkArea *area;
134
135 if( !(area = VIPS_NEW( NULL, SinkArea )) )
136 return( NULL );
137 area->sink = sink;
138 vips_semaphore_init( &area->n_thread, 0, "n_thread" );
139
140 return( area );
141 }
142
143 /* Move an area to a position.
144 */
145 static void
sink_area_position(SinkArea * area,int top,int height)146 sink_area_position( SinkArea *area, int top, int height )
147 {
148 Sink *sink = area->sink;
149
150 VipsRect all, rect;
151
152 all.left = 0;
153 all.top = 0;
154 all.width = sink->sink_base.im->Xsize;
155 all.height = sink->sink_base.im->Ysize;
156
157 rect.left = 0;
158 rect.top = top;
159 rect.width = sink->sink_base.im->Xsize;
160 rect.height = height;
161
162 vips_rect_intersectrect( &all, &rect, &area->rect );
163 }
164
165 /* Our VipsThreadpoolAllocate function ... move the thread to the next tile
166 * that needs doing. If we fill the current area, we block until the previous
167 * area is finished, then swap areas.
168 *
169 * If all tiles are done, we return FALSE to end iteration.
170 */
171 static gboolean
sink_area_allocate_fn(VipsThreadState * state,void * a,gboolean * stop)172 sink_area_allocate_fn( VipsThreadState *state, void *a, gboolean *stop )
173 {
174 SinkThreadState *sstate = (SinkThreadState *) state;
175 Sink *sink = (Sink *) a;
176 SinkBase *sink_base = (SinkBase *) sink;
177
178 VipsRect image;
179 VipsRect tile;
180
181 VIPS_DEBUG_MSG( "sink_area_allocate_fn: %p\n", g_thread_self() );
182
183 /* Is the state x/y OK? New line or maybe new buffer or maybe even
184 * all done.
185 */
186 if( sink_base->x >= sink->area->rect.width ) {
187 sink_base->x = 0;
188 sink_base->y += sink_base->tile_height;
189
190 if( sink_base->y >= VIPS_RECT_BOTTOM( &sink->area->rect ) ) {
191 /* Block until the previous area is done.
192 */
193 if( sink->area->rect.top > 0 )
194 vips_semaphore_downn(
195 &sink->old_area->n_thread, 0 );
196
197 /* End of image?
198 */
199 if( sink_base->y >= sink_base->im->Ysize ) {
200 *stop = TRUE;
201 return( 0 );
202 }
203
204 /* Swap buffers.
205 */
206 VIPS_SWAP( SinkArea *,
207 sink->area, sink->old_area );
208
209 /* Position buf at the new y.
210 */
211 sink_area_position( sink->area,
212 sink_base->y, sink_base->n_lines );
213 }
214 }
215
216 /* x, y and buf are good: save params for thread.
217 */
218 image.left = 0;
219 image.top = 0;
220 image.width = sink_base->im->Xsize;
221 image.height = sink_base->im->Ysize;
222 tile.left = sink_base->x;
223 tile.top = sink_base->y;
224 tile.width = sink_base->tile_width;
225 tile.height = sink_base->tile_height;
226 vips_rect_intersectrect( &image, &tile, &state->pos );
227
228 /* The thread needs to know which area it's writing to.
229 */
230 sstate->area = sink->area;
231
232 VIPS_DEBUG_MSG( " %p allocated %d x %d:\n",
233 g_thread_self(), state->pos.left, state->pos.top );
234
235 /* Add to the number of writers on the area.
236 */
237 vips_semaphore_upn( &sink->area->n_thread, -1 );
238
239 /* Move state on.
240 */
241 sink_base->x += sink_base->tile_width;
242
243 /* Add the number of pixels we've just allocated to progress.
244 */
245 sink_base->processed += state->pos.width * state->pos.height;
246
247 return( 0 );
248 }
249
250 /* Call a thread's stop function.
251 */
252 static int
sink_call_stop(Sink * sink,SinkThreadState * state)253 sink_call_stop( Sink *sink, SinkThreadState *state )
254 {
255 if( state->seq && sink->stop_fn ) {
256 int result;
257
258 VIPS_DEBUG_MSG( "sink_call_stop: state = %p\n", state );
259
260 VIPS_GATE_START( "sink_call_stop: wait" );
261
262 g_mutex_lock( sink->sslock );
263
264 VIPS_GATE_STOP( "sink_call_stop: wait" );
265
266 result = sink->stop_fn( state->seq, sink->a, sink->b );
267
268 g_mutex_unlock( sink->sslock );
269
270 if( result ) {
271 SinkBase *sink_base = (SinkBase *) sink;
272
273 vips_error( "vips_sink",
274 _( "stop function failed for image \"%s\"" ),
275 sink_base->im->filename );
276 return( -1 );
277 }
278
279 state->seq = NULL;
280 }
281
282 return( 0 );
283 }
284
285 static void
sink_thread_state_dispose(GObject * gobject)286 sink_thread_state_dispose( GObject *gobject )
287 {
288 SinkThreadState *state = (SinkThreadState *) gobject;
289 Sink *sink = (Sink *) ((VipsThreadState *) state)->a;
290
291 sink_call_stop( sink, state );
292 VIPS_UNREF( state->reg );
293
294 G_OBJECT_CLASS( sink_thread_state_parent_class )->dispose( gobject );
295 }
296
297 /* Call the start function for this thread, if necessary.
298 */
299 static int
sink_call_start(Sink * sink,SinkThreadState * state)300 sink_call_start( Sink *sink, SinkThreadState *state )
301 {
302 if( !state->seq && sink->start_fn ) {
303 VIPS_DEBUG_MSG( "sink_call_start: state = %p\n", state );
304
305 VIPS_GATE_START( "sink_call_start: wait" );
306
307 g_mutex_lock( sink->sslock );
308
309 VIPS_GATE_STOP( "sink_call_start: wait" );
310
311 state->seq = sink->start_fn( sink->t, sink->a, sink->b );
312
313 g_mutex_unlock( sink->sslock );
314
315 if( !state->seq ) {
316 SinkBase *sink_base = (SinkBase *) sink;
317
318 vips_error( "vips_sink",
319 _( "start function failed for image \"%s\"" ),
320 sink_base->im->filename );
321 return( -1 );
322 }
323 }
324
325 return( 0 );
326 }
327
328 static int
sink_thread_state_build(VipsObject * object)329 sink_thread_state_build( VipsObject *object )
330 {
331 SinkThreadState *state = (SinkThreadState *) object;
332 Sink *sink = (Sink *) ((VipsThreadState *) state)->a;
333
334 if( !(state->reg = vips_region_new( sink->t )) ||
335 sink_call_start( sink, state ) )
336 return( -1 );
337
338 return( VIPS_OBJECT_CLASS(
339 sink_thread_state_parent_class )->build( object ) );
340 }
341
342 static void
sink_thread_state_class_init(SinkThreadStateClass * class)343 sink_thread_state_class_init( SinkThreadStateClass *class )
344 {
345 GObjectClass *gobject_class = G_OBJECT_CLASS( class );
346 VipsObjectClass *object_class = VIPS_OBJECT_CLASS( class );
347
348 gobject_class->dispose = sink_thread_state_dispose;
349
350 object_class->build = sink_thread_state_build;
351 object_class->nickname = "sinkthreadstate";
352 object_class->description = _( "per-thread state for sink" );
353 }
354
355 static void
sink_thread_state_init(SinkThreadState * state)356 sink_thread_state_init( SinkThreadState *state )
357 {
358 state->seq = NULL;
359 state->reg = NULL;
360 }
361
362 VipsThreadState *
vips_sink_thread_state_new(VipsImage * im,void * a)363 vips_sink_thread_state_new( VipsImage *im, void *a )
364 {
365 return( VIPS_THREAD_STATE( vips_object_new(
366 sink_thread_state_get_type(),
367 vips_thread_state_set, im, a ) ) );
368 }
369
370 static void
sink_free(Sink * sink)371 sink_free( Sink *sink )
372 {
373 VIPS_FREEF( vips_g_mutex_free, sink->sslock );
374 VIPS_FREEF( sink_area_free, sink->area );
375 VIPS_FREEF( sink_area_free, sink->old_area );
376 VIPS_FREEF( g_object_unref, sink->t );
377 }
378
379 void
vips_sink_base_init(SinkBase * sink_base,VipsImage * image)380 vips_sink_base_init( SinkBase *sink_base, VipsImage *image )
381 {
382 /* Always clear kill before we start looping. See the
383 * call to vips_image_iskilled() below.
384 */
385 vips_image_set_kill( image, FALSE );
386
387 sink_base->im = image;
388 sink_base->x = 0;
389 sink_base->y = 0;
390
391 vips_get_tile_size( image,
392 &sink_base->tile_width, &sink_base->tile_height,
393 &sink_base->n_lines );
394
395 sink_base->processed = 0;
396 }
397
398 static int
sink_init(Sink * sink,VipsImage * image,VipsStartFn start_fn,VipsGenerateFn generate_fn,VipsStopFn stop_fn,void * a,void * b)399 sink_init( Sink *sink,
400 VipsImage *image,
401 VipsStartFn start_fn, VipsGenerateFn generate_fn, VipsStopFn stop_fn,
402 void *a, void *b )
403 {
404 g_assert( generate_fn );
405
406 vips_sink_base_init( &sink->sink_base, image );
407
408 sink->t = NULL;
409 sink->sslock = vips_g_mutex_new();
410 sink->start_fn = start_fn;
411 sink->generate_fn = generate_fn;
412 sink->stop_fn = stop_fn;
413 sink->a = a;
414 sink->b = b;
415
416 sink->area = NULL;
417 sink->old_area = NULL;
418
419 if( !(sink->t = vips_image_new()) ||
420 !(sink->area = sink_area_new( sink )) ||
421 !(sink->old_area = sink_area_new( sink )) ||
422 vips_image_write( sink->sink_base.im, sink->t ) ) {
423 sink_free( sink );
424 return( -1 );
425 }
426
427 return( 0 );
428 }
429
430 static int
sink_work(VipsThreadState * state,void * a)431 sink_work( VipsThreadState *state, void *a )
432 {
433 SinkThreadState *sstate = (SinkThreadState *) state;
434 Sink *sink = (Sink *) a;
435 SinkArea *area = sstate->area;
436
437 int result;
438
439 result = vips_region_prepare( sstate->reg, &state->pos );
440 if( !result )
441 result = sink->generate_fn( sstate->reg, sstate->seq,
442 sink->a, sink->b, &state->stop );
443
444 /* Tell the allocator we're done.
445 */
446 vips_semaphore_upn( &area->n_thread, 1 );
447
448 return( result );
449 }
450
451 int
vips_sink_base_progress(void * a)452 vips_sink_base_progress( void *a )
453 {
454 SinkBase *sink_base = (SinkBase *) a;
455
456 VIPS_DEBUG_MSG( "vips_sink_base_progress:\n" );
457
458 /* Trigger any eval callbacks on our source image and
459 * check for errors.
460 */
461 vips_image_eval( sink_base->im, sink_base->processed );
462 if( vips_image_iskilled( sink_base->im ) )
463 return( -1 );
464
465 return( 0 );
466 }
467
468 /**
469 * vips_sink_tile: (method)
470 * @im: scan over this image
471 * @tile_width: tile width
472 * @tile_height: tile height
473 * @start_fn: start sequences with this function
474 * @generate_fn: generate pixels with this function
475 * @stop_fn: stop sequences with this function
476 * @a: user data
477 * @b: user data
478 *
479 * Loops over an image. @generate_fn is called for every
480 * pixel in the image, with
481 * the @reg argument being a region of calculated pixels.
482 *
483 * Each set of pixels is @tile_width by @tile_height pixels (less at the
484 * image edges). This is handy for things like writing a tiled TIFF image,
485 * where tiles have to be generated with a certain size.
486 *
487 * See also: vips_sink(), vips_get_tile_size().
488 *
489 * Returns: 0 on success, or -1 on error.
490 */
491 int
vips_sink_tile(VipsImage * im,int tile_width,int tile_height,VipsStartFn start_fn,VipsGenerateFn generate_fn,VipsStopFn stop_fn,void * a,void * b)492 vips_sink_tile( VipsImage *im,
493 int tile_width, int tile_height,
494 VipsStartFn start_fn, VipsGenerateFn generate_fn, VipsStopFn stop_fn,
495 void *a, void *b )
496 {
497 Sink sink;
498 int result;
499
500 g_assert( vips_object_sanity( VIPS_OBJECT( im ) ) );
501
502 /* We don't use this, but make sure it's set in case any old binaries
503 * are expecting it.
504 */
505 im->Bbits = vips_format_sizeof( im->BandFmt ) << 3;
506
507 if( sink_init( &sink, im, start_fn, generate_fn, stop_fn, a, b ) )
508 return( -1 );
509
510 if( tile_width > 0 ) {
511 sink.sink_base.tile_width = tile_width;
512 sink.sink_base.tile_height = tile_height;
513 }
514
515 /* vips_sink_base_progress() signals progress on im, so we have to do
516 * pre/post on that too.
517 */
518 vips_image_preeval( im );
519
520 sink_area_position( sink.area, 0, sink.sink_base.n_lines );
521 result = vips_threadpool_run( im,
522 vips_sink_thread_state_new,
523 sink_area_allocate_fn,
524 sink_work,
525 vips_sink_base_progress,
526 &sink );
527
528 vips_image_posteval( im );
529
530 sink_free( &sink );
531
532 return( result );
533 }
534
535 /**
536 * vips_sink: (method)
537 * @im: scan over this image
538 * @start_fn: start sequences with this function
539 * @generate_fn: generate pixels with this function
540 * @stop_fn: stop sequences with this function
541 * @a: user data
542 * @b: user data
543 *
544 * Loops over an image. @generate_fn is called for every pixel in
545 * the image, with
546 * the @reg argument being a region of calculated pixels. vips_sink() is
547 * used to implement operations like vips_avg() which have no image output.
548 *
549 * Each set of pixels is sized according to the requirements of the image
550 * pipeline that generated @im.
551 *
552 * See also: vips_image_generate(), vips_image_new().
553 *
554 * Returns: 0 on success, or -1 on error.
555 */
556 int
vips_sink(VipsImage * im,VipsStartFn start_fn,VipsGenerateFn generate_fn,VipsStopFn stop_fn,void * a,void * b)557 vips_sink( VipsImage *im,
558 VipsStartFn start_fn, VipsGenerateFn generate_fn, VipsStopFn stop_fn,
559 void *a, void *b )
560 {
561 return( vips_sink_tile( im, -1, -1,
562 start_fn, generate_fn, stop_fn, a, b ) );
563 }
564