1 /*
2 * * Copyright (C) 2006-2011 Anders Brander <anders@brander.dk>,
3 * * Anders Kvist <akv@lnxbx.dk> and Klaus Post <klauspost@gmail.com>
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 */
19
20 #include "rs-io.h"
21
22 static GStaticMutex init_lock = G_STATIC_MUTEX_INIT;
23 static GAsyncQueue *queue = NULL;
24 static GStaticRecMutex io_lock = G_STATIC_REC_MUTEX_INIT;
25 static gboolean pause_queue = FALSE;
26 static gint queue_active_count = 0;
27 static GStaticMutex count_lock = G_STATIC_MUTEX_INIT;
28
29
30 static gint
queue_sort(gconstpointer a,gconstpointer b,gpointer user_data)31 queue_sort(gconstpointer a, gconstpointer b, gpointer user_data)
32 {
33 gint id1 = 0;
34 gint id2 = 0;
35
36 if (a)
37 id1 = RS_IO_JOB(a)->priority;
38 if (b)
39 id2 = RS_IO_JOB(b)->priority;
40
41 return (id1 > id2 ? +1 : id1 == id2 ? 0 : -1);
42 }
43
44 static gpointer
queue_worker(gpointer data)45 queue_worker(gpointer data)
46 {
47 GAsyncQueue *queue = data;
48 RSIoJob *job;
49
50 while (1)
51 {
52 if (pause_queue)
53 g_usleep(1000);
54 else
55 {
56 g_static_mutex_lock(&count_lock);
57 job = g_async_queue_try_pop(queue);
58 if (job)
59 queue_active_count++;
60 g_static_mutex_unlock(&count_lock);
61
62 /* If we somehow got NULL, continue. I'm not sure this will ever happen, but this is better than random segfaults :) */
63 if (job)
64 {
65 rs_io_job_execute(job);
66 rs_io_job_do_callback(job);
67 g_static_mutex_lock(&count_lock);
68 queue_active_count--;
69 g_static_mutex_unlock(&count_lock);
70 }
71 else
72 {
73 /* Sleep 1 ms */
74 g_usleep(1000);
75 }
76 }
77 }
78
79 return NULL;
80 }
81
82 static void
init(void)83 init(void)
84 {
85 int i;
86 g_static_mutex_lock(&init_lock);
87 if (!queue)
88 {
89 queue = g_async_queue_new();
90 for (i = 0; i < rs_get_number_of_processor_cores(); i++)
91 g_thread_create_full(queue_worker, queue, 0, FALSE, FALSE, G_THREAD_PRIORITY_LOW, NULL);
92 }
93 g_static_mutex_unlock(&init_lock);
94 }
95
96 /**
97 * Add a RSIoJob to be executed later
98 * @param job A RSIoJob. This will be unreffed upon completion
99 * @param idle_class A user defined variable, this can be used with rs_io_idle_cancel_class() to cancel a batch of queued reads
100 * @param priority Lower value means higher priority
101 * @param user_data A pointer to pass to the callback
102 */
103 void
rs_io_idle_add_job(RSIoJob * job,gint idle_class,gint priority,gpointer user_data)104 rs_io_idle_add_job(RSIoJob *job, gint idle_class, gint priority, gpointer user_data)
105 {
106 g_assert(RS_IS_IO_JOB(job));
107
108 job->idle_class = idle_class;
109 job->priority = priority;
110 job->user_data = user_data;
111
112 g_async_queue_push_sorted(queue, job, queue_sort, NULL);
113 }
114
115 /**
116 * Prefetch a file
117 * @param path Absolute path to a file to prefetch
118 * @param idle_class A user defined variable, this can be used with rs_io_idle_cancel_class() to cancel a batch of queued reads
119 * @return A pointer to a RSIoJob, this can be used with rs_io_idle_cancel()
120 */
121 const RSIoJob *
rs_io_idle_prefetch_file(const gchar * path,gint idle_class)122 rs_io_idle_prefetch_file(const gchar *path, gint idle_class)
123 {
124 init();
125
126 RSIoJob *job = rs_io_job_prefetch_new(path);
127 rs_io_idle_add_job(job, idle_class, 20, NULL);
128
129 return job;
130 }
131
132 /**
133 * Load metadata belonging to a photo
134 * @param path Absolute path to a photo
135 * @param idle_class A user defined variable, this can be used with rs_io_idle_cancel_class() to cancel a batch of queued reads
136 * @param callback A callback to call when the data is ready or NULL
137 * @param user_data Data to pass to the callback
138 * @return A pointer to a RSIoJob, this can be used with rs_io_idle_cancel()
139 */
140 const RSIoJob *
rs_io_idle_read_metadata(const gchar * path,gint idle_class,RSGotMetadataCB callback,gpointer user_data)141 rs_io_idle_read_metadata(const gchar *path, gint idle_class, RSGotMetadataCB callback, gpointer user_data)
142 {
143 init();
144
145 RSIoJob *job = rs_io_job_metadata_new(path, callback);
146 rs_io_idle_add_job(job, idle_class, 10, user_data);
147
148 return job;
149 }
150
151 /**
152 * Compute a "Rawstudio checksum" of a file
153 * @param path Absolute path to a file
154 * @param idle_class A user defined variable, this can be used with rs_io_idle_cancel_class() to cancel a batch of queued reads
155 * @param callback A callback to call when the data is ready or NULL
156 * @param user_data Data to pass to the callback
157 * @return A pointer to a RSIoJob, this can be used with rs_io_idle_cancel()
158 */
159 const RSIoJob *
rs_io_idle_read_checksum(const gchar * path,gint idle_class,RSGotChecksumCB callback,gpointer user_data)160 rs_io_idle_read_checksum(const gchar *path, gint idle_class, RSGotChecksumCB callback, gpointer user_data)
161 {
162 init();
163
164 RSIoJob *job = rs_io_job_checksum_new(path, callback);
165 rs_io_idle_add_job(job, idle_class, 30, user_data);
166
167 return job;
168 }
169
170 /**
171 * Restore tags of a new directory or add tags to a photo
172 * @param filename Absolute path to a file to tags to
173 * @param tag_id The id of the tag to add.
174 * @param auto_tag Is the tag an automatically generated tag
175 * @param idle_class A user defined variable, this can be used with rs_io_idle_cancel_class() to cancel a batch of queued reads
176 * @return A pointer to a RSIoJob, this can be used with rs_io_idle_cancel()
177 */
178 const RSIoJob *
rs_io_idle_add_tag(const gchar * filename,gint tag_id,gboolean auto_tag,gint idle_class)179 rs_io_idle_add_tag(const gchar *filename, gint tag_id, gboolean auto_tag, gint idle_class)
180 {
181 init();
182
183 RSIoJob *job = rs_io_job_tagging_new(filename, tag_id, auto_tag);
184 rs_io_idle_add_job(job, idle_class, 50, NULL);
185
186 return job;
187 }
188
189 /**
190 * Restore tags of a new directory or add tags to a photo
191 * @param path Absolute path to a directory to restore tags to
192 * @param idle_class A user defined variable, this can be used with rs_io_idle_cancel_class() to cancel a batch of queued reads
193 * @return A pointer to a RSIoJob, this can be used with rs_io_idle_cancel()
194 */
195 const RSIoJob *
rs_io_idle_restore_tags(const gchar * path,gint idle_class)196 rs_io_idle_restore_tags(const gchar *path, gint idle_class)
197 {
198 init();
199
200 RSIoJob *job = rs_io_job_tagging_new(path, -1, FALSE);
201 rs_io_idle_add_job(job, idle_class, 50, NULL);
202
203 return job;
204 }
205
206 /**
207 * Cancel a complete class of idle requests
208 * @param idle_class The class identifier
209 */
210 void
rs_io_idle_cancel_class(gint idle_class)211 rs_io_idle_cancel_class(gint idle_class)
212 {
213 /* This behaves like rs_io_idle_cancel_class(), please see comments there */
214 RSIoJob *current_job;
215 RSIoJob *marker_job = rs_io_job_new();
216
217 init();
218
219 g_async_queue_lock(queue);
220
221 /* Put a marker in the queue, we will rotate the complete queue, so we have to know when we're around */
222 g_async_queue_push_unlocked(queue, marker_job);
223
224 while((current_job = g_async_queue_pop_unlocked(queue)))
225 {
226 /* If current job matches marker, we're done */
227 if (current_job == marker_job)
228 break;
229
230 /* Of the job's idle_class doesn't match the class to cancel, we put the job back in the queue */
231 if (current_job->idle_class != idle_class)
232 {
233 g_async_queue_push_unlocked(queue, current_job);
234 }
235 }
236
237 /* Make sure the queue is sorted */
238 g_async_queue_sort_unlocked(queue, queue_sort, NULL);
239
240 g_async_queue_unlock(queue);
241
242 g_object_unref(marker_job);
243 }
244
245 /**
246 * Cancel an idle request
247 * @param request_id A request_id as returned by rs_io_idle_read_complete_file()
248 */
249 void
rs_io_idle_cancel(RSIoJob * job)250 rs_io_idle_cancel(RSIoJob *job)
251 {
252 /* This behaves like rs_io_idle_cancel_class(), please see comments there */
253 RSIoJob *current_job;
254 RSIoJob *marker_job = rs_io_job_new();
255
256 init();
257
258 g_async_queue_lock(queue);
259
260 /* Put a marker in the queue, we will rotate the complete queue, so we have to know when we're around */
261 g_async_queue_push_unlocked(queue, marker_job);
262
263 while((current_job = g_async_queue_pop_unlocked(queue)))
264 {
265 /* If current job matches marker, we're done */
266 if (current_job == marker_job)
267 break;
268
269 if (current_job != job)
270 g_async_queue_push_unlocked(queue, current_job);
271 }
272
273 /* Make sure the queue is sorted */
274 g_async_queue_sort_unlocked(queue, queue_sort, NULL);
275
276 g_async_queue_unlock(queue);
277
278 g_object_unref(marker_job);
279 }
280
281 /**
282 * Aquire the IO lock
283 */
284 void
rs_io_lock(void)285 rs_io_lock(void)
286 {
287 g_static_rec_mutex_lock(&io_lock);
288 }
289
290 /**
291 * Release the IO lock
292 */
293 void
rs_io_unlock(void)294 rs_io_unlock(void)
295 {
296 g_static_rec_mutex_unlock(&io_lock);
297 }
298
299 /**
300 * Pause the worker threads
301 */
302 void
rs_io_idle_pause(void)303 rs_io_idle_pause(void)
304 {
305 pause_queue = TRUE;
306 }
307
308 /**
309 * Unpause the worker threads
310 */
311 void
rs_io_idle_unpause(void)312 rs_io_idle_unpause(void)
313 {
314 pause_queue = FALSE;
315 }
316
317 /**
318 * Returns the number of jobs left
319 */
320 gint
rs_io_get_jobs_left(void)321 rs_io_get_jobs_left(void)
322 {
323 g_static_mutex_lock(&count_lock);
324 gint left = g_async_queue_length(queue) + queue_active_count;
325 g_static_mutex_unlock(&count_lock);
326 return left;
327 }