1 /*
2  * * Copyright (C) 2006-2011 Anders Brander <anders@brander.dk>,
3  * * Anders Kvist <akv@lnxbx.dk> and Klaus Post <klauspost@gmail.com>
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18  */
19 
20 #include "rs-io.h"
21 
22 static GStaticMutex init_lock = G_STATIC_MUTEX_INIT;
23 static GAsyncQueue *queue = NULL;
24 static GStaticRecMutex io_lock = G_STATIC_REC_MUTEX_INIT;
25 static gboolean pause_queue = FALSE;
26 static gint queue_active_count = 0;
27 static GStaticMutex count_lock = G_STATIC_MUTEX_INIT;
28 
29 
30 static gint
queue_sort(gconstpointer a,gconstpointer b,gpointer user_data)31 queue_sort(gconstpointer a, gconstpointer b, gpointer user_data)
32 {
33 	gint id1 = 0;
34 	gint id2 = 0;
35 
36 	if (a)
37 		id1 = RS_IO_JOB(a)->priority;
38 	if (b)
39 		id2 = RS_IO_JOB(b)->priority;
40 
41 	return (id1 > id2 ? +1 : id1 == id2 ? 0 : -1);
42 }
43 
44 static gpointer
queue_worker(gpointer data)45 queue_worker(gpointer data)
46 {
47 	GAsyncQueue *queue = data;
48 	RSIoJob *job;
49 
50 	while (1)
51 	{
52 		if (pause_queue)
53 			g_usleep(1000);
54 		else
55 		{
56 			g_static_mutex_lock(&count_lock);
57 			job = g_async_queue_try_pop(queue);
58 			if (job)
59 				queue_active_count++;
60 			g_static_mutex_unlock(&count_lock);
61 
62 			/* If we somehow got NULL, continue. I'm not sure this will ever happen, but this is better than random segfaults :) */
63 			if (job)
64 			{
65 				rs_io_job_execute(job);
66 				rs_io_job_do_callback(job);
67 				g_static_mutex_lock(&count_lock);
68 				queue_active_count--;
69 				g_static_mutex_unlock(&count_lock);
70 			}
71 			else
72 			{
73 				/* Sleep 1 ms */
74 				g_usleep(1000);
75 			}
76 		}
77 	}
78 
79 	return NULL;
80 }
81 
82 static void
init(void)83 init(void)
84 {
85 	int i;
86 	g_static_mutex_lock(&init_lock);
87 	if (!queue)
88 	{
89 		queue = g_async_queue_new();
90 		for (i = 0; i < rs_get_number_of_processor_cores(); i++)
91 			g_thread_create_full(queue_worker, queue, 0, FALSE, FALSE, G_THREAD_PRIORITY_LOW, NULL);
92 	}
93 	g_static_mutex_unlock(&init_lock);
94 }
95 
96 /**
97  * Add a RSIoJob to be executed later
98  * @param job A RSIoJob. This will be unreffed upon completion
99  * @param idle_class A user defined variable, this can be used with rs_io_idle_cancel_class() to cancel a batch of queued reads
100  * @param priority Lower value means higher priority
101  * @param user_data A pointer to pass to the callback
102  */
103 void
rs_io_idle_add_job(RSIoJob * job,gint idle_class,gint priority,gpointer user_data)104 rs_io_idle_add_job(RSIoJob *job, gint idle_class, gint priority, gpointer user_data)
105 {
106 	g_assert(RS_IS_IO_JOB(job));
107 
108 	job->idle_class = idle_class;
109 	job->priority = priority;
110 	job->user_data = user_data;
111 
112 	g_async_queue_push_sorted(queue, job, queue_sort, NULL);
113 }
114 
115 /**
116  * Prefetch a file
117  * @param path Absolute path to a file to prefetch
118  * @param idle_class A user defined variable, this can be used with rs_io_idle_cancel_class() to cancel a batch of queued reads
119  * @return A pointer to a RSIoJob, this can be used with rs_io_idle_cancel()
120  */
121 const RSIoJob *
rs_io_idle_prefetch_file(const gchar * path,gint idle_class)122 rs_io_idle_prefetch_file(const gchar *path, gint idle_class)
123 {
124 	init();
125 
126 	RSIoJob *job = rs_io_job_prefetch_new(path);
127 	rs_io_idle_add_job(job, idle_class, 20, NULL);
128 
129 	return job;
130 }
131 
132 /**
133  * Load metadata belonging to a photo
134  * @param path Absolute path to a photo
135  * @param idle_class A user defined variable, this can be used with rs_io_idle_cancel_class() to cancel a batch of queued reads
136  * @param callback A callback to call when the data is ready or NULL
137  * @param user_data Data to pass to the callback
138  * @return A pointer to a RSIoJob, this can be used with rs_io_idle_cancel()
139  */
140 const RSIoJob *
rs_io_idle_read_metadata(const gchar * path,gint idle_class,RSGotMetadataCB callback,gpointer user_data)141 rs_io_idle_read_metadata(const gchar *path, gint idle_class, RSGotMetadataCB callback, gpointer user_data)
142 {
143 	init();
144 
145 	RSIoJob *job = rs_io_job_metadata_new(path, callback);
146 	rs_io_idle_add_job(job, idle_class, 10, user_data);
147 
148 	return job;
149 }
150 
151 /**
152  * Compute a "Rawstudio checksum" of a file
153  * @param path Absolute path to a file
154  * @param idle_class A user defined variable, this can be used with rs_io_idle_cancel_class() to cancel a batch of queued reads
155  * @param callback A callback to call when the data is ready or NULL
156  * @param user_data Data to pass to the callback
157  * @return A pointer to a RSIoJob, this can be used with rs_io_idle_cancel()
158  */
159 const RSIoJob *
rs_io_idle_read_checksum(const gchar * path,gint idle_class,RSGotChecksumCB callback,gpointer user_data)160 rs_io_idle_read_checksum(const gchar *path, gint idle_class, RSGotChecksumCB callback, gpointer user_data)
161 {
162 	init();
163 
164 	RSIoJob *job = rs_io_job_checksum_new(path, callback);
165 	rs_io_idle_add_job(job, idle_class, 30, user_data);
166 
167 	return job;
168 }
169 
170 /**
171  * Restore tags of a new directory or add tags to a photo
172  * @param filename Absolute path to a file to tags to
173  * @param tag_id The id of the tag to add.
174  * @param auto_tag Is the tag an automatically generated tag
175  * @param idle_class A user defined variable, this can be used with rs_io_idle_cancel_class() to cancel a batch of queued reads
176  * @return A pointer to a RSIoJob, this can be used with rs_io_idle_cancel()
177  */
178 const RSIoJob *
rs_io_idle_add_tag(const gchar * filename,gint tag_id,gboolean auto_tag,gint idle_class)179 rs_io_idle_add_tag(const gchar *filename, gint tag_id, gboolean auto_tag, gint idle_class)
180 {
181 	init();
182 
183 	RSIoJob *job = rs_io_job_tagging_new(filename, tag_id, auto_tag);
184 	rs_io_idle_add_job(job, idle_class, 50, NULL);
185 
186 	return job;
187 }
188 
189 /**
190  * Restore tags of a new directory or add tags to a photo
191  * @param path Absolute path to a directory to restore tags to
192  * @param idle_class A user defined variable, this can be used with rs_io_idle_cancel_class() to cancel a batch of queued reads
193  * @return A pointer to a RSIoJob, this can be used with rs_io_idle_cancel()
194  */
195 const RSIoJob *
rs_io_idle_restore_tags(const gchar * path,gint idle_class)196 rs_io_idle_restore_tags(const gchar *path, gint idle_class)
197 {
198 	init();
199 
200 	RSIoJob *job = rs_io_job_tagging_new(path, -1, FALSE);
201 	rs_io_idle_add_job(job, idle_class, 50, NULL);
202 
203 	return job;
204 }
205 
206 /**
207  * Cancel a complete class of idle requests
208  * @param idle_class The class identifier
209  */
210 void
rs_io_idle_cancel_class(gint idle_class)211 rs_io_idle_cancel_class(gint idle_class)
212 {
213 	/* This behaves like rs_io_idle_cancel_class(), please see comments there */
214 	RSIoJob *current_job;
215 	RSIoJob *marker_job = rs_io_job_new();
216 
217 	init();
218 
219 	g_async_queue_lock(queue);
220 
221 	/* Put a marker in the queue, we will rotate the complete queue, so we have to know when we're around */
222 	g_async_queue_push_unlocked(queue, marker_job);
223 
224 	while((current_job = g_async_queue_pop_unlocked(queue)))
225 	{
226 		/* If current job matches marker, we're done */
227 		if (current_job == marker_job)
228 			break;
229 
230 		/* Of the job's idle_class doesn't match the class to cancel, we put the job back in the queue */
231 		if (current_job->idle_class != idle_class)
232 		{
233 			g_async_queue_push_unlocked(queue, current_job);
234 		}
235 	}
236 
237 	/* Make sure the queue is sorted */
238 	g_async_queue_sort_unlocked(queue, queue_sort, NULL);
239 
240 	g_async_queue_unlock(queue);
241 
242 	g_object_unref(marker_job);
243 }
244 
245 /**
246  * Cancel an idle request
247  * @param request_id A request_id as returned by rs_io_idle_read_complete_file()
248  */
249 void
rs_io_idle_cancel(RSIoJob * job)250 rs_io_idle_cancel(RSIoJob *job)
251 {
252 	/* This behaves like rs_io_idle_cancel_class(), please see comments there */
253 	RSIoJob *current_job;
254 	RSIoJob *marker_job = rs_io_job_new();
255 
256 	init();
257 
258 	g_async_queue_lock(queue);
259 
260 	/* Put a marker in the queue, we will rotate the complete queue, so we have to know when we're around */
261 	g_async_queue_push_unlocked(queue, marker_job);
262 
263 	while((current_job = g_async_queue_pop_unlocked(queue)))
264 	{
265 		/* If current job matches marker, we're done */
266 		if (current_job == marker_job)
267 			break;
268 
269 		if (current_job != job)
270 			g_async_queue_push_unlocked(queue, current_job);
271 	}
272 
273 	/* Make sure the queue is sorted */
274 	g_async_queue_sort_unlocked(queue, queue_sort, NULL);
275 
276 	g_async_queue_unlock(queue);
277 
278 	g_object_unref(marker_job);
279 }
280 
281 /**
282  * Aquire the IO lock
283  */
284 void
rs_io_lock(void)285 rs_io_lock(void)
286 {
287 	g_static_rec_mutex_lock(&io_lock);
288 }
289 
290 /**
291  * Release the IO lock
292  */
293 void
rs_io_unlock(void)294 rs_io_unlock(void)
295 {
296 	g_static_rec_mutex_unlock(&io_lock);
297 }
298 
299 /**
300  * Pause the worker threads
301  */
302 void
rs_io_idle_pause(void)303 rs_io_idle_pause(void)
304 {
305 	pause_queue = TRUE;
306 }
307 
308 /**
309  * Unpause the worker threads
310  */
311 void
rs_io_idle_unpause(void)312 rs_io_idle_unpause(void)
313 {
314 	pause_queue = FALSE;
315 }
316 
317 /**
318  * Returns the number of jobs left
319  */
320 gint
rs_io_get_jobs_left(void)321 rs_io_get_jobs_left(void)
322 {
323 	g_static_mutex_lock(&count_lock);
324 	gint left = g_async_queue_length(queue) + queue_active_count;
325 	g_static_mutex_unlock(&count_lock);
326 	return left;
327 }