1 /*
2  * Copyright (C) 2005 - 2007 Murray Cumming <murrayc@murrayc.com>
3  * Copyright (C) 2005 - 2011 Vivien Malerba <malerba@gnome-db.org>
4  * Copyright (C) 2010 David King <davidk@openismus.com>
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 2
9  * of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
19  */
20 
21 #include <string.h>
22 #include <tools/gda-threader.h>
23 #include <stdlib.h>
24 
25 /*
26  * Main static functions
27  */
28 static void gda_threader_class_init (GdaThreaderClass * class);
29 static void gda_threader_init (GdaThreader * srv);
30 static void gda_threader_dispose (GObject   * object);
31 static void gda_threader_finalize (GObject   * object);
32 
33 /* get a pointer to the parents to be able to call their destructor */
34 static GObjectClass  *parent_class = NULL;
35 
36 /* signals */
37 enum
38 {
39 	FINISHED,
40 	CANCELLED,
41 	LAST_SIGNAL
42 };
43 
44 static gint gda_threader_signals[LAST_SIGNAL] = { 0, 0};
45 
46 struct _GdaThreaderPrivate
47 {
48 	guint        next_job;
49 	guint        nb_jobs;
50 	GHashTable  *jobs; /* key = job id, value=ThreadJob for the job */
51 	GAsyncQueue *queue;
52 	guint        idle_func_id;
53 };
54 
55 typedef struct {
56 	GdaThreader    *thread; /* object to which this jobs belongs to */
57 	guint           id;
58 	GThread        *g_thread; /* sub thread for that job */
59 	GThreadFunc     func;     /* function called in the sub thread */
60 	gpointer        func_data;/* argument to the function called in the sub thread */
61 
62 	gboolean        cancelled; /* TRUE if this job has been cancelled */
63 	GdaThreaderFunc normal_end_cb; /* called when job->func ends */
64 	GdaThreaderFunc cancel_end_cb; /* called when job->func ends and this job has been cancelled */
65 } ThreadJob;
66 
67 GType
gda_threader_get_type(void)68 gda_threader_get_type (void)
69 {
70 	static GType type = 0;
71 
72 	if (G_UNLIKELY (type == 0)) {
73 		static GMutex registering;
74 		static const GTypeInfo info = {
75 			sizeof (GdaThreaderClass),
76 			(GBaseInitFunc) NULL,
77 			(GBaseFinalizeFunc) NULL,
78 			(GClassInitFunc) gda_threader_class_init,
79 			NULL,
80 			NULL,
81 			sizeof (GdaThreader),
82 			0,
83 			(GInstanceInitFunc) gda_threader_init,
84 			0
85 		};
86 
87 		g_mutex_lock (&registering);
88 		if (type == 0)
89 			type = g_type_register_static (G_TYPE_OBJECT, "GdaThreader", &info, 0);
90 		g_mutex_unlock (&registering);
91 	}
92 
93 	return type;
94 }
95 
96 static void
gda_threader_class_init(GdaThreaderClass * class)97 gda_threader_class_init (GdaThreaderClass * class)
98 {
99 	GObjectClass   *object_class = G_OBJECT_CLASS (class);
100 
101 	parent_class = g_type_class_peek_parent (class);
102 
103 	gda_threader_signals[FINISHED] =
104 		g_signal_new ("finished",
105 			      G_TYPE_FROM_CLASS (object_class),
106 			      G_SIGNAL_RUN_FIRST,
107 			      G_STRUCT_OFFSET (GdaThreaderClass, finished),
108 			      NULL, NULL,
109 			      g_cclosure_marshal_VOID__UINT_POINTER, G_TYPE_NONE,
110 			      2, G_TYPE_UINT, G_TYPE_POINTER);
111 	gda_threader_signals[CANCELLED] =
112 		g_signal_new ("cancelled",
113 			      G_TYPE_FROM_CLASS (object_class),
114 			      G_SIGNAL_RUN_FIRST,
115 			      G_STRUCT_OFFSET (GdaThreaderClass, cancelled),
116 			      NULL, NULL,
117 			      g_cclosure_marshal_VOID__UINT_POINTER, G_TYPE_NONE,
118 			      2, G_TYPE_UINT, G_TYPE_POINTER);
119 	class->finished = NULL;
120 	class->cancelled = NULL;
121 
122 	object_class->dispose = gda_threader_dispose;
123 	object_class->finalize = gda_threader_finalize;
124 }
125 
126 static void
gda_threader_init(GdaThreader * thread)127 gda_threader_init (GdaThreader * thread)
128 {
129 	thread->priv = g_new0 (GdaThreaderPrivate, 1);
130 
131 	if (! g_thread_supported())
132 		g_warning ("You must initialize the multi threads environment using g_thread_init()");
133 
134 	thread->priv->next_job = 1;
135 	thread->priv->nb_jobs = 0;
136 	thread->priv->jobs = g_hash_table_new (NULL, NULL);
137 	thread->priv->queue = g_async_queue_new ();
138 	thread->priv->idle_func_id = 0;
139 }
140 
141 /**
142  * gda_threader_new
143  *
144  * Creates a new GdaThreader object.
145  *
146  * Returns: the newly created object
147  */
148 GObject *
gda_threader_new(void)149 gda_threader_new (void)
150 {
151 	GObject   *obj;
152 
153 	GdaThreader *thread;
154 	obj = g_object_new (GDA_TYPE_THREADER, NULL);
155 	thread = GDA_THREADER (obj);
156 
157 	return obj;
158 }
159 
160 
161 static void
gda_threader_dispose(GObject * object)162 gda_threader_dispose (GObject   * object)
163 {
164 	GdaThreader *thread;
165 
166 	g_return_if_fail (object != NULL);
167 	g_return_if_fail (GDA_IS_THREADER (object));
168 
169 	thread = GDA_THREADER (object);
170 	if (thread->priv) {
171 		if (thread->priv->idle_func_id) {
172 			g_idle_remove_by_data (thread);
173 			thread->priv->idle_func_id = 0;
174 		}
175 
176 		if (thread->priv->nb_jobs > 0) {
177 			/*g_warning ("There are still some running threads, some memory will be leaked!");*/
178 			thread->priv->nb_jobs = 0;
179 		}
180 
181 		if (thread->priv->jobs) {
182 			g_hash_table_destroy (thread->priv->jobs);
183 			thread->priv->jobs = NULL;
184 		}
185 	}
186 
187 	/* parent class */
188 	parent_class->dispose (object);
189 }
190 
191 static void
gda_threader_finalize(GObject * object)192 gda_threader_finalize (GObject   * object)
193 {
194 	GdaThreader *thread;
195 
196 	g_return_if_fail (object != NULL);
197 	g_return_if_fail (GDA_IS_THREADER (object));
198 
199 	thread = GDA_THREADER (object);
200 	if (thread->priv) {
201 		g_free (thread->priv);
202 		thread->priv = NULL;
203 	}
204 
205 	/* parent class */
206 	parent_class->finalize (object);
207 }
208 
209 static gboolean idle_catch_threads_end (GdaThreader *thread);
210 static gpointer spawn_new_thread (ThreadJob *job);
211 
212 /**
213  * gda_threader_start_thread
214  * @thread: a #GdaThreader object
215  * @func: the function to be called in the newly created thread
216  * @func_arg: @func's argument
217  * @ok_callback: callback called when @func terminates
218  * @cancel_callback: callback called when @func terminates and the job has been cancelled
219  * @error: place to store an error when creating the thread or %NULL
220  *
221  * Starts a new worker thread, executing the @func function with the @func_arg argument. It is possible
222  * to request the worker thread to terminates using gda_threader_cancel().
223  *
224  * Returns: the id of the new job executed in another thread.
225  */
226 guint
gda_threader_start_thread(GdaThreader * thread,GThreadFunc func,gpointer func_arg,GdaThreaderFunc ok_callback,GdaThreaderFunc cancel_callback,GError ** error)227 gda_threader_start_thread (GdaThreader *thread, GThreadFunc func, gpointer func_arg,
228 			   GdaThreaderFunc ok_callback, GdaThreaderFunc cancel_callback,
229 			   GError **error)
230 {
231 	ThreadJob *job;
232 
233 	g_return_val_if_fail (thread && GDA_IS_THREADER (thread), 0);
234 	g_return_val_if_fail (func, 0);
235 
236 	job = g_new0 (ThreadJob, 1);
237 	job->thread = thread;
238 	job->func = func;
239 	job->func_data = func_arg;
240 	job->id = thread->priv->next_job ++;
241 	job->cancelled = FALSE;
242 	job->normal_end_cb = ok_callback;
243 	job->cancel_end_cb = cancel_callback;
244 
245 	/* g_print ("** New thread starting ..., job = %d\n", job->id); */
246 	job->g_thread = g_thread_create ((GThreadFunc) spawn_new_thread, job, FALSE, error);
247 
248 	if (!job->g_thread) {
249 		g_free (job);
250 		return 0;
251 	}
252 	else {
253 		thread->priv->nb_jobs ++;
254 
255 		g_hash_table_insert (thread->priv->jobs, GUINT_TO_POINTER (job->id), job);
256 		if (! thread->priv->idle_func_id)
257 			thread->priv->idle_func_id = g_timeout_add_full (G_PRIORITY_HIGH_IDLE, 150,
258 									 (GSourceFunc) idle_catch_threads_end,
259 									 thread, NULL);
260 
261 		return job->id;
262 	}
263 }
264 
265 
266 /* WARNING: called in another thread */
267 static gpointer
spawn_new_thread(ThreadJob * job)268 spawn_new_thread (ThreadJob *job)
269 {
270 	GAsyncQueue *queue;
271 
272 	queue = job->thread->priv->queue;
273 	g_async_queue_ref (queue);
274 
275 	/* call job's real function */
276 	/* g_print ("** T: Calling job function for job %d\n", job->id); */
277 	(job->func) (job->func_data);
278 
279 	/* push result when finished */
280 	/* g_print ("** T: Pushing result for job %d\n", job->id); */
281 	g_async_queue_push (queue, job);
282 
283 	/* terminate thread */
284 	g_async_queue_unref (queue);
285 	/* g_print ("** T: End of thread for job %d\n", job->id); */
286 	g_thread_exit (job);
287 
288 	return job;
289 }
290 
291 static gboolean
idle_catch_threads_end(GdaThreader * thread)292 idle_catch_threads_end (GdaThreader *thread)
293 {
294 	ThreadJob *job;
295 	gboolean retval = TRUE;
296 
297 	job = g_async_queue_try_pop (thread->priv->queue);
298 	if (job) {
299 		/* that job has finished */
300 		/* g_print ("** Signaling end of job %d\n", job->id); */
301 
302 		thread->priv->nb_jobs --;
303 		if (thread->priv->nb_jobs == 0) {
304 			retval = FALSE;
305 			thread->priv->idle_func_id = 0;
306 		}
307 		g_hash_table_remove (thread->priv->jobs, GUINT_TO_POINTER (job->id));
308 
309 		if (job->cancelled) {
310 			if (job->cancel_end_cb)
311 				job->cancel_end_cb (thread, job->id, job->func_data);
312 		}
313 		else {
314 			g_signal_emit (thread, gda_threader_signals [FINISHED], 0, job->id, job->func_data);
315 			if (job->normal_end_cb)
316 				job->normal_end_cb (thread, job->id, job->func_data);
317 		}
318 
319 		g_free (job);
320 	}
321 
322 	return retval;
323 }
324 
325 /**
326  * gda_threader_cancel
327  */
328 void
gda_threader_cancel(GdaThreader * thread,guint job_id)329 gda_threader_cancel (GdaThreader *thread, guint job_id)
330 {
331 	ThreadJob *job;
332 
333 	g_return_if_fail (thread && GDA_IS_THREADER (thread));
334 	job = g_hash_table_lookup (thread->priv->jobs, GUINT_TO_POINTER (job_id));
335 	if (!job)
336 		g_warning ("Could not find threaded job %d", job_id);
337 	else {
338 		job->cancelled = TRUE;
339 		g_signal_emit (thread, gda_threader_signals [CANCELLED], 0, job->id, job->func_data);
340 	}
341 }
342