1 /*
2 * Copyright (C) 2005 - 2007 Murray Cumming <murrayc@murrayc.com>
3 * Copyright (C) 2005 - 2011 Vivien Malerba <malerba@gnome-db.org>
4 * Copyright (C) 2010 David King <davidk@openismus.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version 2
9 * of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19 */
20
21 #include <string.h>
22 #include <tools/gda-threader.h>
23 #include <stdlib.h>
24
25 /*
26 * Main static functions
27 */
28 static void gda_threader_class_init (GdaThreaderClass * class);
29 static void gda_threader_init (GdaThreader * srv);
30 static void gda_threader_dispose (GObject * object);
31 static void gda_threader_finalize (GObject * object);
32
33 /* get a pointer to the parents to be able to call their destructor */
34 static GObjectClass *parent_class = NULL;
35
36 /* signals */
37 enum
38 {
39 FINISHED,
40 CANCELLED,
41 LAST_SIGNAL
42 };
43
44 static gint gda_threader_signals[LAST_SIGNAL] = { 0, 0};
45
46 struct _GdaThreaderPrivate
47 {
48 guint next_job;
49 guint nb_jobs;
50 GHashTable *jobs; /* key = job id, value=ThreadJob for the job */
51 GAsyncQueue *queue;
52 guint idle_func_id;
53 };
54
55 typedef struct {
56 GdaThreader *thread; /* object to which this jobs belongs to */
57 guint id;
58 GThread *g_thread; /* sub thread for that job */
59 GThreadFunc func; /* function called in the sub thread */
60 gpointer func_data;/* argument to the function called in the sub thread */
61
62 gboolean cancelled; /* TRUE if this job has been cancelled */
63 GdaThreaderFunc normal_end_cb; /* called when job->func ends */
64 GdaThreaderFunc cancel_end_cb; /* called when job->func ends and this job has been cancelled */
65 } ThreadJob;
66
67 GType
gda_threader_get_type(void)68 gda_threader_get_type (void)
69 {
70 static GType type = 0;
71
72 if (G_UNLIKELY (type == 0)) {
73 static GMutex registering;
74 static const GTypeInfo info = {
75 sizeof (GdaThreaderClass),
76 (GBaseInitFunc) NULL,
77 (GBaseFinalizeFunc) NULL,
78 (GClassInitFunc) gda_threader_class_init,
79 NULL,
80 NULL,
81 sizeof (GdaThreader),
82 0,
83 (GInstanceInitFunc) gda_threader_init,
84 0
85 };
86
87 g_mutex_lock (®istering);
88 if (type == 0)
89 type = g_type_register_static (G_TYPE_OBJECT, "GdaThreader", &info, 0);
90 g_mutex_unlock (®istering);
91 }
92
93 return type;
94 }
95
96 static void
gda_threader_class_init(GdaThreaderClass * class)97 gda_threader_class_init (GdaThreaderClass * class)
98 {
99 GObjectClass *object_class = G_OBJECT_CLASS (class);
100
101 parent_class = g_type_class_peek_parent (class);
102
103 gda_threader_signals[FINISHED] =
104 g_signal_new ("finished",
105 G_TYPE_FROM_CLASS (object_class),
106 G_SIGNAL_RUN_FIRST,
107 G_STRUCT_OFFSET (GdaThreaderClass, finished),
108 NULL, NULL,
109 g_cclosure_marshal_VOID__UINT_POINTER, G_TYPE_NONE,
110 2, G_TYPE_UINT, G_TYPE_POINTER);
111 gda_threader_signals[CANCELLED] =
112 g_signal_new ("cancelled",
113 G_TYPE_FROM_CLASS (object_class),
114 G_SIGNAL_RUN_FIRST,
115 G_STRUCT_OFFSET (GdaThreaderClass, cancelled),
116 NULL, NULL,
117 g_cclosure_marshal_VOID__UINT_POINTER, G_TYPE_NONE,
118 2, G_TYPE_UINT, G_TYPE_POINTER);
119 class->finished = NULL;
120 class->cancelled = NULL;
121
122 object_class->dispose = gda_threader_dispose;
123 object_class->finalize = gda_threader_finalize;
124 }
125
126 static void
gda_threader_init(GdaThreader * thread)127 gda_threader_init (GdaThreader * thread)
128 {
129 thread->priv = g_new0 (GdaThreaderPrivate, 1);
130
131 if (! g_thread_supported())
132 g_warning ("You must initialize the multi threads environment using g_thread_init()");
133
134 thread->priv->next_job = 1;
135 thread->priv->nb_jobs = 0;
136 thread->priv->jobs = g_hash_table_new (NULL, NULL);
137 thread->priv->queue = g_async_queue_new ();
138 thread->priv->idle_func_id = 0;
139 }
140
141 /**
142 * gda_threader_new
143 *
144 * Creates a new GdaThreader object.
145 *
146 * Returns: the newly created object
147 */
148 GObject *
gda_threader_new(void)149 gda_threader_new (void)
150 {
151 GObject *obj;
152
153 GdaThreader *thread;
154 obj = g_object_new (GDA_TYPE_THREADER, NULL);
155 thread = GDA_THREADER (obj);
156
157 return obj;
158 }
159
160
161 static void
gda_threader_dispose(GObject * object)162 gda_threader_dispose (GObject * object)
163 {
164 GdaThreader *thread;
165
166 g_return_if_fail (object != NULL);
167 g_return_if_fail (GDA_IS_THREADER (object));
168
169 thread = GDA_THREADER (object);
170 if (thread->priv) {
171 if (thread->priv->idle_func_id) {
172 g_idle_remove_by_data (thread);
173 thread->priv->idle_func_id = 0;
174 }
175
176 if (thread->priv->nb_jobs > 0) {
177 /*g_warning ("There are still some running threads, some memory will be leaked!");*/
178 thread->priv->nb_jobs = 0;
179 }
180
181 if (thread->priv->jobs) {
182 g_hash_table_destroy (thread->priv->jobs);
183 thread->priv->jobs = NULL;
184 }
185 }
186
187 /* parent class */
188 parent_class->dispose (object);
189 }
190
191 static void
gda_threader_finalize(GObject * object)192 gda_threader_finalize (GObject * object)
193 {
194 GdaThreader *thread;
195
196 g_return_if_fail (object != NULL);
197 g_return_if_fail (GDA_IS_THREADER (object));
198
199 thread = GDA_THREADER (object);
200 if (thread->priv) {
201 g_free (thread->priv);
202 thread->priv = NULL;
203 }
204
205 /* parent class */
206 parent_class->finalize (object);
207 }
208
209 static gboolean idle_catch_threads_end (GdaThreader *thread);
210 static gpointer spawn_new_thread (ThreadJob *job);
211
212 /**
213 * gda_threader_start_thread
214 * @thread: a #GdaThreader object
215 * @func: the function to be called in the newly created thread
216 * @func_arg: @func's argument
217 * @ok_callback: callback called when @func terminates
218 * @cancel_callback: callback called when @func terminates and the job has been cancelled
219 * @error: place to store an error when creating the thread or %NULL
220 *
221 * Starts a new worker thread, executing the @func function with the @func_arg argument. It is possible
222 * to request the worker thread to terminates using gda_threader_cancel().
223 *
224 * Returns: the id of the new job executed in another thread.
225 */
226 guint
gda_threader_start_thread(GdaThreader * thread,GThreadFunc func,gpointer func_arg,GdaThreaderFunc ok_callback,GdaThreaderFunc cancel_callback,GError ** error)227 gda_threader_start_thread (GdaThreader *thread, GThreadFunc func, gpointer func_arg,
228 GdaThreaderFunc ok_callback, GdaThreaderFunc cancel_callback,
229 GError **error)
230 {
231 ThreadJob *job;
232
233 g_return_val_if_fail (thread && GDA_IS_THREADER (thread), 0);
234 g_return_val_if_fail (func, 0);
235
236 job = g_new0 (ThreadJob, 1);
237 job->thread = thread;
238 job->func = func;
239 job->func_data = func_arg;
240 job->id = thread->priv->next_job ++;
241 job->cancelled = FALSE;
242 job->normal_end_cb = ok_callback;
243 job->cancel_end_cb = cancel_callback;
244
245 /* g_print ("** New thread starting ..., job = %d\n", job->id); */
246 job->g_thread = g_thread_create ((GThreadFunc) spawn_new_thread, job, FALSE, error);
247
248 if (!job->g_thread) {
249 g_free (job);
250 return 0;
251 }
252 else {
253 thread->priv->nb_jobs ++;
254
255 g_hash_table_insert (thread->priv->jobs, GUINT_TO_POINTER (job->id), job);
256 if (! thread->priv->idle_func_id)
257 thread->priv->idle_func_id = g_timeout_add_full (G_PRIORITY_HIGH_IDLE, 150,
258 (GSourceFunc) idle_catch_threads_end,
259 thread, NULL);
260
261 return job->id;
262 }
263 }
264
265
266 /* WARNING: called in another thread */
267 static gpointer
spawn_new_thread(ThreadJob * job)268 spawn_new_thread (ThreadJob *job)
269 {
270 GAsyncQueue *queue;
271
272 queue = job->thread->priv->queue;
273 g_async_queue_ref (queue);
274
275 /* call job's real function */
276 /* g_print ("** T: Calling job function for job %d\n", job->id); */
277 (job->func) (job->func_data);
278
279 /* push result when finished */
280 /* g_print ("** T: Pushing result for job %d\n", job->id); */
281 g_async_queue_push (queue, job);
282
283 /* terminate thread */
284 g_async_queue_unref (queue);
285 /* g_print ("** T: End of thread for job %d\n", job->id); */
286 g_thread_exit (job);
287
288 return job;
289 }
290
291 static gboolean
idle_catch_threads_end(GdaThreader * thread)292 idle_catch_threads_end (GdaThreader *thread)
293 {
294 ThreadJob *job;
295 gboolean retval = TRUE;
296
297 job = g_async_queue_try_pop (thread->priv->queue);
298 if (job) {
299 /* that job has finished */
300 /* g_print ("** Signaling end of job %d\n", job->id); */
301
302 thread->priv->nb_jobs --;
303 if (thread->priv->nb_jobs == 0) {
304 retval = FALSE;
305 thread->priv->idle_func_id = 0;
306 }
307 g_hash_table_remove (thread->priv->jobs, GUINT_TO_POINTER (job->id));
308
309 if (job->cancelled) {
310 if (job->cancel_end_cb)
311 job->cancel_end_cb (thread, job->id, job->func_data);
312 }
313 else {
314 g_signal_emit (thread, gda_threader_signals [FINISHED], 0, job->id, job->func_data);
315 if (job->normal_end_cb)
316 job->normal_end_cb (thread, job->id, job->func_data);
317 }
318
319 g_free (job);
320 }
321
322 return retval;
323 }
324
325 /**
326 * gda_threader_cancel
327 */
328 void
gda_threader_cancel(GdaThreader * thread,guint job_id)329 gda_threader_cancel (GdaThreader *thread, guint job_id)
330 {
331 ThreadJob *job;
332
333 g_return_if_fail (thread && GDA_IS_THREADER (thread));
334 job = g_hash_table_lookup (thread->priv->jobs, GUINT_TO_POINTER (job_id));
335 if (!job)
336 g_warning ("Could not find threaded job %d", job_id);
337 else {
338 job->cancelled = TRUE;
339 g_signal_emit (thread, gda_threader_signals [CANCELLED], 0, job->id, job->func_data);
340 }
341 }
342