/* A sink that's not attached to anything, e.g. for finding an image average.
2  *
3  * 28/3/10
4  * 	- from im_iterate(), reworked for threadpool
5  */
6 
7 /*
8 
9     This file is part of VIPS.
10 
11     VIPS is free software; you can redistribute it and/or modify
12     it under the terms of the GNU Lesser General Public License as published by
13     the Free Software Foundation; either version 2 of the License, or
14     (at your option) any later version.
15 
16     This program is distributed in the hope that it will be useful,
17     but WITHOUT ANY WARRANTY; without even the implied warranty of
18     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19     GNU Lesser General Public License for more details.
20 
21     You should have received a copy of the GNU Lesser General Public License
22     along with this program; if not, write to the Free Software
23     Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24     02110-1301  USA
25 
26  */
27 
28 /*
29 
30     These files are distributed with VIPS - http://www.vips.ecs.soton.ac.uk
31 
32  */
33 
34 /*
35 #define VIPS_DEBUG
36  */
37 
38 #ifdef HAVE_CONFIG_H
39 #include <config.h>
40 #endif /*HAVE_CONFIG_H*/
41 #include <vips/intl.h>
42 
43 #include <stdio.h>
44 #include <stdlib.h>
45 
46 #include <vips/vips.h>
47 #include <vips/thread.h>
48 #include <vips/internal.h>
49 #include <vips/debug.h>
50 
51 #include "sink.h"
52 
/* A part of the image we are scanning.
 *
 * We can't let any threads fall too far behind as that would mess up seq
 * image sources. Keep track of two areas moving down the image, and stall if
 * the previous area still has active threads.
 *
 * n_thread is initialised to 0 in sink_area_new(). sink_area_allocate_fn()
 * subtracts 1 for each tile handed out from this area. sink_work() adds 1
 * back when that tile is finished. The counter is therefore back at 0 once
 * every tile allocated from the area has completed.
 */
typedef struct _SinkArea {
	/* The sink that owns this area.
	 */
	struct _Sink *sink;

	VipsRect rect;		/* Part of image this area covers */
        VipsSemaphore n_thread;	/* Number of threads scanning this area */
} SinkArea;
65 
/* Per-call state.
 *
 * sink_base must stay the first member: code in this file casts a Sink *
 * directly to a SinkBase *.
 */
typedef struct _Sink {
	SinkBase sink_base;

	/* We need a temp "p" image between the source image and us to
	 * make sure we can't damage the original. Created in sink_init() with
	 * vips_image_new() and filled with vips_image_write() from the
	 * source image.
	 */
	VipsImage *t;

	/* Mutex for serialising calls to VipsStartFn and VipsStopFn.
	 */
	GMutex *sslock;

	/* Call params. a and b are passed unchanged to start_fn, generate_fn
	 * and stop_fn.
	 */
	VipsStartFn start_fn;
	VipsGenerateFn generate_fn;
	VipsStopFn stop_fn;
	void *a;
	void *b;

	/* area is the band we are currently handing out tiles from. Before
	 * moving on to a new band, we wait until old_area (the previous band)
	 * has no threads still working in it. The two pointers are swapped
	 * as the scan moves down the image.
	 */
	SinkArea *area;
	SinkArea *old_area;

} Sink;
95 
/* Our per-thread state.
 *
 * parent_object must stay the first member: this file casts between
 * VipsThreadState * and SinkThreadState *.
 */
typedef struct _SinkThreadState {
	VipsThreadState parent_object;

	/* Sequence value for this thread, as returned by the user's start
	 * function. It is NULL until start has run (or when there is no
	 * start function).
	 */
        void *seq;

	/* A region on sink->t, our private copy of the image. We can't use
	 * parent_object.reg, because that one is defined on the outer image.
	 */
	VipsRegion *reg;

	/* The area the current tile was allocated from. Set by
	 * sink_area_allocate_fn(). Read by sink_work() to signal that the
	 * tile is finished.
	 */
        SinkArea *area;

} SinkThreadState;
115 
/* Class struct for SinkThreadState. It adds nothing to the parent class.
 */
typedef struct _SinkThreadStateClass {
	VipsThreadStateClass parent_class;

} SinkThreadStateClass;

/* Registers SinkThreadState as a subclass of VIPS_TYPE_THREAD_STATE. This
 * provides sink_thread_state_get_type() and sink_thread_state_parent_class,
 * both of which are used below.
 */
G_DEFINE_TYPE( SinkThreadState, sink_thread_state, VIPS_TYPE_THREAD_STATE );
122 
123 static void
sink_area_free(SinkArea * area)124 sink_area_free( SinkArea *area )
125 {
126 	vips_semaphore_destroy( &area->n_thread );
127 	g_free( area );
128 }
129 
130 static SinkArea *
sink_area_new(Sink * sink)131 sink_area_new( Sink *sink )
132 {
133 	SinkArea *area;
134 
135 	if( !(area = VIPS_NEW( NULL, SinkArea )) )
136 		return( NULL );
137 	area->sink = sink;
138 	vips_semaphore_init( &area->n_thread, 0, "n_thread" );
139 
140 	return( area );
141 }
142 
143 /* Move an area to a position.
144  */
145 static void
sink_area_position(SinkArea * area,int top,int height)146 sink_area_position( SinkArea *area, int top, int height )
147 {
148 	Sink *sink = area->sink;
149 
150 	VipsRect all, rect;
151 
152 	all.left = 0;
153 	all.top = 0;
154 	all.width = sink->sink_base.im->Xsize;
155 	all.height = sink->sink_base.im->Ysize;
156 
157 	rect.left = 0;
158 	rect.top = top;
159 	rect.width = sink->sink_base.im->Xsize;
160 	rect.height = height;
161 
162 	vips_rect_intersectrect( &all, &rect, &area->rect );
163 }
164 
/* Our VipsThreadpoolAllocate function ... move the thread to the next tile
 * that needs doing. If we fill the current area, we block until the previous
 * area is finished, then swap areas.
 *
 * If all tiles are done, we set *stop to TRUE to end iteration. Every path
 * through this function returns 0.
 *
 * NOTE(review): this updates sink_base->x, ->y and ->processed, and swaps
 * sink->area / sink->old_area, without taking any lock. It presumably relies
 * on the threadpool serialising calls to the allocate function; confirm
 * against vips_threadpool_run().
 */
static gboolean
sink_area_allocate_fn( VipsThreadState *state, void *a, gboolean *stop )
{
	SinkThreadState *sstate = (SinkThreadState *) state;
	Sink *sink = (Sink *) a;
	SinkBase *sink_base = (SinkBase *) sink;

	VipsRect image;
	VipsRect tile;

	VIPS_DEBUG_MSG( "sink_area_allocate_fn: %p\n", g_thread_self() );

	/* Is the state x/y OK? New line or maybe new buffer or maybe even
	 * all done.
	 */
	if( sink_base->x >= sink->area->rect.width ) {
		sink_base->x = 0;
		sink_base->y += sink_base->tile_height;

		if( sink_base->y >= VIPS_RECT_BOTTOM( &sink->area->rect ) ) {
			/* Block until the previous area is done. old_area is
			 * about to be reused for the next band, so wait for
			 * its counter to drain back to 0 (presumably that is
			 * what downn( ..., 0 ) waits for). When the current
			 * area starts at the top of the image, old_area has
			 * never been handed out and there is nothing to wait
			 * for.
			 */
			if( sink->area->rect.top > 0 )
				vips_semaphore_downn(
					&sink->old_area->n_thread, 0 );

			/* End of image?
			 */
			if( sink_base->y >= sink_base->im->Ysize ) {
				*stop = TRUE;
				return( 0 );
			}

			/* Swap buffers: the drained old_area becomes the
			 * current area, and the area we just filled becomes
			 * old_area.
			 */
			VIPS_SWAP( SinkArea *,
				sink->area, sink->old_area );

			/* Position buf at the new y.
			 */
			sink_area_position( sink->area,
				sink_base->y, sink_base->n_lines );
		}
	}

	/* x, y and buf are good: save params for thread. The tile is
	 * clipped to the image, so edge tiles may be smaller.
	 */
	image.left = 0;
	image.top = 0;
	image.width = sink_base->im->Xsize;
	image.height = sink_base->im->Ysize;
	tile.left = sink_base->x;
	tile.top = sink_base->y;
	tile.width = sink_base->tile_width;
	tile.height = sink_base->tile_height;
	vips_rect_intersectrect( &image, &tile, &state->pos );

	/* The thread needs to know which area it's writing to, so that
	 * sink_work() can update the right counter when the tile is done.
	 */
	sstate->area = sink->area;

	VIPS_DEBUG_MSG( "  %p allocated %d x %d:\n",
		g_thread_self(), state->pos.left, state->pos.top );

	/* Add to the number of writers on the area. The counter goes down
	 * by one here, and sink_work() adds one back when it finishes.
	 */
	vips_semaphore_upn( &sink->area->n_thread, -1 );

	/* Move state on.
	 */
	sink_base->x += sink_base->tile_width;

	/* Add the number of pixels we've just allocated to progress.
	 */
	sink_base->processed += state->pos.width * state->pos.height;

	return( 0 );
}
249 
250 /* Call a thread's stop function.
251  */
252 static int
sink_call_stop(Sink * sink,SinkThreadState * state)253 sink_call_stop( Sink *sink, SinkThreadState *state )
254 {
255 	if( state->seq && sink->stop_fn ) {
256 		int result;
257 
258 		VIPS_DEBUG_MSG( "sink_call_stop: state = %p\n", state );
259 
260 		VIPS_GATE_START( "sink_call_stop: wait" );
261 
262 		g_mutex_lock( sink->sslock );
263 
264 		VIPS_GATE_STOP( "sink_call_stop: wait" );
265 
266 		result = sink->stop_fn( state->seq, sink->a, sink->b );
267 
268 		g_mutex_unlock( sink->sslock );
269 
270 		if( result ) {
271 			SinkBase *sink_base = (SinkBase *) sink;
272 
273 			vips_error( "vips_sink",
274 				_( "stop function failed for image \"%s\"" ),
275 				sink_base->im->filename );
276 			return( -1 );
277 		}
278 
279 		state->seq = NULL;
280 	}
281 
282 	return( 0 );
283 }
284 
285 static void
sink_thread_state_dispose(GObject * gobject)286 sink_thread_state_dispose( GObject *gobject )
287 {
288 	SinkThreadState *state = (SinkThreadState *) gobject;
289 	Sink *sink = (Sink *) ((VipsThreadState *) state)->a;
290 
291 	sink_call_stop( sink, state );
292 	VIPS_UNREF( state->reg );
293 
294 	G_OBJECT_CLASS( sink_thread_state_parent_class )->dispose( gobject );
295 }
296 
297 /* Call the start function for this thread, if necessary.
298  */
299 static int
sink_call_start(Sink * sink,SinkThreadState * state)300 sink_call_start( Sink *sink, SinkThreadState *state )
301 {
302 	if( !state->seq && sink->start_fn ) {
303 		VIPS_DEBUG_MSG( "sink_call_start: state = %p\n", state );
304 
305 		VIPS_GATE_START( "sink_call_start: wait" );
306 
307 		g_mutex_lock( sink->sslock );
308 
309 		VIPS_GATE_STOP( "sink_call_start: wait" );
310 
311 		state->seq = sink->start_fn( sink->t, sink->a, sink->b );
312 
313 		g_mutex_unlock( sink->sslock );
314 
315 		if( !state->seq ) {
316 			SinkBase *sink_base = (SinkBase *) sink;
317 
318 			vips_error( "vips_sink",
319 				_( "start function failed for image \"%s\"" ),
320 				sink_base->im->filename );
321 			return( -1 );
322 		}
323 	}
324 
325 	return( 0 );
326 }
327 
328 static int
sink_thread_state_build(VipsObject * object)329 sink_thread_state_build( VipsObject *object )
330 {
331 	SinkThreadState *state = (SinkThreadState *) object;
332 	Sink *sink = (Sink *) ((VipsThreadState *) state)->a;
333 
334 	if( !(state->reg = vips_region_new( sink->t )) ||
335 		sink_call_start( sink, state ) )
336 		return( -1 );
337 
338 	return( VIPS_OBJECT_CLASS(
339 		sink_thread_state_parent_class )->build( object ) );
340 }
341 
342 static void
sink_thread_state_class_init(SinkThreadStateClass * class)343 sink_thread_state_class_init( SinkThreadStateClass *class )
344 {
345 	GObjectClass *gobject_class = G_OBJECT_CLASS( class );
346 	VipsObjectClass *object_class = VIPS_OBJECT_CLASS( class );
347 
348 	gobject_class->dispose = sink_thread_state_dispose;
349 
350 	object_class->build = sink_thread_state_build;
351 	object_class->nickname = "sinkthreadstate";
352 	object_class->description = _( "per-thread state for sink" );
353 }
354 
355 static void
sink_thread_state_init(SinkThreadState * state)356 sink_thread_state_init( SinkThreadState *state )
357 {
358 	state->seq = NULL;
359 	state->reg = NULL;
360 }
361 
362 VipsThreadState *
vips_sink_thread_state_new(VipsImage * im,void * a)363 vips_sink_thread_state_new( VipsImage *im, void *a )
364 {
365 	return( VIPS_THREAD_STATE( vips_object_new(
366 		sink_thread_state_get_type(),
367 		vips_thread_state_set, im, a ) ) );
368 }
369 
370 static void
sink_free(Sink * sink)371 sink_free( Sink *sink )
372 {
373 	VIPS_FREEF( vips_g_mutex_free, sink->sslock );
374 	VIPS_FREEF( sink_area_free, sink->area );
375 	VIPS_FREEF( sink_area_free, sink->old_area );
376 	VIPS_FREEF( g_object_unref, sink->t );
377 }
378 
379 void
vips_sink_base_init(SinkBase * sink_base,VipsImage * image)380 vips_sink_base_init( SinkBase *sink_base, VipsImage *image )
381 {
382 	/* Always clear kill before we start looping. See the
383 	 * call to vips_image_iskilled() below.
384 	 */
385 	vips_image_set_kill( image, FALSE );
386 
387 	sink_base->im = image;
388 	sink_base->x = 0;
389 	sink_base->y = 0;
390 
391 	vips_get_tile_size( image,
392 		&sink_base->tile_width, &sink_base->tile_height,
393 		&sink_base->n_lines );
394 
395 	sink_base->processed = 0;
396 }
397 
398 static int
sink_init(Sink * sink,VipsImage * image,VipsStartFn start_fn,VipsGenerateFn generate_fn,VipsStopFn stop_fn,void * a,void * b)399 sink_init( Sink *sink,
400 	VipsImage *image,
401 	VipsStartFn start_fn, VipsGenerateFn generate_fn, VipsStopFn stop_fn,
402 	void *a, void *b )
403 {
404 	g_assert( generate_fn );
405 
406 	vips_sink_base_init( &sink->sink_base, image );
407 
408 	sink->t = NULL;
409 	sink->sslock = vips_g_mutex_new();
410 	sink->start_fn = start_fn;
411 	sink->generate_fn = generate_fn;
412 	sink->stop_fn = stop_fn;
413 	sink->a = a;
414 	sink->b = b;
415 
416 	sink->area = NULL;
417 	sink->old_area = NULL;
418 
419 	if( !(sink->t = vips_image_new()) ||
420 		!(sink->area = sink_area_new( sink )) ||
421 		!(sink->old_area = sink_area_new( sink )) ||
422 		vips_image_write( sink->sink_base.im, sink->t ) ) {
423 		sink_free( sink );
424 		return( -1 );
425 	}
426 
427 	return( 0 );
428 }
429 
430 static int
sink_work(VipsThreadState * state,void * a)431 sink_work( VipsThreadState *state, void *a )
432 {
433 	SinkThreadState *sstate = (SinkThreadState *) state;
434 	Sink *sink = (Sink *) a;
435 	SinkArea *area = sstate->area;
436 
437 	int result;
438 
439 	result = vips_region_prepare( sstate->reg, &state->pos );
440 	if( !result )
441 		result = sink->generate_fn( sstate->reg, sstate->seq,
442 			sink->a, sink->b, &state->stop );
443 
444 	/* Tell the allocator we're done.
445 	 */
446 	vips_semaphore_upn( &area->n_thread, 1 );
447 
448 	return( result );
449 }
450 
451 int
vips_sink_base_progress(void * a)452 vips_sink_base_progress( void *a )
453 {
454 	SinkBase *sink_base = (SinkBase *) a;
455 
456 	VIPS_DEBUG_MSG( "vips_sink_base_progress:\n" );
457 
458 	/* Trigger any eval callbacks on our source image and
459 	 * check for errors.
460 	 */
461 	vips_image_eval( sink_base->im, sink_base->processed );
462 	if( vips_image_iskilled( sink_base->im ) )
463 		return( -1 );
464 
465 	return( 0 );
466 }
467 
468 /**
469  * vips_sink_tile: (method)
470  * @im: scan over this image
471  * @tile_width: tile width
472  * @tile_height: tile height
473  * @start_fn: start sequences with this function
474  * @generate_fn: generate pixels with this function
475  * @stop_fn: stop sequences with this function
476  * @a: user data
477  * @b: user data
478  *
479  * Loops over an image. @generate_fn is called for every
480  * pixel in the image, with
481  * the @reg argument being a region of calculated pixels.
482  *
483  * Each set of pixels is @tile_width by @tile_height pixels (less at the
484  * image edges). This is handy for things like writing a tiled TIFF image,
485  * where tiles have to be generated with a certain size.
486  *
487  * See also: vips_sink(), vips_get_tile_size().
488  *
489  * Returns: 0 on success, or -1 on error.
490  */
491 int
vips_sink_tile(VipsImage * im,int tile_width,int tile_height,VipsStartFn start_fn,VipsGenerateFn generate_fn,VipsStopFn stop_fn,void * a,void * b)492 vips_sink_tile( VipsImage *im,
493 	int tile_width, int tile_height,
494 	VipsStartFn start_fn, VipsGenerateFn generate_fn, VipsStopFn stop_fn,
495 	void *a, void *b )
496 {
497 	Sink sink;
498 	int result;
499 
500 	g_assert( vips_object_sanity( VIPS_OBJECT( im ) ) );
501 
502 	/* We don't use this, but make sure it's set in case any old binaries
503 	 * are expecting it.
504 	 */
505 	im->Bbits = vips_format_sizeof( im->BandFmt ) << 3;
506 
507 	if( sink_init( &sink, im, start_fn, generate_fn, stop_fn, a, b ) )
508 		return( -1 );
509 
510 	if( tile_width > 0 ) {
511 		sink.sink_base.tile_width = tile_width;
512 		sink.sink_base.tile_height = tile_height;
513 	}
514 
515 	/* vips_sink_base_progress() signals progress on im, so we have to do
516 	 * pre/post on that too.
517 	 */
518 	vips_image_preeval( im );
519 
520 	sink_area_position( sink.area, 0, sink.sink_base.n_lines );
521 	result = vips_threadpool_run( im,
522 		vips_sink_thread_state_new,
523 		sink_area_allocate_fn,
524 		sink_work,
525 		vips_sink_base_progress,
526 		&sink );
527 
528 	vips_image_posteval( im );
529 
530 	sink_free( &sink );
531 
532 	return( result );
533 }
534 
535 /**
536  * vips_sink: (method)
537  * @im: scan over this image
538  * @start_fn: start sequences with this function
539  * @generate_fn: generate pixels with this function
540  * @stop_fn: stop sequences with this function
541  * @a: user data
542  * @b: user data
543  *
544  * Loops over an image. @generate_fn is called for every pixel in
545  * the image, with
546  * the @reg argument being a region of calculated pixels. vips_sink() is
547  * used to implement operations like vips_avg() which have no image output.
548  *
549  * Each set of pixels is sized according to the requirements of the image
550  * pipeline that generated @im.
551  *
552  * See also: vips_image_generate(), vips_image_new().
553  *
554  * Returns: 0 on success, or -1 on error.
555  */
556 int
vips_sink(VipsImage * im,VipsStartFn start_fn,VipsGenerateFn generate_fn,VipsStopFn stop_fn,void * a,void * b)557 vips_sink( VipsImage *im,
558 	VipsStartFn start_fn, VipsGenerateFn generate_fn, VipsStopFn stop_fn,
559 	void *a, void *b )
560 {
561 	return( vips_sink_tile( im, -1, -1,
562 		start_fn, generate_fn, stop_fn, a, b ) );
563 }
564